閱讀129 返回首頁    go 微軟 go Office


回調與並發: 通過實例剖析WCF基於ConcurrencyMode.Reentrant模式下的並發控製機製

對於正常的服務調用,從客戶端發送到服務端的請求消息最終會被WCF服務運行時分發到相應的封裝了服務實例的InstanceContext上。而在回調場景中,我們同樣將回調對象封裝到InstanceContext對象,並將其封送到客戶端。當服務操作過程中執行回調操作的時候,回調消息最終也是分發到位於客戶端封裝回調對象的InstanceContext。從消息分發與並發處理的機製來看,這兩種請求並沒有本質的不同。接下來,我們通過《實踐重於理論》中的實例,綜合分析WCF對並發服務調用和並發回調的處理機製。

一、將實例改成支持回調的形式

為此,我們需要對我們上麵給出的監控程序進行相應的修改。首先需要修改的是服務契約ICalculator。服務契約ICalculator的Add操作接受傳入的操作數並以返回值得形式返回到客戶端。現在我們通過回調的形式來重寫計算服務:將Add的返回類型改稱void,計算結果通過執行回調操作的形式在客戶端顯示。

   1: [ServiceContract(Namespace="https://www.artech.com/",CallbackContract =typeof(ICalculatorCallback))]
   2: public interface ICalculator
   3: {
   4:     [OperationContract]
   5:     void Add(double x, double y);
   6: }

作為回調契約的ICalculatorCallback接口定義如下,計算結果傳入ShowResult方法顯示出來。在一般情況下,我們會將Add和ShowResult和操作定義在單向(One-way),但是這裏我並沒有這麼做,所以無論是服務操作Add還是回調操作ShowResult均采用請求/回複消息交換模式。

   1: using System.ServiceModel;
   2: namespace Artech.ConcurrentServiceInvocation.Service.Interface
   3: {
   4:     [ServiceContract(Namespace = "https://www.artech.com/")]
   5:     public interface ICalculatorCallback
   6:     {
   7:         [OperationContract]
   8:         void ShowResult(double result);
   9:     }
  10: }

在本例中我們的CalculatorService采用單例實例上下文模式(InstanceContextMode.Single)。為了能夠執行回調,將並發模式設置成ConcurrencyMode.Reentrant。在Add操作中,我們可以將整個執行過程分成三個階段:PreCallback、Callback和PostCallback,而且PreCallback和PostCallback執行時間為5秒。在開始和結束執行Add操作,以及開始與結束回調的時候都是通過EventMonitor發送相應的事件通知。修改後的CalculatorService如下麵的代碼所示。

   1: [ServiceBehavior(UseSynchronizationContext = false,InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Reentrant)]
   2: public class CalculatorService : ICalculator
   3: {
   4:     public void Add(double x, double y)
   5:     {
   6:         //PreCallback
   7:         EventMonitor.Send(EventType.StartExecute);
   8:         Thread.Sleep(5000);
   9:         double result = x + y;
  10:  
  11:         //Callback
  12:         EventMonitor.Send(EventType.StartCallback);
  13:         int clientId = OperationContext.Current.IncomingMessageHeaders.GetHeader<int>(EventMonitor.CientIdHeaderLocalName, EventMonitor.CientIdHeaderNamespace);
  14:         MessageHeader<int> messageHeader = new MessageHeader<int>(clientId);
  15:         OperationContext.Current.OutgoingMessageHeaders.Add(messageHeader.GetUntypedHeader(EventMonitor.CientIdHeaderLocalName, EventMonitor.CientIdHeaderNamespace));
  16:         OperationContext.Current.GetCallbackChannel<ICalculatorCallback>().ShowResult(result);
  17:         EventMonitor.Send(EventType.EndCallback);
  18:  
  19:         //PostCallback
  20:         Thread.Sleep(5000);
  21:         EventMonitor.Send(EventType.EndExecute);
  22:     }
  23: }

