閱讀198 返回首頁    go 阿裏雲 go 技術社區[雲棲]


WCF後續之旅(11): 關於並發、回調的線程關聯性(Thread Affinity)

一、從基於Windows Application客戶端的WCF回調失敗談起

在"我的WCF之旅"係列文章中,有一篇(WinForm Application中調用Duplex Service出現TimeoutException的原因和解決方案)專門介紹在一個Windows Application客戶端應用, 通過WCF 的Duplex通信方式進行回調失敗的文章.我們今天以此作為出發點介紹WCF在Thread Affinity下的表現和解決方案.

image

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:    public interface ICalculateCallback
   4:     {
   5:         [OperationContract]
   6:         void DisplayResult(double result);
   7:     }
   8: } 

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }         
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: }


由於需要進行callback, 我們把ConcurrencyMode 設為Reentrant。當得到運算的結果後,通過OperationContext.Current.GetCallbackChannel得到callback對象,並調用之。還有一點需要提的是,該service是通過一個Windows Form application進行host的。並且有一個ListBox列出所有service執行的結果,就像這樣:

image

3、Hosting

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:     CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 

   1: <configuration>
   2:     <system.serviceModel>
   3:         <services>
   4:             <service name="Artech.ThreadAffinity.Services.CalculateService">
   5:                 <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />
   6:                 <host>
   7:                     <baseAddresses>
   8:                         <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />
   9:                     </baseAddresses>
  10:                 </host>
  11:             </service>
  12:         </services>
  13:     </system.serviceModel>
  14: </configuration> 

4、Client

image

   1: namespace Clients
   2: {
   3:     public class CalculateCallback : ICalculateCallback
   4:     {
   5:         public static TextBox ResultPanel;               
   6:  
   7:         #region ICalculateCallback Members 
   8:  
   9:         public void DisplayResult(double result)
  10:         {
  11:             ResultPanel.Text = result.ToString();
  12:         }         
  13:  
  14:         #endregion
  15:     }
  16: } 

   1: <configuration>
   2:     <system.serviceModel>
   3:         <client>
   4:             <endpoint address="net.tcp://127.0.0.1:8888/calculateservice"
   5:                 binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"
   6:                 name="calculateservice" />
   7:         </client>
   8:     </system.serviceModel>
   9: </configuration>
   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   5:     ICalculate calculator = channelFactory.CreateChannel();
   6:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   7: } 

image

二、是什麼導致TimeoutException?

三、解決方案一:通過異步調用或者One-way回調

   1: public void Add(double op1, double op2)
   2: {
   3:     double result = op1 + op2;
   4:     ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();  
   5:  
   6:     ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);
   7: } 

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract(IsOneWay = true)]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

四、方案二、通過解除Callback操作和UI線程的關聯性

   1: public virtual void Post(SendOrPostCallback d, object state) 
   2: public virtual void Send(SendOrPostCallback d, object state) 

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel; 
   7:  
   8:         #region ICalculateCallback Members 
   9:  
  10:         public void DisplayResult(double result)
  11:         {
  12:             ResultPanel.Text = result.ToString();
  13:  
  14:         } 
  15:  
  16:         #endregion
  17:     }
  18: }
  19:  

image

原因很簡單,由於我們將callbaclk的設置成false,那麼callback的操作將不會再UI線程下執行。但是我們需要運算的結果輸入到UI的TextBox上,對UI上控件的操作需要在UI線程上執行,顯然會拋出異常了。

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:        public static SynchronizationContext SynchronizationContext; 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:              SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);           
  14:         }        
  15:  
  16:         #endregion
  17:     }
  18: } 

   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     CalculateCallback.SynchronizationContext = SynchronizationContext.Current; 
   5:  
   6:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   7:     ICalculate calculator = channelFactory.CreateChannel();
   8:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   9: } 

五、另一種可選方案:通過ISynchronizeInvoke的Invoke/BeginInvoke

   1: public interface ISynchronizeInvoke
   2: {
   3:     // Methods
   4:     [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]
   5:     IAsyncResult BeginInvoke(Delegate method, object[] args);
   6:     object EndInvoke(IAsyncResult result);
   7:     object Invoke(Delegate method, object[] args); 
   8:  
   9:     // Properties
  10:     bool InvokeRequired { get; }
  11: } 
  12:  

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:         public delegate void DisplayResultDelegate(TextBox resultPanel, double result); 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:             DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);
  14:            ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });                   
  15:         } 
  16:  
  17:         private void DisplayResult(TextBox resultPanel, double result)
  18:         {
  19:             resultPanel.Text = result.ToString();
  20:         } 
  21:  
  22:         #endregion
  23:     }
  24: } 
  25:  

六、Service Hosting的線程關聯性

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }       
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16:            DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: } 
  24:  

image

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; } 
   8:  
   9:         public static SynchronizationContext SynchronizationContext
  10:         { get; set; } 
  11:  
  12:         #region ICalculate Members 
  13:  
  14:         public void Add(double op1, double op2)
  15:         {
  16:             double result = op1 + op2;
  17:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
  18:            SynchronizationContext.Post(delegate
  19:             {
  20:                 DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  21:             }, null); 
  22:  
  23:             callback.DisplayResult(result);            
  24:         } 
  25:  
  26:         #endregion
  27:     }
  28: } 
  29:  

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:    CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 
  13:  

WCF後續之旅:
WCF後續之旅(1): WCF是如何通過Binding進行通信的
WCF後續之旅(2): 如何對Channel Layer進行擴展——創建自定義Channel
WCF後續之旅(3): WCF Service Mode Layer 的中樞—Dispatcher
WCF後續之旅(4):WCF Extension Point 概覽
WCF後續之旅(5): 通過WCF Extension實現Localization
WCF後續之旅(6): 通過WCF Extension實現Context信息的傳遞
WCF後續之旅(7):通過WCF Extension實現和Enterprise Library Unity Container的集成
WCF後續之旅(8):通過WCF Extension 實現與MS Enterprise Library Policy Injection Application Block 的集成
WCF後續之旅(9):通過WCF的雙向通信實現Session管理[Part I]
WCF後續之旅(9): 通過WCF雙向通信實現Session管理[Part II]
WCF後續之旅(10): 通過WCF Extension實現以對象池的方式創建Service Instance
WCF後續之旅(11): 關於並發、回調的線程關聯性(Thread Affinity)
WCF後續之旅(12): 線程關聯性(Thread Affinity)對WCF並發訪問的影響
WCF後續之旅(13): 創建一個簡單的WCF SOAP Message攔截、轉發工具[上篇]
WCF後續之旅(13):創建一個簡單的SOAP Message攔截、轉發工具[下篇]
WCF後續之旅(14):TCP端口共享
WCF後續之旅(15): 邏輯地址和物理地址
WCF後續之旅(16): 消息是如何分發到Endpoint的--消息篩選(Message Filter)
WCF後續之旅(17):通過tcpTracer進行消息的路由


作者:蔣金楠
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
原文鏈接

最後更新:2017-10-30 17:04:23

  上一篇:go  Enterprise Library深入解析與靈活應用(2): 通過SqlDependency實現Cache和Database的同步
  下一篇:go  Enterprise Library深入解析與靈活應用(3):倘若將Unity、PIAB、Exception Handling引入MVP模式.. .. ..