Disruptor入門
獲得Disruptor
可以通過Maven或者下載jar來安裝Disruptor。隻要把對應的jar放在Java classpath就可以了。
基本的事件生產和消費
我們從一個簡單的例子開始學習Disruptor:生產者傳遞一個long類型的值給消費者,而消費者消費這個數據的方式僅僅是把它打印出來。首先聲明一個Event來包含需要傳遞的數據:
public class LongEvent { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
由於需要讓Disruptor為我們創建事件,我們同時還聲明了一個EventFactory來實例化Event對象。
public class LongEventFactory implements EventFactory {
@Override public Object newInstance() { return new LongEvent(); } }
我們還需要一個事件消費者,也就是一個事件處理器。這個事件處理器簡單地把事件中存儲的數據打印到終端:
/** */public class LongEventHandler implements EventHandler<LongEvent> { @Override public void onEvent(LongEvent longEvent, long l, boolean b) throws Exception { System.out.println(longEvent.getValue()); } }
事件都會有一個生成事件的源,這個例子中假設事件是由於磁盤IO或者network讀取數據的時候觸發的,事件源使用一個ByteBuffer來模擬它接受到的數據,也就是說,事件源會在IO讀取到一部分數據的時候觸發事件(觸發事件不是自動的,程序員需要在讀取到數據的時候自己觸發事件並發布):
public class LongEventProducer { private final RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } /** * onData用來發布事件,每調用一次就發布一次事件事件 * 它的參數會通過事件傳遞給消費者 * * @param bb */public void onData(ByteBuffer bb) { //可以把ringBuffer看做一個事件隊列,那麼next就是得到下麵一個事件槽 long sequence = ringBuffer.next();try { //用上麵的索引取出一個空的事件用於填充 LongEvent event = ringBuffer.get(sequence);// for the sequence event.setValue(bb.getLong(0)); } finally { //發布事件 ringBuffer.publish(sequence); } } }
很明顯的是:當用一個簡單隊列來發布事件的時候會牽涉更多的細節,這是因為事件對象還需要預先創建。發布事件最少需要兩步:獲取下一個事件槽並發布事件(發布事件的時候要使用try/finnally保證事件一定會被發布)。如果我們使用RingBuffer.next()獲取一個事件槽,那麼一定要發布對應的事件。如果不能發布事件,那麼就會引起Disruptor狀態的混亂。尤其是在多個事件生產者的情況下會導致事件消費者失速,從而不得不重啟應用才能會恢複。
Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer,所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件。
public class LongEventProducerWithTranslator { //一個translator可以看做一個事件初始化器,publicEvent方法會調用它 //填充Event private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { public void translateTo(LongEvent event, long sequence, ByteBuffer bb) { event.setValue(bb.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer bb) { ringBuffer.publishEvent(TRANSLATOR, bb); } }
上麵寫法的另一個好處是,Translator可以分離出來並且更加容易單元測試。Disruptor提供了不同的接口(EventTranslator, EventTranslatorOneArg, EventTranslatorTwoArg, 等等)去產生一個Translator對象。很明顯,Translator中方法的參數是通過RingBuffer來傳遞的。
最後一步就是把所有的代碼組合起來完成一個完整的事件處理係統。Disruptor在這方麵做了簡化,使用了DSL風格的代碼(其實就是按照直觀的寫法,不太能算得上真正的DSL)。雖然DSL的寫法比較簡單,但是並沒有提供所有的選項。如果依靠DSL已經可以處理大部分情況了。
public class LongEventMain { public static void main(String[] args) throws InterruptedException { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // The factory for the event LongEventFactory factory = new LongEventFactory(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(new LongEventHandler()); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); producer.onData(bb); Thread.sleep(1000); } } }
使用Java 8
Disruptor在自己的接口裏麵添加了對於Java 8 Lambda的支持。大部分Disruptor中的接口都符合Functional Interface的要求(也就是在接口中僅僅有一個方法)。所以在Disruptor中,可以廣泛使用Lambda來代替自定義類。
public class LongEventMainJava8 { /** * 用lambda表達式來注冊EventHandler和EventProductor * @param args * @throws InterruptedException */public static void main(String[] args) throws InterruptedException { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024;// Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // 可以使用lambda來注冊一個EventHandler disruptor.handleEventsWith((event, sequence, endOfBatch) -> System.out.println("Event: " + event.getValue())); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8);for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence, buffer) -> event.setValue(buffer.getLong(0)), bb); Thread.sleep(1000); } } }
在上麵的代碼中,有很多自定義類型可以被省略了。還有注意的是:publishEvent方法中僅調用傳遞給它的參數,並不是直接調用對應的對象。如果把這段代碼換成下麵的代碼:
ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent((event, sequence) -> event.set(bb.getLong(0))); Thread.sleep(1000); }
這段代碼中有一個捕獲參數的lambda,意味著在lambda表達式生成的內部類中會生成一個對象來存儲這個捕獲的bb對象。這會增加不必要的GC。所以在需要較低GC水平的情況下最好把所有的參數都通過publishEvent傳遞。
由於在Java 8中方法引用也是一個lambda,因此還可以把上麵的代碼改成下麵的代碼:
public class LongEventWithMethodRef { public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println(event.getValue()); } public static void translate(LongEvent event, long sequence, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } public static void main(String[] args) throws Exception { // Executor that will be used to construct new threads for consumers Executor executor = Executors.newCachedThreadPool(); // Specify the size of the ring buffer, must be power of 2. int bufferSize = 1024; // Construct the Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, executor); // Connect the handler disruptor.handleEventsWith(LongEventWithMethodRef::handleEvent); // Start the Disruptor, starts all threads running disruptor.start(); // Get the ring buffer from the Disruptor to be used for publishing. RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer bb = ByteBuffer.allocate(8); for (long l = 0; true; l++) { bb.putLong(0, l); ringBuffer.publishEvent(LongEventWithMethodRef::translate, bb); Thread.sleep(1000); } } }
基本調整選項
上麵的代碼已經可以處理大多數的情況了,但是在有的時候還是會需要根據不同的軟件或者硬件來調整選項以獲得更高的性能。基本的選項有兩個:單或者多生產者模式和可選的等待策略。
單或多 事件生產者
在並發係統中提高性能最好的方式之一就是單一寫者原則,對Disruptor也是適用的。如果在你的代碼中僅僅有一個事件生產者,那麼可以設置為單一生產者模式來提高係統的性能。
public class singleProductorLongEventMain { public static void main(String[] args) throws Exception { //.....// Construct the Disruptor with a SingleProducerSequencer Disruptor<LongEvent> disruptor = new Disruptor(factory, bufferSize, ProducerType.SINGLE, // Single producernew BlockingWaitStrategy(), executor);//..... } }
為了證明,下麵的數據是從Mac Air i7上麵測試的結果:
多生產者:
Run 0, Disruptor=26,553,372 ops/sec
Run 1, Disruptor=28,727,377 ops/sec
Run 2, Disruptor=29,806,259 ops/sec
Run 3, Disruptor=29,717,682 ops/sec
Run 4, Disruptor=28,818,443 ops/sec
Run 5, Disruptor=29,103,608 ops/sec
Run 6, Disruptor=29,239,766 ops/sec
單生產者:
Run 0, Disruptor=89,365,504 ops/sec
Run 1, Disruptor=77,579,519 ops/sec
Run 2, Disruptor=78,678,206 ops/sec
Run 3, Disruptor=80,840,743 ops/sec
Run 4, Disruptor=81,037,277 ops/sec
Run 5, Disruptor=81,168,831 ops/sec
Run 6, Disruptor=81,699,346 ops/sec
可選的等待策略
Disruptor默認的等待策略是BlockingWaitStrategy。這個策略的內部適用一個鎖和條件變量來控製線程的執行和等待(Java基本的同步方法)。BlockingWaitStrategy是最慢的等待策略,但也是CPU使用率最低和最穩定的選項。然而,可以根據不同的部署環境調整選項以提高性能。
SleepingWaitStrategy
和BlockingWaitStrategy一樣,SpleepingWaitStrategy的CPU使用率也比較低。它的方式是循環等待並且在循環中間調用LockSupport.parkNanos(1)來睡眠,(在Linux係統上麵睡眠時間60µs).然而,它的優點在於生產線程隻需要計數,而不執行任何指令。並且沒有條件變量的消耗。但是,事件對象從生產者到消費者傳遞的延遲變大了。SleepingWaitStrategy最好用在不需要低延遲,而且事件發布對於生產者的影響比較小的情況下。比如異步日誌功能。
YieldingWaitStrategy
YieldingWaitStrategy是可以被用在低延遲係統中的兩個策略之一,這種策略在減低係統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會循環等待sequence增加到合適的值。循環中調用Thread.yield()允許其他準備好的線程執行。如果需要高性能而且事件消費者線程比邏輯內核少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超線程的時候。
BusySpinWaitStrategy
BusySpinWaitStrategy是性能最高的等待策略,同時也是對部署環境要求最高的策略。這個性能最好用在事件處理線程比物理內核數目還要小的時候。例如:在禁用超線程技術的時候。
最後更新:2017-05-23 18:02:30