對於服務寄宿程序我們不需要做任何修改,但是我們需要采用支持雙向通信的綁定類型以實現對回調的支持,在這裏我們采用的是NetTcpBinding。為了降低安全協商(Negotiation)代碼對時延,我特意將綁定的安全模式設置成None。下麵是更新後的服務端配置,客戶端需要進行相應的修改。

   1: <?xml version="1.0" encoding="utf-8" ?>
   2: <configuration>
   3:     <system.serviceModel>
   4:       <bindings>
   5:         <netTcpBinding>
   6:           <binding name="nonSecureBinding">
   7:             <security mode="None" />
   8:           </binding>
   9:         </netTcpBinding>
  10:       </bindings>
  11:         <services>
  12:             <service name="Artech.ConcurrentServiceInvocation.Service.CalculatorService">
  13:                 <endpoint address="net.tcp://127.0.0.1:3721/calculatorservice" binding="netTcpBinding"
  14:                     bindingConfiguration="nonSecureBinding" contract="Artech.ConcurrentServiceInvocation.Service.Interface.ICalculator" />
  15:             </service>
  16:         </services>
  17:     </system.serviceModel>
  18: </configuration>

由於回調操組在客戶端執行,所以客戶端首先需要的就是實現回調契約接口創建回調類型。實現回調契約接口的ICalculatorCallback定義在CalculatorCallbackService類型中。由於在本例中我們需要的僅僅監控回調操作執行的時間,並不是真的需要顯示出運算的最終結果。所以我們僅僅是通過掛起當前線程模擬一個耗時的回調操作(10秒),在回調操作開始和結束執行的時候通過EventMonitor發送相應的事件通知。

   1: using System.ServiceModel;
   2: using System.Threading;
   3: using Artech.ConcurrentServiceInvocation.Service.Interface;
   4: namespace Artech.ConcurrentServiceInvocation.Client
   5: {
   6:     public class CalculatorCallbackService : ICalculatorCallback
   7:     {
   8:         public void ShowResult(double result)
   9:         {
  10:             EventMonitor.Send(EventType.StartExecuteCallback);
  11:             Thread.Sleep(10000);
  12:             EventMonitor.Send(EventType.EndExecuteCallback);
  13:         }
  14:     }
  15: }

最後一個步驟是對客戶端按照回調的方式進行相應的修改。首先我們創建CalculatorCallbackService對象,並以此創建一個InstanceContext作為回調實例上下文。然後通過該InstanceContext創建DuplexChannelFactory<TChannel>。最後通過ThreadPool並發地執行2次服務代理的創建和服務調用的操作,客戶端ID作為消息報頭被傳送到服務端。

   1: public partial class MonitorForm : Form
   2: {
   3:     private SynchronizationContext _syncContext;
   4:     private DuplexChannelFactory<ICalculator> _channelFactory;
   5:     private InstanceContext _callbackInstance;
   6: private int _clientId = 0;
   7:  
   8:     //其他成員
   9:     private void MonitorForm_Load(object sender, EventArgs e)
  10:     {
  11:         string header = string.Format("{0, -13}{1, -22}{2}", "Client", "Time", "Event");
  12:         this.listBoxExecutionProgress.Items.Add(header);
  13:         _syncContext = SynchronizationContext.Current;
  14:         _callbackInstance = new InstanceContext(new CalculatorCallbackService());
  15:         _channelFactory = new  DuplexChannelFactory<ICalculator>(_callbackInstance,"calculatorservice");
  16:  
  17:         EventMonitor.MonitoringNotificationSended += ReceiveMonitoringNotification;
  18:         this.Disposed += delegate
  19:         {
  20:             EventMonitor.MonitoringNotificationSended -= ReceiveMonitoringNotification;
  21:             _channelFactory.Close();
  22:         };
  23:  
  24:         for (int i = 0; i < 2; i++)
  25:         {
  26:             ThreadPool.QueueUserWorkItem(state =>
  27:             {
  28:                 int clientId = Interlocked.Increment(ref _clientId);
  29:                 EventMonitor.Send(clientId, EventType.StartCall);
  30:                 ICalculator proxy = _channelFactory.CreateChannel();
  31:                 using (OperationContextScope contextScope = new OperationContextScope(proxy as IContextChannel))
  32:                 {
  33:                     MessageHeader<int> messageHeader = new MessageHeader<int>(clientId);
  34:                     OperationContext.Current.OutgoingMessageHeaders.Add(messageHeader.GetUntypedHeader(EventMonitor.CientIdHeaderLocalName, EventMonitor.CientIdHeaderNamespace));
  35:                     proxy.Add(1, 2);
  36:                 }
  37:                 EventMonitor.Send(clientId, EventType.EndCall);
  38:             }, null);
  39:         }
  40:     } 
  41: }

