閱讀823 返回首頁    go 阿裏雲 go 技術社區[雲棲]


具有依賴關係的並行操作執行

今天看到看到一篇MSDN文章《Parallelizing Operations With Dependencies,作者是微軟Parallel Computing Platform團隊的一個開發經理。文中提供出一種用於並行執行一組具有依賴關係的操作的解決方案,這不由得想起我在一年之前寫的一個具有相同的功能的組件。於是翻箱倒櫃找了出來,進行了一些加工,與大家分享一下。

我們知道,較之串行化的操作,並行計算將多個任務同時執行,從而充分利用了資源,提高了應用的整體性能。對於多個互不相幹的操作,我們可以直接按照異步的方式執行就可以。但是,我們遇到的很多情況下是,部分操作之間具有相互依賴的關係,一個操作需要在其他依賴的操作執行完成後方可執行。 以下圖為例,每一個圓圈代表要執行的操作,操作之間的肩頭代表它們之間的依賴關係。

clip_image002

我們需要一個組件,幫助我們完成這樣的工作:將相應的操作和依賴關係直接添加到一個容器中,我們的組件能夠自動分析操作之間的依賴關係,在執行的時候根據依賴編排執行順序。

使用我所提供的這樣一個並行操作執行器(ParallelExecutor),可以幫我們解決這個問題。首先對操作本身進行抽象,用以下三個屬性來描述一個並行計算場景中的操作:

  •  Operation ID: 操作的唯一標識,字符類型
  •  Action:操作具體執行的功能,使用Action代理表示
  •  Depedencies:依賴操作列表

在使用ParallelExecutor對操作進行並行執行之前,我們需要通過ParallelExecutor的兩個AddOperation方法添加需要執行的操作。AddOperation定義如下。其中dependencies代表以來操作ID數組,返回值為當前創建的操作ID

   1: public class ParallelExecutor
   2: {
   3:  
   4:     public string AddOperation(string id, Action action)
   5:     {
   6:         //省略實現
   7:     }
   8:  
   9:     public string AddOperation(string id, Action action, string[] dependencies)
  10:     {
  11:         //省略實現
  12:     }
  13: }
  14:  

對於上圖中的操作的依賴結構,我們通過下麵的代碼將所有的操作添加到創建的ParallelExecutor之中並執行。在這裏的具體實現的操作僅僅是打印出操作的ID,以便我們清楚地知道操作執行的先後順序是否滿足依賴關係:

   1: static void Main(string[] args)
   2: {
   3:     Action<string> action = id=> {Console.WriteLine(id);}; 
   4:  
   5:     var executor = new ParallelExecutor();
   6:     var a1 = executor.AddOperation("A1", () => action("A1"));
   7:     var a2 = executor.AddOperation("A2", () => action("A2"));
   8:     var a3 = executor.AddOperation("A3", () => action("A3")); 
   9:  
  10:     var b1 = executor.AddOperation("B1", () => action("B1"), new string[] { a1, a2 });
  11:     var b2 = executor.AddOperation("B2", () => action("B2"), new string[] { a3 }); 
  12:  
  13:     var c1 = executor.AddOperation("C1", () => action("C1"), new string[] { b1,b2 });
  14:     var c2 = executor.AddOperation("C2", () => action("C2")); 
  15:  
  16:     executor.Execute();
  17: Console.Read();
  18: }
  19:  

由於是操作的並行執行,線程調度的不確定性使每次輸出的結果各有不同。但是無論如何,需要滿足上圖中展現的依賴關係。下麵是其中一種執行結果,可以看出這是合理的執行順序。

   1: A3
   2: B2
   3: A1
   4: A2
   5: C2
   6: B1
   7: C1

實現這樣的並行計算有很多種解決方案。不同的解決方案大都體現在對於單一的操作該如何執行上。在我們提供這個解決方案中,我按照這樣的方案來執行任意一個操作:

直接執行無依賴的操作

如果需要執行的操作並不依賴於任何一個操作(比如C2),那麼我們直接運行就好了,這沒有什麼好說的。

先執行依賴操作,通過注冊事件的方式執行被依賴的操作

如果一個操作依賴於一組操作,在執行之前注冊依賴操作的結束事件實現,被依賴操作的執行發生在某個一個依賴操作的Completed事件觸發後。具體來講,上圖中C1具有兩個以來操作B1B2,在初始化時,C1上會有一個用於計算尚未執行的依賴操作的個數,並注冊B1B2得操作結束事件上麵。當B1B2執行結束後,會觸發事件。每次事件觸發,C1上的計數器將會減1,如果計數器為0,則表明所有的依賴操作執行結束,則執行C1相應的操作。

