閱讀204 返回首頁    go 阿裏雲 go 技術社區[雲棲]


LMAX Disruptor——一個高性能、低延遲且簡單的框架

Disruptor是一個用於在線程間通信的高效低延時的消息組件,它像個增強的隊列,並且它是讓LMAX Exchange跑的如此之快的一個關鍵創新。關於什麼是Disruptor、為何它很重要以及它的工作原理方麵的信息都呈爆炸性增長 —— 這些文章很適合開始學習Disruptor,還可跟著LMAX BLOG深入學習。這裏還有一份更詳細的白皮書

雖然disruptor模式使用起來很簡單,但是建立多個消費者以及它們之間的依賴關係需要的樣板代碼太多了。為了能快速又簡單適用於99%的場景,我為Disruptor模式準備了一個簡單的領域特定語言。例如,為建立一個消費者的“四邊形模式”:

(從Trisha Gee’s excellent series explaining the disruptor pattern偷來的圖片)

在這種情況下,隻要生產者(P1)將元素放到ring buffer上,消費者C1和C2就可以並行處理這些元素。但是消費者C3必須一直等到C1和C2處理完之後,才可以處理。在現實世界中的對應的案例就 像:在處理實際的業務邏輯(C3)之前,需要校驗數據(C1),以及將數據寫入磁盤(C2)。

用原生的Disruptor語法來創建這些消費者的話代碼如下:

01 Executor executor = Executors.newCachedThreadPool();
02 BatchHandler handler1 = new MyBatchHandler1();
03 BatchHandler handler2 = new MyBatchHandler2();
04 BatchHandler handler3 = new MyBatchHandler3()
05 RingBuffer ringBuffer = new RingBuffer(ENTRY_FACTORY, RING_BUFFER_SIZE);
06 ConsumerBarrier consumerBarrier1 = ringBuffer.createConsumerBarrier();
07 BatchConsumer consumer1 = new BatchConsumer(consumerBarrier1, handler1);
08 BatchConsumer consumer2 = new BatchConsumer(consumerBarrier1, handler2);
09 ConsumerBarrier consumerBarrier2 =
10 ringBuffer.createConsumerBarrier(consumer1, consumer2);
11 BatchConsumer consumer3 = new BatchConsumer(consumerBarrier2, handler3);
12 executor.execute(consumer1);
13 executor.execute(consumer2);
14 executor.execute(consumer3);
15 ProducerBarrier producerBarrier =
16 ringBuffer.createProducerBarrier(consumer3);

在以上這段代碼中,我們不得不創建那些個handler(就是那些個MyBatchHandler實例),外加消費者屏障,BatchConsumer實例,然後在他們各自的線程中處理這些消費者。DSL能幫我們完成很多創建工作,最終的結果如下:

1 Executor executor = Executors.newCachedThreadPool();
2 BatchHandler handler1 = new MyBatchHandler1();
3 BatchHandler handler2 = new MyBatchHandler2();
4 BatchHandler handler3 = new MyBatchHandler3();
5 DisruptorWizard dw = new DisruptorWizard(ENTRY_FACTORY,
6     RING_BUFFER_SIZE, executor);
7 dw.consumeWith(handler1, handler2).then(handler3);
8 ProducerBarrier producerBarrier = dw.createProducerBarrier();

我們甚至可以在一個更複雜的六邊形模式中構建一個並行消費者鏈:

1 dw.consumeWith(handler1a, handler2a);
2 dw.after(handler1a).consumeWith(handler1b);
3 dw.after(handler2a).consumeWith(handler2b);
4 dw.after(handler1b, handler2b).consumeWith(handler3);
5 ProducerBarrier producerBarrier = dw.createProducerBarrier();

這個領域特定語言剛剛誕生不久,歡迎任何反饋,也歡迎大家從github上fork並改進它。


文章轉自 並發編程網-ifeve.com

最後更新:2017-05-23 10:32:29

  上一篇:go  跟著實例學習ZooKeeper的用法: 分布式鎖
  下一篇:go  跟著實例學習ZooKeeper的用法: Barrier