閱讀724 返回首頁    go 魔獸


簡單了解Disruptor(二)

2.  Disruptor什麼時候用

Disruptor適用於兩個獨立的處理過程(兩個線程)之間交換數據。下麵以兩個簡單場景舉例:

例如場景一:

停車批量入場數據上報,數據上報後需要對每條入場數據存入DB,還需要發送kafka消息給其他業務係統。如果執行完所有的操作,再返回,那麼接口耗時比較長,我們可以批量上報後驗證數據正確性,通過後按單條入場數據寫入環形隊列,然後直接返回成功。

實現方式一:啟 動2個消費者線程,一個消費者去執行db入庫,一個消費者去發送kafka消息。

實現方式二:啟動4個消費者,2個消費者並發執行db入庫,兩個消費者並發發送kafka消息,充分利用cpu多核特性,提高執行效率。

實現方式三:如果要求寫入DB和kafka後,需要給用戶發送短信。那麼可以啟動三個消費者線程,一個執行db插入,一個執行kafka消息發布,最後一個依賴前兩個線程執行成功,前兩個線程都執行成功後,該線程執行短信發送。

例如場景二:

你在網上使用信用卡下訂單。一個簡單的零售係統將獲取您的訂單信息,使用信用卡驗證服務,以檢查您的信用卡號碼,然後確認您的訂單 – 所有這些都在一個單一過程中操作。當進行信用卡有效性檢查時,服務器這邊的線程會阻塞等待,當然這個對於用戶來說停頓不會太長。

在MAX架構中,你將此單一操作過程分為兩個,第一部分將獲取訂單信息,然後輸出事件(請求信用卡檢查有效性的請求事件)給信用卡公司. 業務邏輯處理器將繼續處理其他客戶的訂單,直至它在輸入事件中發現了信用卡已經檢查有效的事件,然後獲取該事件來確認該訂單有效。

 

3.  Disruptor為什麼快

2.1數組實現

用數組實現, 解決了鏈表節點分散, 不利於cache預讀問題,可以預分配用於存儲事件內容的內存空間;並且解決了節點每次需要分配和釋放, 需要大量的垃圾回收GC問題 (數組內元素的內存地址的連續性存儲的,在硬件級別,數組中的元素是會被預加載的,因為隻要一個元素被加載到緩存行,其他相鄰的幾個元素也會被加載進同一個緩存行)

2.2求餘操作優化

求餘操作本身也是一種高耗費的操作, 所以ringbuffer的size設成2的n次方, 可以利用位操作來高效實現求餘。要找到數組中當前序號指向的元素,可以通過mod操作,正常通過sequence mod array length = array index,優化後可以通過:sequence & (array length-1) = array index實現。比如一共有8槽,3&(8-1)=3,HashMap就是用這個方式來定位數組元素的,這種方式比取模的速度更快。

 

2.3 預讀與批量

相比鏈表隊列,實現數組預讀,減少結點操作空間釋放和申請,從而減少gc次數。生產者支持單生產,多生產者模式,單生產者cursor使用普通long實現,無鎖加快速度,多生產者才使用Sequence(AtomicLong)

生產和消費元素支持單線程批量操作數據。

2.4 Lock-Free

係統態的鎖會導致線程cache丟失. 鎖競爭的時候需要進行仲裁. 這個仲裁會涉及到操作係統的內核切換, 並且在此過程中操作係統需要做一係列操作, 導致原有線程的指令緩存和數據緩很可能被丟掉

– 用戶態的鎖往往是通過自旋鎖來實現(自旋即忙等), 而自旋在競爭激烈的時候開銷是很大的(一直在消耗CPU資源)

disruptor不使用鎖, 使用CAS(Compare And Swap/Set),嚴格意義上說仍然是使用鎖, 因為CAS本質上也是一種樂觀鎖, 隻不過是CPU級別指令, 不涉及到操作係統, 所以效率很高(AtomicLong實現Sequence)

CAS說明:

  • CAS依賴於處理器的支持, 當然大部分現代處理器都支持.
  • CAS相對於鎖是非常高效的, 因為它不需要涉及內核上下文切換進行仲裁.
  • CAS並不是免費的, 它會涉及到對指令pipeline加鎖, 並且會用到內存barrier(用來刷新內存狀態,簡單理解就是把緩存中,寄存器中的數據同步到內存中去)

 

