阅读198 返回首页    go 阿里云 go 技术社区[云栖]


WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)

一、从基于Windows Application客户端的WCF回调失败谈起

在"我的WCF之旅"系列文章中,有一篇(WinForm Application中调用Duplex Service出现TimeoutException的原因和解决方案)专门介绍在一个Windows Application客户端应用, 通过WCF 的Duplex通信方式进行回调失败的文章.我们今天以此作为出发点介绍WCF在Thread Affinity下的表现和解决方案.

image

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:    public interface ICalculateCallback
   4:     {
   5:         [OperationContract]
   6:         void DisplayResult(double result);
   7:     }
   8: } 

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }         
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16: DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: }


由于需要进行callback, 我们把ConcurrencyMode 设为Reentrant。当得到运算的结果后,通过OperationContext.Current.GetCallbackChannel得到callback对象,并调用之。还有一点需要提的是,该service是通过一个Windows Form application进行host的。并且有一个ListBox列出所有service执行的结果,就像这样:

image

3、Hosting

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:     CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 

   1: <configuration>
   2:     <system.serviceModel>
   3:         <services>
   4:             <service name="Artech.ThreadAffinity.Services.CalculateService">
   5:                 <endpoint binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate" />
   6:                 <host>
   7:                     <baseAddresses>
   8:                         <add baseAddress="net.tcp://127.0.0.1:8888/calculateservice" />
   9:                     </baseAddresses>
  10:                 </host>
  11:             </service>
  12:         </services>
  13:     </system.serviceModel>
  14: </configuration> 

4、Client

image

   1: namespace Clients
   2: {
   3:     public class CalculateCallback : ICalculateCallback
   4:     {
   5:         public static TextBox ResultPanel;               
   6:  
   7:         #region ICalculateCallback Members 
   8:  
   9:         public void DisplayResult(double result)
  10:         {
  11:             ResultPanel.Text = result.ToString();
  12:         }         
  13:  
  14:         #endregion
  15:     }
  16: } 

   1: <configuration>
   2:     <system.serviceModel>
   3:         <client>
   4:             <endpoint address="net.tcp://127.0.0.1:8888/calculateservice"
   5:                 binding="netTcpBinding" bindingConfiguration="" contract="Artech.ThreadAffinity.Contracts.ICalculate"
   6:                 name="calculateservice" />
   7:         </client>
   8:     </system.serviceModel>
   9: </configuration>
   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   5:     ICalculate calculator = channelFactory.CreateChannel();
   6:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   7: } 

image

二、是什么导致TimeoutException?

三、解决方案一:通过异步调用或者One-way回调

   1: public void Add(double op1, double op2)
   2: {
   3:     double result = op1 + op2;
   4:     ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();  
   5:  
   6:     ThreadPool.QueueUserWorkItem(delegate{ callback.DisplayResult(result); }, null);
   7: } 

   1: namespace Artech.ThreadAffinity.Contracts
   2: {
   3:     [ServiceContract(CallbackContract = typeof(ICalculateCallback))]
   4:     public interface ICalculate
   5:     {
   6:         [OperationContract(IsOneWay = true)]
   7:         void Add(double op1, double op2);
   8:     }
   9: } 

四、方案二、通过解除Callback操作和UI线程的关联性

   1: public virtual void Post(SendOrPostCallback d, object state) 
   2: public virtual void Send(SendOrPostCallback d, object state) 

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel; 
   7:  
   8:         #region ICalculateCallback Members 
   9:  
  10:         public void DisplayResult(double result)
  11:         {
  12:             ResultPanel.Text = result.ToString();
  13:  
  14:         } 
  15:  
  16:         #endregion
  17:     }
  18: }
  19:  

image

原因很简单,由于我们将callbaclk的设置成false,那么callback的操作将不会再UI线程下执行。但是我们需要运算的结果输入到UI的TextBox上,对UI上控件的操作需要在UI线程上执行,显然会抛出异常了。

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:        public static SynchronizationContext SynchronizationContext; 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:              SynchronizationContext.Post(delegate { ResultPanel.Text = result.ToString(); }, null);           
  14:         }        
  15:  
  16:         #endregion
  17:     }
  18: } 

   1: private void buttonCalculate_Click(object sender, EventArgs e)
   2: {
   3:     CalculateCallback.ResultPanel = this.textBoxResult;
   4:     CalculateCallback.SynchronizationContext = SynchronizationContext.Current; 
   5:  
   6:     DuplexChannelFactory<ICalculate> channelFactory = new DuplexChannelFactory<ICalculate>(new CalculateCallback(), "calculateservice");
   7:     ICalculate calculator = channelFactory.CreateChannel();
   8:     calculator.Add(double.Parse(this.textBoxOp1.Text), double.Parse(this.textBoxOp2.Text));
   9: } 

