閱讀523 返回首頁    go 阿裏雲 go 技術社區[雲棲]


解析Disruptor的依賴關係

現在我已經講了 RingBuffer​ 本身,如何從它 讀取​ 以及如何向它 寫入​。從邏輯上來說,下一件要做的事情就是把所有的東西拚裝到在一起。

我前麵提到過多生產者的情況——他們通過 ProducerBarrier 保證寫入操作順序與可控。我也提到過簡單場景下的多消費者數據訪問。更多的消費者的場景會變得更加複雜,我們​ 實現了一些聰明的機製允許多個消費者在訪問 Ring Buffer 的時候互相等待(依賴)。像很多應用裏,有一連串的工作需要在實際執行業務邏輯之前完成 (happen before) —— 例如,在做任何操作之前,我們都必須先保證消息寫入磁盤。

Disruptor 論文​ 和性能測試裏包含了你可能想到的一些基本結構。我準備講一下其中最有趣的那個,這多半是因為我需要練習如何使用畫圖板。

菱形結構

DiamondPath1P3CPerfTest​ 展示了一個並不罕見的結構——獨立的一個生產者和三個消費者。最棘手的一點是:第三個消費者必須等待前兩個消費者處理完成後,才能開始工作。

1P3C-Diamond

消費者 C3 也許是你的業務邏輯。消費者 C1 可能在備份接收到的數據,而消費者 C2 可能在準備數據或者別的東西。

用隊列實現菱形結構

在一個 SEDA-風格的架構​ 中,每個處理階段都會用隊列分開:

1P3C-Diamond-Queue
(為什麼單詞 Queue 裏必須有這麼多 “e” 呢?這是我在畫這些圖時遇到的最麻煩的詞)。

你也許從這裏看到了問題的端倪:一條消息從 P1 傳輸到 C3 要完整的穿過四個隊列,每個隊列在消息進入隊列和取出隊列時都會產生消耗成本。

用 Disruptor 實現菱形結構

在 Disruptor​ 的世界裏,一切都由一個單獨的 Ring Buffer 管理:

1P3C-Diamond-RingBuffer

這張圖看起來更複雜。不過所有的參與者都隻依賴 Ring Buffer 作為一個單獨的聯結點,而且所有的交互都是基於 Barrier 對象與檢查依賴的目標序號來實現的。

生產者這邊比較簡單,它是我在 上文​ 中描述過的單生產者模型。有趣的是,生產者並不需要關心所有的消費者。它隻關心消費者 C3,如果消費者 C3 處理完了 Ring Buffer 的某一個節點,那麼另外兩個消費者肯定也處理完了。因此,隻要 C3 的位置向前移動,Ring Buffer 的後續節點就會空閑出來。

管理消費者的依賴關係需要兩個 ConsumerBarrier 對象。第一個僅僅與 Ring Buffer 交互,C1 和 C2 消費者向它申請下一個可訪問節點。第二個 ConsumerBarrier 隻知道消費者 C1 和 C2,它返回兩個消費者訪問過的消息序號中較小的那個。

Disruptor 怎樣實現消費者等待(依賴)

Hmmm。我想需要一個例子。

1P3C-Diamond-RingBuffer-Example

我們從這個故事發生到一半的時候來看:生產者 P1 已經在 Ring Buffer 裏寫到序號 22 了,消費者 C1 已經訪問和處理完了序號 21 之前的所有數據。消費者 C2 處理到了序號 18。消費者 C3,就是依賴其他消費者的那個,才處理到序號 15。

生產者 P1 不能繼續向 RingBuffer 寫入數據了,因為序號 15 占據了我們想要寫入序號 23 的數據節點 (Slot)。

1P3C-Diamond-RingBuffer-Example2
(抱歉,我真的試過用其他顏色來代替紅色和綠色,但是別的都更容易混淆。)

