如何使用Disruptor(二)如何從Ringbuffer讀取
從上一篇文章中我們都了解了什麼是Ring Buffer以及它是如何的特別。但遺憾的是,我還沒有講述如何使用Disruptor向Ring Buffer寫數據和從Ring Buffer中讀取數據。
ConsumerBarrier與消費者
(好,我開始後悔使用Paint/Gimp 了。盡管這是個購買繪圖板的好借口,如果我繼續寫下去的話… UML界的權威們大概也在詛咒我的名字了。)
消費者(Consumer)是一個想從Ring Buffer裏讀取數據的線程,它可以訪問ConsumerBarrier對象——這個對象由RingBuffer創建並且代表消費者與RingBuffer進行交互。就像Ring Buffer顯然需要一個序號才能找到下一個可用節點一樣,消費者也需要知道它將要處理的序號——每個消費者都需要找到下一個它要訪問的序號。在上麵的例子中,消費者處理完了Ring Buffer裏序號8之前(包括8)的所有數據,那麼它期待訪問的下一個序號是9。
消費者可以調用ConsumerBarrier對象的waitFor()方法,傳遞它所需要的下一個序號.
final long availableSeq = consumerBarrier.waitFor(nextSequence);
ConsumerBarrier返回RingBuffer的最大可訪問序號——在上麵的例子中是12。ConsumerBarrier有一個WaitStrategy方法來決定它如何等待這個序號,我現在不會去描述它的細節,代碼的注釋裏已經概括了每一種WaitStrategy的優點和缺點 。
接下來怎麼做?
接下來,消費者會一直原地停留,等待更多數據被寫入Ring Buffer。並且,一旦數據寫入後消費者會收到通知——節點9,10,11和12 已寫入。現在序號12到了,消費者可以讓ConsumerBarrier去拿這些序號節點裏的數據了。
拿到了數據後,消費者(Consumer)會更新自己的標識(cursor)。
你應該已經感覺得到,這樣做是怎樣有助於平緩延遲的峰值了——以前需要逐個節點地詢問“我可以拿下一個數據嗎?現在可以了麼?現在呢?”,消費者(Consumer)現在隻需要簡單的說“當你拿到的數字比我這個要大的時候請告訴我”,函數返回值會告訴它有多少個新的節點可以讀取數據了。因為這些新的節點的確已經寫入了數據(Ring Buffer本身的序號已經更新),而且消費者對這些節點的唯一操作是讀而不是寫,因此訪問不用加鎖。這太好了,不僅代碼實現起來可以更加安全和簡單,而且不用加鎖使得速度更快。
另一個好處是——你可以用多個消費者(Consumer)去讀同一個RingBuffer ,不需要加鎖,也不需要用另外的隊列來協調不同的線程(消費者)。這樣你可以在Disruptor的協調下實現真正的並發數據處理。
BatchConsumer代碼是一個消費者的例子。如果你實現了BatchHandler, 你可以用BatchConsumer來完成上麵我提到的複雜工作。它很容易對付那些需要成批處理的節點(例如上文中要處理的9-12節點)而不用單獨地去讀取每一個節點。
更新:注意Disruptor 2.0版本使用了與本文不一樣的命名。如果你對類名感到困惑,請閱讀我的變更總結。
文章轉自 並發編程網-ifeve.com
最後更新:2017-05-23 10:02:46