具有依賴關係的並行操作執行
今天看到看到一篇MSDN文章《Parallelizing Operations With Dependencies》,作者是微軟Parallel Computing Platform團隊的一個開發經理。文中提供出一種用於並行執行一組具有依賴關係的操作的解決方案,這不由得想起我在一年之前寫的一個具有相同的功能的組件。於是翻箱倒櫃找了出來,進行了一些加工,與大家分享一下。
我們知道,較之串行化的操作,並行計算將多個任務同時執行,從而充分利用了資源,提高了應用的整體性能。對於多個互不相幹的操作,我們可以直接按照異步的方式執行就可以。但是,我們遇到的很多情況下是,部分操作之間具有相互依賴的關係,一個操作需要在其他依賴的操作執行完成後方可執行。 以下圖為例,每一個圓圈代表要執行的操作,操作之間的肩頭代表它們之間的依賴關係。
我們需要一個組件,幫助我們完成這樣的工作:將相應的操作和依賴關係直接添加到一個容器中,我們的組件能夠自動分析操作之間的依賴關係,在執行的時候根據依賴編排執行順序。
使用我所提供的這樣一個並行操作執行器(ParallelExecutor),可以幫我們解決這個問題。首先對操作本身進行抽象,用以下三個屬性來描述一個並行計算場景中的操作:
- Operation ID: 操作的唯一標識,字符類型
- Action:操作具體執行的功能,使用Action代理表示
- Depedencies:依賴操作列表
在使用ParallelExecutor對操作進行並行執行之前,我們需要通過ParallelExecutor的兩個AddOperation方法添加需要執行的操作。AddOperation定義如下。其中dependencies代表以來操作ID數組,返回值為當前創建的操作ID。
1: public class ParallelExecutor
2: {
3:
4: public string AddOperation(string id, Action action)
5: {
6: //省略實現
7: }
8:
9: public string AddOperation(string id, Action action, string[] dependencies)
10: {
11: //省略實現
12: }
13: }
14:
對於上圖中的操作的依賴結構,我們通過下麵的代碼將所有的操作添加到創建的ParallelExecutor之中並執行。在這裏的具體實現的操作僅僅是打印出操作的ID,以便我們清楚地知道操作執行的先後順序是否滿足依賴關係:
1: static void Main(string[] args)
2: {
3: Action<string> action = id=> {Console.WriteLine(id);};
4:
5: var executor = new ParallelExecutor();
6: var a1 = executor.AddOperation("A1", () => action("A1"));
7: var a2 = executor.AddOperation("A2", () => action("A2"));
8: var a3 = executor.AddOperation("A3", () => action("A3"));
9:
10: var b1 = executor.AddOperation("B1", () => action("B1"), new string[] { a1, a2 });
11: var b2 = executor.AddOperation("B2", () => action("B2"), new string[] { a3 });
12:
13: var c1 = executor.AddOperation("C1", () => action("C1"), new string[] { b1,b2 });
14: var c2 = executor.AddOperation("C2", () => action("C2"));
15:
16: executor.Execute();
17: Console.Read();
18: }
19:
由於是操作的並行執行,線程調度的不確定性使每次輸出的結果各有不同。但是無論如何,需要滿足上圖中展現的依賴關係。下麵是其中一種執行結果,可以看出這是合理的執行順序。
1: A3
2: B2
3: A1
4: A2
5: C2
6: B1
7: C1
實現這樣的並行計算有很多種解決方案。不同的解決方案大都體現在對於單一的操作該如何執行上。在我們提供這個解決方案中,我按照這樣的方案來執行任意一個操作:
直接執行無依賴的操作
如果需要執行的操作並不依賴於任何一個操作(比如C2),那麼我們直接運行就好了,這沒有什麼好說的。
先執行依賴操作,通過注冊事件的方式執行被依賴的操作
如果一個操作依賴於一組操作,在執行之前注冊依賴操作的結束事件實現,被依賴操作的執行發生在某個一個依賴操作的Completed事件觸發後。具體來講,上圖中C1具有兩個以來操作B1和B2,在初始化時,C1上會有一個用於計算尚未執行的依賴操作的個數,並注冊B1和B2得操作結束事件上麵。當B1和B2執行結束後,會觸發該事件。每次事件觸發,C1上的計數器將會減1,如果計數器為0,則表明所有的依賴操作執行結束,則執行C1相應的操作。
現在我們來看看詳細設計和具體實現。首先通過下麵的類圖看看涉及到的所有類型。其中Operation類型是最為重要的一個類型,它代表一個具體的操作。
操作的屬性
一個操作具有如下屬性:
- ID:String類型,操作的唯一標識
- Action:Action類型,操作具體是實現的功能
- Dependencies:Operation數組,依賴的操作
- Status:Operation枚舉,操作當前的狀態
-
ExecutionContext:ExecutionContext類型,用於傳遞線程執行的上下文
1: public class Operation
2: {
3: //其他成員
4: public string ID
5: { get; private set; }
6:
7: public Action Action
8: { get; private set; }
9:
10: public Operation[] Dependencies
11: { get; private set; }
12:
13: public OperationStatus Status
14: { get; private set; }
15:
16: public ExecutionContext ExecutionContext
17: { get; private set; }
18:
19: public Operation(string id, Action action)
20: {
21: if (string.IsNullOrEmpty(id))
22: {
23: throw new ArgumentNullException("id");
24: }
25:
26: if (null == action)
27: {
28: throw new ArgumentNullException("action");
29: }
30: this.Status = OperationStatus.Created;
31: this.ID = id;
32: this.Action = action;
33: this.Dependencies = new Operation[0];
34: }
35:
36: public Operation(string id, Action action, Operation[] dependencies)
37: : this(id, action)
38: {
39: if (null == dependencies)
40: {
41: throw new ArgumentNullException("dependencies");
42: }
43:
44: this.Dependencies = dependencies;
45: }
46: }
47:
操作事件
當前操作執行的狀態通過OperationStatus表示,四個枚舉值分別表示被創建、正在運行、運行結束和失敗(拋出異常)。
1: public enum OperationStatus
2: {
3: Created,
4: Running,
5: Completed,
6: Failed
7: }
操作還具有三個時間,分別在開始執行、結束執行和執行失敗時觸發。這三個事件名稱分別為OperationStarted、OperationCompleted和OperationFailed。
1: public class Operation
2: {
3: //其他成員
4: public event EventHandler<OperationEventArgs> OperationStarted;
5: public event EventHandler<OperationFailedEventArgs> OperationFailed;
6: public event EventHandler<OperationEventArgs> OperationCompleted;
7: }
8:
OperationStarted和OperationCompleted事件對應的參數類型為OperationEventArgs。OperationEventArgs直接繼承自EventArgs,並定義了一個Operation屬性代表對應的Operation對象。
1: public class OperationEventArgs : EventArgs
2: {
3: public OperationEventArgs(Operation operation)
4: {
5: if (null == operation)
6: {
7: throw new ArgumentNullException("operation");
8: }
9:
10: this.Operation = operation;
11: }
12:
13: public Operation Operation
14: { get; private set; }
15: }
16:
OperationFailed的事件參數類型為OperationFailedEventArgs。繼承自OperationEventArgs,在此基礎上添加了一個Exception類型的Error屬性,表示拋出的異常。
操作初始化和事件注冊
在第三節中已經談到過了,被依賴操作的執行通過的依賴操作執行完成後觸發OperationCompleted事件的是實現。事件注冊必須在ParallelExecutor執行之前完成,在這裏我定義了一個Initialize方法,在裏麵完成事件注冊工作:
1: public class Operation
2: {
3: //其他成員
4: private int _remainingDependencies;
5: private List<Operation> _registeredParentOps = new List<Operation>();
6:
7: private static void RegisterCompletedEvents(Operation operation)
8: {
9: operation._remainingDependencies = operation.Dependencies.Length;
10: foreach (var op in operation.Dependencies)
11: {
12: if (op._registeredParentOps.Contains(operation))
13: {
14: continue;
15: }
16: RegisterCompletedEvents(op);
17: op.OperationCompleted += (sender, args) =>
18: {
19: operation._remainingDependencies--;
20: if (operation._remainingDependencies <= 0)
21: {
22: operation.DoExecute();
23: }
24: };
25: op._registeredParentOps.Add(operation);
26: }
27: }
28:
29: public void Initialize()
30: {
31: RegisterCompletedEvents(this);
32: }
33: }
操作執行
ParallelExecutor通過調用Operation的Execute方法執行相應的操作。在Execute方法中,如果是獨立的操作,則執行執行,否則異步執行依賴操作,這是一個遞歸的過程。操作的具體實現定義在DoExecute方法中。
1: public class Operation
2: {
3: //其他成員
4: private void DoExecute()
5: {
6: if (this.Status != OperationStatus.Created)
7: {
8: return;
9: }
10:
11: if (null != this.OperationStarted)
12: {
13: this.OperationStarted(this, new OperationEventArgs(this));
14: }
15:
16: this.Status = OperationStatus.Running;
17: try
18: {
19: if (null != this.ExecutionContext)
20: {
21: ExecutionContext.Run(this.ExecutionContext.CreateCopy(), state => this.Action(), null);
22: }
23: else
24: {
25: this.Action();
26: }
27:
28: this.Status = OperationStatus.Completed;
29: if (null != this.OperationCompleted)
30: {
31: this.OperationCompleted(this, new OperationEventArgs(this));
32: }
33: }
34: catch (Exception ex)
35: {
36: this.Status = OperationStatus.Failed;
37: if (null != this.OperationFailed)
38: {
39: this.OperationFailed(this, new OperationFailedEventArgs(this, ex));
40: }
41: }
42: }
43:
44: [MethodImplAttribute(MethodImplOptions.Synchronized)]
45: public void Execute()
46: {
47: if (this.Dependencies.Length == 0)
48: {
49: this.DoExecute();
50: }
51:
52: foreach (var operation in this.Dependencies)
53: {
54: var op = operation;
55: ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(), null);
56: }
57:
58: }
59: }
60:
ParallelExecutor
ParallelExecutor提供操作的添加和整體執行。添加操作實現在兩個重載的AddOperation方法中,邏輯並不複雜。當執行Execute方法對所有的操作進行並行執行的時候,需要調用Initialize方法對每個操作進行初始化。然後異步調用每個操作的Execute方法即可。
1: public class ParallelExecutor
2: {
3: public ParallelExecutor()
4: {
5: this.Operations = new Dictionary<string, Operation>();
6: }
7:
8: public IDictionary<string, Operation> Operations
9: { get; private set; }
10:
11: public void Execute()
12: {
13: foreach (var operation in this.Operations.Values)
14: {
15: operation.Initialize();
16: }
17:
18: foreach (var operation in this.Operations.Values)
19: {
20: var op = operation;
21: ThreadPool.UnsafeQueueUserWorkItem(state => op.Execute(), null);
22: }
23: }
24:
25: public string AddOperation(string id, Action action)
26: {
27: ValidateOperation(id, action);
28: this.Operations.Add(id, new Operation(id, action));
29: return id;
30: }
31:
32: private void ValidateOperation(string id, Action action)
33: {
34: if (null == action)
35: {
36: throw new ArgumentNullException("action");
37: }
38:
39: if (this.Operations.ContainsKey(id))
40: {
41: throw new ArgumentException(string.Format("There is an existing operation whose ID is \"{0}\"", id));
42: }
43: }
44:
45: public string AddOperation(string id, Action action, string[] dependencies)
46: {
47: ValidateOperation(id, action);
48: if (null == dependencies)
49: {
50: throw new ArgumentNullException("dependencies");
51: }
52:
53: foreach (var op in dependencies)
54: {
55: if (!this.Operations.ContainsKey(op))
56: {
57: throw new ArgumentException(string.Format("The operation whose ID is \"{0}\" does not exist!", op));
58: }
59: }
60:
61: var operation = new Operation(id, action,
62: this.Operations.Values.
63: Where(op => Array.Exists<string>(dependencies, opId => opId == op.ID)).ToArray<Operation>());
64:
65: this.Operations.Add(id, operation);
66: return id;
67: }
68: }
69:
微信公眾賬號:大內老A
微博:www.weibo.com/artech
如果你想及時得到個人撰寫文章以及著作的消息推送,或者想看看個人推薦的技術資料,可以掃描左邊二維碼(或者長按識別二維碼)關注個人公眾號(原來公眾帳號蔣金楠的自媒體將會停用)。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁麵明顯位置給出原文連接,否則保留追究法律責任的權利。
最後更新:2017-10-30 14:34:52