第一個 ConsumerBarrier(CB1)告訴 C1 和 C2 消費者可以去訪問序號 22 前麵的所有數據,這是 Ring Buffer 中的最大序號。第二個 ConsumerBarrier (CB2) 不但會檢查 RingBuffer 的序號,也會檢查另外兩個消費者的序號並且返回它們之間的最小值。因此,三號消費者被告知可以訪問 Ring Buffer 裏序號 18 前麵的數據。

注意這些消費者還是直接從 Ring Buffer 拿數據節點——並不是由 C1 和 C2 消費者把數據節點從 Ring Buffer 裏取出再傳遞給 C3 消費者的。作為替代的是,由第二個 ConsumerBarrier 告訴 C3 消費者,在 RingBuffer 裏的哪些節點可以安全的處理。

這產生了一個問題——如果任何數據都來自於 Ring Buffer,那麼 C3 消費者如何讀到前麵兩個消費者處理完成的數據呢?如果 C3 消費者關心的隻是先前的消費者是否已經完成它們的工作(例如,把數據複製到別的地方),那麼這一切都沒有問題—— C3 消費者知道工作已完成就放心了。但是,如果 C3 消費者需要訪問先前的消費者的處理結果,它又從哪裏去獲取呢?

更新數據節點

秘密在於把處理結果寫入 Ring Buffer 數據節點 (Entry) 本身。這樣,當 C3 消費者從 Ring Buffer 取出節點時,它已經填充好了 C3 消費者工作需要的所有信息。這裏 真正 重要的地方是節點 (Entry) 對象的每一個字段應該隻允許一個消費者寫入。這可以避免產生並發寫入衝突 (write-contention) 減慢了整個處理過程。

FizzBuzzEntry

你可以在 DiamondPath1P3CPerfTest​ 裏看到這個例子—— FizzBuzzEntry​ 有兩個字段:fizz 和 buzz。如果消費者是 Fizz Consumer, 它隻寫入字段 fizz。如果是 Buzz Consumer, 它隻寫入字段 buzz。第三個消費者 FizzBuzz,它隻去讀這兩個字段但是不會做寫入,因為讀沒問題,不會引起爭用。

一些實際的 Java 代碼

這一切看起來都要比隊列實現更複雜。是的,它涉及到更多的內部協調。但是這些細節對於消費者和生產者是隱藏的,它們隻和 Barrier 對象交互。訣竅在消費者結構裏。上文例子中提到的菱形結構可以用下麵的方法創建:

01 ConsumerBarrier consumerBarrier1 =
02     ringBuffer.createConsumerBarrier();
03 BatchConsumer consumer1 =
04     new BatchConsumer(consumerBarrier1, handler1);
05 BatchConsumer consumer2 =
06     new BatchConsumer(consumerBarrier1, handler2);
07 ConsumerBarrier consumerBarrier2 =
08     ringBuffer.createConsumerBarrier(consumer1, consumer2);
09 BatchConsumer consumer3 =
10     new BatchConsumer(consumerBarrier2, handler3);
11 ProducerBarrier producerBarrier =
12     ringBuffer.createProducerBarrier(consumer3);

總結

現在你知道了——如何關聯 Disruptor 與相互依賴(等待)的多個消費者。關鍵點是:

  •  使用多個 ConsumerBarrier 來管理消費者之間的依賴(等待)關係。
  •  使用 ProducerBarrier 監視結構圖中最後一個消費者。
  •  隻允許一個消費者更新數據節點 (Entry) 的每一個獨立字段。

更新:Adrian 寫了一個非常好的 DSL 工具讓拚接 Disruptor 更加簡單了。

更新 2:注意 Disruptor 2.0 版使用了與本文不一樣的命名。如果你對類名感到困惑,請閱讀我的 變更總結​​。另外,Adrian 的 DSL 工具現在是 Disruptor 主幹代碼的一部分了。


文章轉自 並發編程網-ifeve.com

最後更新:2017-05-22 20:03:41

  上一篇:go  非阻塞算法
  下一篇:go  顛覆大數據分析之Spark彈性分布式數據集