二、從並發控製機製分析得到的輸出結果

現在重新運行我們更新後的監控程序,你將會得到如圖1所示的輸出結果。如果你仔細分析服務端和客戶端輸出的結果你將會看到Add操作的整個執行時間有一段是重合的,也就是說整個服務操作存在並發執行的情況。但是單看PreCallback和PostCallback,則不存在並發執行的情況。從客戶端的角度來看,回調操作也不存在並發執行的情況

image

圖1 Reentrant(Service) + Single(Callback)監控結果

可能上麵的輸出結果還不是很直觀,現在我們通過時間軸的形式來描述通過輸出結果表現出的執行情況。我們忽略掉客戶端和服務通信以及WCF消息分發導致的時延,兩次服務調用在執行的情況如圖2所示。假設服務端在0s接收到兩個並發的調用請求,一個請求被分發給InstanceContext,另一個則被放到等待隊列。到5s的時候,第一個請求完成PreCallback的操作後進行回調,此時InstanceContext被釋放出來,使得它可以用於處理等待著的第二個請求。到10s的時候,第二個請求完成了PreCallback操作準備進行回調,但是封裝回調實例的InstanceContext正在處理第一個回調請求,所示自己在一個等待,直到20s時第一個回調請求處理完畢。

image

圖2 Reentrant(Service) + Single(Callback)監控結果時間軸描述

上麵我們模擬的時單例實例上下文情況下,服務和回調分別采用Concurrency.Reentrant和Concurrency.Single的情況。實例演示的結果充分證明在《並發中的同步--WCF並發體係的同步機製實現》中關於針對InstanceContext加鎖的同步機製的分析。進一步地,如果按照我們的分析,如果我們同時將服務和回調采用的並發模式均換成Concurrency.Multiple,那麼無論是作用於服務實例上下文的PreCallback和PostCallback操作,還是作用於回調實例上下文的Callback都可以並發地執行。為此,我們隻需要對分別應用於CalculatorService和CalculatorCallbackService的ServiceBehaviorAttribute和CallbackBehaviorAttribute的兩個特性稍加修改,將ConcurrencyMode屬性設置成ConcurrencyMode.Multiple即可。相應的改動如下麵的代碼所示:

   1: [ServiceBehavior(UseSynchronizationContext = false,InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
   2: public class CalculatorService : ICalculator
   3: {
   4:    //省略成員
   5: }
   6:  
   7: [CallbackBehavior(ConcurrencyMode = ConcurrencyMode.Multiple)]
   8: public class CalculatorCallbackService : ICalculatorCallback
   9: {
  10: //省略成員
  11: }

再次運行我們的監控程序,得到的如圖3所示的輸出,可以看出這正是我們希望的結果,無論作用於那個InstanceContext的操作都是並發執行的

image

圖3 Multiple(Service) + Multiple(Callback)監控結果



作者:蔣金楠
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
原文鏈接

最後更新:2017-10-27 15:04:52

  上一篇:go  並發與實例上下文模式: WCF服務在不同實例上下文模式下具有怎樣的並發表現
  下一篇:go  ConcurrencyMode.Multiple模式下的WCF服務就一定是並發執行的嗎:探討同步上下文對並發的影響[上篇]