現在我們來看看詳細設計和具體實現。首先通過下麵的類圖看看涉及到的所有類型。其中Operation類型是最為重要的一個類型,它代表一個具體的操作。

clip_image004

操作的屬性

一個操作具有如下屬性:

  • IDString類型,操作的唯一標識
  • ActionAction類型,操作具體是實現的功能
  • DependenciesOperation數組,依賴的操作
  • StatusOperation枚舉,操作當前的狀態
  • ExecutionContextExecutionContext類型,用於傳遞線程執行的上下文
   1: public class Operation
   2: {    
   3:     //其他成員
   4:     public string ID
   5:     { get; private set; } 
   6:  
   7:     public Action Action
   8:     { get; private set; } 
   9:  
  10:     public Operation[] Dependencies
  11:     { get; private set; } 
  12:  
  13:     public OperationStatus Status
  14:     { get; private set; } 
  15:  
  16:     public ExecutionContext ExecutionContext
  17:     { get; private set; } 
  18:  
  19:     public Operation(string id, Action action)
  20:     {
  21:         if (string.IsNullOrEmpty(id))
  22:         {
  23:             throw new ArgumentNullException("id");
  24:         } 
  25:  
  26:         if (null == action)
  27:         {
  28:             throw new ArgumentNullException("action");
  29:         }
  30:         this.Status = OperationStatus.Created;
  31:         this.ID = id;
  32:         this.Action = action;
  33:         this.Dependencies = new Operation[0];
  34:     } 
  35:  
  36:     public Operation(string id, Action action, Operation[] dependencies)
  37:         : this(id, action)
  38:     {
  39:         if (null == dependencies)
  40:         {
  41:             throw new ArgumentNullException("dependencies");
  42:         } 
  43:  
  44:         this.Dependencies = dependencies;
  45:     }     
  46: }
  47:  

操作事件

當前操作執行的狀態通過OperationStatus表示,四個枚舉值分別表示被創建、正在運行、運行結束和失敗(拋出異常)。

   1: public enum OperationStatus
   2: {
   3:     Created,
   4:     Running,
   5:     Completed,
   6: Failed
   7: }

操作還具有三個時間,分別在開始執行、結束執行和執行失敗時觸發。這三個事件名稱分別為OperationStartedOperationCompletedOperationFailed

   1: public class Operation
   2: {
   3:     //其他成員
   4:     public event EventHandler<OperationEventArgs> OperationStarted;
   5:     public event EventHandler<OperationFailedEventArgs> OperationFailed;
   6:     public event EventHandler<OperationEventArgs> OperationCompleted;     
   7: }
   8:  

OperationStartedOperationCompleted事件對應的參數類型為OperationEventArgsOperationEventArgs直接繼承自EventArgs,並定義了一個Operation屬性代表對應的Operation對象。

   1: public class OperationEventArgs : EventArgs
   2: {
   3:     public OperationEventArgs(Operation operation)
   4:     {
   5:         if (null == operation)
   6:         {
   7:             throw new ArgumentNullException("operation");
   8:         }
   9:  
  10:         this.Operation = operation;
  11:     }
  12:  
  13:     public Operation Operation
  14:     { get; private set; }
  15: }
  16:  

OperationFailed的事件參數類型為OperationFailedEventArgs。繼承自OperationEventArgs,在此基礎上添加了一個Exception類型的Error屬性,表示拋出的異常。

操作初始化和事件注冊

在第三節中已經談到過了,被依賴操作的執行通過的依賴操作執行完成後觸發OperationCompleted事件的是實現。事件注冊必須在ParallelExecutor執行之前完成,在這裏我定義了一個Initialize方法,在裏麵完成事件注冊工作:

   1: public class Operation
   2: {
   3:     //其他成員
   4:     private int _remainingDependencies;
   5:     private List<Operation> _registeredParentOps = new List<Operation>();     
   6:  
   7:     private static void RegisterCompletedEvents(Operation operation)
   8:     {
   9:         operation._remainingDependencies = operation.Dependencies.Length;
  10:         foreach (var op in operation.Dependencies)
  11:         {
  12:             if (op._registeredParentOps.Contains(operation))
  13:             {
  14:                 continue;
  15:             }
  16:             RegisterCompletedEvents(op);
  17:             op.OperationCompleted += (sender, args) =>
  18:                 {
  19:                     operation._remainingDependencies--;
  20:                     if (operation._remainingDependencies <= 0)
  21:                     {
  22:                         operation.DoExecute();
  23:                     }
  24:                 };
  25:             op._registeredParentOps.Add(operation);
  26:         }            
  27:     } 
  28:  
  29:     public void Initialize()
  30:     {
  31:         RegisterCompletedEvents(this);
  32:    }
  33: }

操作執行