2.5 解決偽共享{False Sharing}

Cpu cache簡單示意圖:

 

 

上麵談到lock的耗費, 主要也是由於內核的切換導致cache的丟失

所以cache是優化的關鍵, cache越接近core就越快,也越小 。

其中L1,L2,L3等級緩存都是由緩存行組成的, 通常是64字節, 一個Java的long類型是8字節,因此在一個緩存行中可以存8個long類型的變量. 緩存行是緩存更新的基本單位, 就算你隻讀一個變量, 係統也會預讀其餘7個, 並cache這一行, 並且這行中的任一變量發生改變, 都需要重新加載整行, 而非僅僅重新加載一個變量.

偽共享舉例:

比如在鏈表中往往會連續定義head和tail指針, 所以對於cache-line的預讀, 很有可能會導致head和tail在同一cache-line。在實際使用中, 往往producer線程會持續更改tail指針, 而consumer線程會持續更改head指針

當producer線程和consumer線程分別被分配到core2和core1, 就會出現以下狀況,由於core1不斷改變h, 導致該cache-line過期, 對於core2, 雖然他不需要讀h, 或者t也沒有改變, 但是由於cache-line的整行更新, 所以core2仍然需要不停的更新它的cache,core2的緩存未命中被一個和它本身完全不相幹的值h, 而被大大提高, 導致cache效率底下,而實際情況下, core1會不斷更新h, 而core2會不斷更新t, 導致core1和core2都需要頻繁的重新load cache, 這就是偽共享問題

 

在Disruptor裏我們對RingBuffer的cursor和BatchEventProcessor的序列進行了緩存行填充,如下:

 

class LhsPadding {
    protected long p1;
    protected long p2;
    protected long p3;
    protected long p4;
    protected long p5;
    protected long p6;
    protected long p7;

    LhsPadding() {
    }
}

2.6 使用內存屏障

內存屏障本身不是一種優化方式, 而是你使用lock-free(CAS)的時候, 必須要配合使用內存屏障,因為CPU和memory之間有多級cache, CPU core隻會更新cache-line, 而cache-line什麼時候flush到memory, 這個是有一定延時的 ,在這個延時當中, 其他CPU core是無法得知你的更新的, 因為隻有把cache-line flush到memory後, 其他core中的相應的cache-line才會被置為過期數據,所以如果要保證使用CAS能保證線程間互斥, 即樂觀鎖, 必須當一個core發生更新後, 其他所有core立刻知道並把相應的cache-line設為過期, 否則在這些core上執行CAS讀到的都是過期數據.

內存屏障 = “立刻將cache-line flush到memory, 沒有延時”

注:可參考java中volatile的原理,同樣實現了內存屏障。

 

4.  使用Disruptor開發

下麵以車輛入場為例,入場後需要存入數據庫,需要發送kafka消息,兩步執行完後,給用戶發送短信。代碼實現如下:(參見代碼運行)

開發步驟:

  1. 定義事件
/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 */
public class InParkingDataEvent {
    private String carLicense = "";

    public void setCarLicense(String carLicense)
    {
        this.carLicense = carLicense;
    }
    public String getCarLicense()
    {
        return carLicense;
    }
}
  1. 定義事件處理的具體實現

 

/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 */
public class ParkingDataInDbHandler implements EventHandler<InParkingDataEvent>,WorkHandler<InParkingDataEvent>{

   @Override
   public void onEvent(InParkingDataEvent event) throws Exception {
      long threadId = Thread.currentThread().getId();
        String carLicense = event.getCarLicense();
      System.out.println(String.format("Thread Id %s save %s into db ....",threadId,carLicense));
   }

   @Override
   public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
      // TODO Auto-generated method stub
      this.onEvent(event);  
   }

}

public class ParkingDataSmsHandler implements EventHandler<InParkingDataEvent> {


    @Override
    public void onEvent(InParkingDataEvent event, long sequence, boolean endOfBatch) throws Exception {
            long threadId = Thread.currentThread().getId();
            String carLicense = event.getCarLicense();
            System.out.println(String.format("Thread Id %s send %s in plaza sms to user",threadId,carLicense));
    }
}