五、另一种可选方案:通过ISynchronizeInvoke的Invoke/BeginInvoke

   1: public interface ISynchronizeInvoke
   2: {
   3:     // Methods
   4:     [HostProtection(SecurityAction.LinkDemand, Synchronization=true, ExternalThreading=true)]
   5:     IAsyncResult BeginInvoke(Delegate method, object[] args);
   6:     object EndInvoke(IAsyncResult result);
   7:     object Invoke(Delegate method, object[] args); 
   8:  
   9:     // Properties
  10:     bool InvokeRequired { get; }
  11: } 
  12:  

   1: namespace Artech.ThreadAffinity.Clients
   2: {
   3:     [CallbackBehavior(UseSynchronizationContext = false)]
   4:     public class CalculateCallback : ICalculateCallback
   5:     {
   6:         public static TextBox ResultPanel;
   7:         public delegate void DisplayResultDelegate(TextBox resultPanel, double result); 
   8:  
   9:         #region ICalculateCallback Members 
  10:  
  11:         public void DisplayResult(double result)
  12:         {
  13:             DisplayResultDelegate displayResultDelegate = new DisplayResultDelegate(DisplayResult);
  14:            ResultPanel.BeginInvoke(displayResultDelegate, new object[] { ResultPanel, result });                   
  15:         } 
  16:  
  17:         private void DisplayResult(TextBox resultPanel, double result)
  18:         {
  19:             resultPanel.Text = result.ToString();
  20:         } 
  21:  
  22:         #endregion
  23:     }
  24: } 
  25:  

六、Service Hosting的线程关联性

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; }       
   8:  
   9:         #region ICalculate Members 
  10:  
  11:         public void Add(double op1, double op2)
  12:         {
  13:             double result = op1 + op2;
  14:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();            
  15:  
  16:            DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  17:  
  18:             callback.DisplayResult(result);
  19:         } 
  20:  
  21:         #endregion
  22:     }
  23: } 
  24:  

image

   1: namespace Artech.ThreadAffinity.Services
   2: {
   3:     [ServiceBehavior(ConcurrencyMode = ConcurrencyMode.Reentrant,UseSynchronizationContext = false)]
   4:     public class CalculateService:ICalculate
   5:     {
   6:         public static ListBox DisplayPanel
   7:         { get; set; } 
   8:  
   9:         public static SynchronizationContext SynchronizationContext
  10:         { get; set; } 
  11:  
  12:         #region ICalculate Members 
  13:  
  14:         public void Add(double op1, double op2)
  15:         {
  16:             double result = op1 + op2;
  17:             ICalculateCallback callback = OperationContext.Current.GetCallbackChannel<ICalculateCallback>();
  18:            SynchronizationContext.Post(delegate
  19:             {
  20:                 DisplayPanel.Items.Add(string.Format("{0} + {1} = {2}", op1, op2, result));
  21:             }, null); 
  22:  
  23:             callback.DisplayResult(result);            
  24:         } 
  25:  
  26:         #endregion
  27:     }
  28: } 
  29:  

   1: private void HostForm_Load(object sender, EventArgs e)
   2: {    
   3:     this._serviceHost = new ServiceHost(typeof(CalculateService));
   4:     CalculateService.DisplayPanel = this.listBoxResult;
   5:    CalculateService.SynchronizationContext = SynchronizationContext.Current;
   6:     this._serviceHost.Opened += delegate
   7:     { 
   8: this.Text = "The calculate service has been started up!";
   9:     }; 
  10:  
  11:     this._serviceHost.Open();
  12: } 
  13:  

WCF后续之旅:
WCF后续之旅(1): WCF是如何通过Binding进行通信的
WCF后续之旅(2): 如何对Channel Layer进行扩展——创建自定义Channel
WCF后续之旅(3): WCF Service Mode Layer 的中枢—Dispatcher
WCF后续之旅(4):WCF Extension Point 概览
WCF后续之旅(5): 通过WCF Extension实现Localization
WCF后续之旅(6): 通过WCF Extension实现Context信息的传递
WCF后续之旅(7):通过WCF Extension实现和Enterprise Library Unity Container的集成
WCF后续之旅(8):通过WCF Extension 实现与MS Enterprise Library Policy Injection Application Block 的集成
WCF后续之旅(9):通过WCF的双向通信实现Session管理[Part I]
WCF后续之旅(9): 通过WCF双向通信实现Session管理[Part II]
WCF后续之旅(10): 通过WCF Extension实现以对象池的方式创建Service Instance
WCF后续之旅(11): 关于并发、回调的线程关联性(Thread Affinity)
WCF后续之旅(12): 线程关联性(Thread Affinity)对WCF并发访问的影响
WCF后续之旅(13): 创建一个简单的WCF SOAP Message拦截、转发工具[上篇]
WCF后续之旅(13):创建一个简单的SOAP Message拦截、转发工具[下篇]
WCF后续之旅(14):TCP端口共享
WCF后续之旅(15): 逻辑地址和物理地址
WCF后续之旅(16): 消息是如何分发到Endpoint的--消息筛选(Message Filter)
WCF后续之旅(17):通过tcpTracer进行消息的路由


作者:蒋金楠
微信公众账号:大内老A
微博:www.weibo.com/artech
如果你想及时得到个人撰写文章以及着作的消息推送,或者想看看个人推荐的技术资料,可以扫描左边二维码(或者长按识别二维码)关注个人公众号(原来公众帐号蒋金楠的自媒体将会停用)。
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
原文链接

最后更新:2017-10-30 17:04:23

  上一篇:go  Enterprise Library深入解析与灵活应用(2): 通过SqlDependency实现Cache和Database的同步
  下一篇:go  Enterprise Library深入解析与灵活应用(3):倘若将Unity、PIAB、Exception Handling引入MVP模式.. .. ..