ParallelExecutor通過調用OperationExecute方法執行相應的操作。在Execute方法中,如果是獨立的操作,則執行執行,否則異步執行依賴操作,這是一個遞歸的過程。操作的具體實現定義在DoExecute方法中。

   1: public class Operation
   2: {   
   3:     //其他成員
   4:     private void DoExecute()
   5:     {
   6:         if (this.Status != OperationStatus.Created)
   7:         {
   8:             return;
   9:         } 
  10:  
  11:         if (null != this.OperationStarted)
  12:         {
  13:             this.OperationStarted(this, new OperationEventArgs(this));
  14:         } 
  15:  
  16:         this.Status = OperationStatus.Running;
  17:         try
  18:         {
  19:             if (null != this.ExecutionContext)
  20:             {
  21:                 ExecutionContext.Run(this.ExecutionContext.CreateCopy(), state => this.Action(), null);
  22:             }
  23:             else
  24:             {
  25:                 this.Action();
  26:             } 
  27:  
  28:             this.Status = OperationStatus.Completed;
  29:             if (null != this.OperationCompleted)
  30:             {
  31:                 this.OperationCompleted(this, new OperationEventArgs(this));
  32:             }
  33:         }
  34:         catch (Exception ex)
  35:         {
  36:             this.Status = OperationStatus.Failed;
  37:             if (null != this.OperationFailed)
  38:             {
  39:                 this.OperationFailed(this, new OperationFailedEventArgs(this, ex));
  40:             }
  41:         }
  42:     } 
  43:  
  44:     [MethodImplAttribute(MethodImplOptions.Synchronized)]
  45:     public void Execute()
  46:     {
  47:         if (this.Dependencies.Length == 0)
  48:         {
  49:             this.DoExecute();
  50:         }           
  51:  
  52:         foreach (var operation in this.Dependencies)
  53:         {
  54:             var op = operation;
  55:             ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(), null);
  56:         }        
  57:  
  58:     } 
  59: }
  60:  

ParallelExecutor

ParallelExecutor提供操作的添加和整體執行。添加操作實現在兩個重載的AddOperation方法中,邏輯並不複雜。當執行Execute方法對所有的操作進行並行執行的時候,需要調用Initialize方法對每個操作進行初始化。然後異步調用每個操作的Execute方法即可。

   1: public class ParallelExecutor
   2: {
   3:     public ParallelExecutor()
   4:     {
   5:         this.Operations = new Dictionary<string, Operation>();
   6:     }
   7:  
   8:     public IDictionary<string, Operation> Operations
   9:     { get; private set; }
  10:  
  11:     public void Execute()
  12:     {
  13:         foreach (var operation in this.Operations.Values)
  14:         {
  15:             operation.Initialize();
  16:         }
  17:  
  18:         foreach (var operation in this.Operations.Values)
  19:         {
  20:             var op = operation;
  21:             ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(), null);
  22:         }
  23:     }
  24:  
  25:     public string AddOperation(string id, Action action)
  26:     {
  27:         ValidateOperation(id, action);
  28:         this.Operations.Add(id, new Operation(id, action));
  29:         return id;
  30:     }
  31:  
  32:     private void ValidateOperation(string id, Action action)
  33:     {
  34:         if (null == action)
  35:         {
  36:             throw new ArgumentNullException("action");
  37:         }
  38:  
  39:         if (this.Operations.ContainsKey(id))
  40:         {
  41:             throw new ArgumentException(string.Format("There is an existing operation whose ID is \"{0}\"", id));
  42:         }
  43:     }
  44:  
  45:     public string AddOperation(string id, Action action, string[] dependencies)
  46:     {
  47:         ValidateOperation(id, action);
  48:         if (null == dependencies)
  49:         {
  50:             throw new ArgumentNullException("dependencies");
  51:         }
  52:  
  53:         foreach (var op in dependencies)
  54:         {
  55:             if (!this.Operations.ContainsKey(op))
  56:             {
  57:                 throw new ArgumentException(string.Format("The operation whose ID is \"{0}\" does not exist!", op));
  58:             }
  59:         }
  60:  
  61:         var operation = new Operation(id, action,
  62:                this.Operations.Values.
  63:                Where(op => Array.Exists<string>(dependencies, opId => opId == op.ID)).ToArray<Operation>());
  64:  
  65:         this.Operations.Add(id, operation);
  66:         return id;
  67:     }
  68: }
  69:  


作者:蔣金楠
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
原文鏈接

最後更新:2017-10-30 14:34:52

  上一篇:go  難道調用ThreadPool.QueueUserWorkItem()的時候,真是必須調用Thread.Sleep(N)嗎?
  下一篇:go  560萬Facebook人際關係數據,揭秘家庭職業傳承“真相”