public class ParkingDataToKafkaHandler implements EventHandler<InParkingDataEvent> {
     
    @Override  
    public void onEvent(InParkingDataEvent event, long sequence,
            boolean endOfBatch) throws Exception {  
       long threadId = Thread.currentThread().getId();
        String carLicense = event.getCarLicense();
        System.out.println(String.format("Thread Id %s send %s in plaza messsage to kafka...",threadId,carLicense));
    }  
}

3.發布事件類實現(Disruptor 要求 RingBuffer.publish 必須得到調用,如果發生異常也一樣要調用 publish ,那麼,很顯然這個時候需要調用者在事件處理的實現上來判斷事件攜帶的數據是否是正確的或者完整的)
public class InParkingDataEventPublisher implements Runnable{
    Disruptor<InParkingDataEvent> disruptor;
    private CountDownLatch latch;  
    //private static int LOOP=10000;//模擬一萬車輛入場
    private static int LOOP=10;//模擬10車輛入場


    public InParkingDataEventPublisher(CountDownLatch latch,Disruptor<InParkingDataEvent> disruptor) {
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {
        InParkingDataEventTranslator tradeTransloator=new InParkingDataEventTranslator();
        for(int i=0;i<LOOP;i++){

            disruptor.publishEvent(tradeTransloator);
            try {
                Thread.currentThread().sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }  
        latch.countDown();
        System.out.println("生產者寫完" +LOOP + "個消息");
    }  
      
}  
  
class InParkingDataEventTranslator implements EventTranslator<InParkingDataEvent>{

    @Override  
    public void translateTo(InParkingDataEvent event, long sequence) {
        this.generateTradeTransaction(event);  
    }  
    private InParkingDataEvent generateTradeTransaction(InParkingDataEvent event){
       int num =  (int)(Math.random()*8000);
        num = num + 1000;
       event.setCarLicense("京Z" + num);
        System.out.println("Thread Id " + Thread.currentThread().getId() + " 寫完一個event");
        return event;
    }
}
  1. 定義用於事件處理的線程池, 指定等待策略, 啟動 Disruptor,執行完畢後關閉Disruptor
/**
 * @author mawming
 * @version 1.0
 * @create date:2016-9-12.
 * 測試 P1生產消息,C1,C2消費消息,C1和C2會共享所有的event元素! C3依賴C1,C2處理結果
 */
public class TestP1c12c3 {
   public static void main(String[] args) throws InterruptedException {  
        long beginTime=System.currentTimeMillis();  
          
        int bufferSize=1024;
        //Disruptor交給線程池來處理,共計 p1,c1,c2,c3四個線程
        ExecutorService executor=Executors.newFixedThreadPool(4);
        //構造緩衝區與事件生成
        Disruptor<InParkingDataEvent> disruptor=new Disruptor<InParkingDataEvent>(new EventFactory<InParkingDataEvent>() {
            @Override  
            public InParkingDataEvent newInstance() {
                return new InParkingDataEvent();
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
          
        //使用disruptor創建消費者組C1,C2  
        EventHandlerGroup<InParkingDataEvent> handlerGroup=disruptor
                .handleEventsWith(new ParkingDataToKafkaHandler(),new ParkingDataInDbHandler());
          
        ParkingDataSmsHandler smsHandler=new ParkingDataSmsHandler();
        //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3  
        handlerGroup.then(smsHandler);

        disruptor.start();//啟動  
        CountDownLatch latch=new CountDownLatch(1);  
        //生產者準備  
        executor.submit(new InParkingDataEventPublisher(latch, disruptor));
        latch.await();//等待生產者結束
        disruptor.shutdown();  
        executor.shutdown();  
          
        System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));  
    }  
}

 

 

5.   相關資料

 Disruptor源碼地址: https://github.com/LMAX-Exchange/disruptor

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 10:25:18

  上一篇:go  Storm-源碼分析-Topology Submit-Task
  下一篇:go  《雲周刊》第122期:勒索病毒防護全攻略,不再讓服務器裸奔