閱讀767 返回首頁    go 阿裏雲 go 技術社區[雲棲]


理解Storm的內部消息緩衝機製

這篇文章是Apache Kafka的作者之一Michael G. Noll寫的,他的博客地址在[這裏]

優化Storm計算拓撲性能的過程有助於我們理解Storm內部消息隊列的配置和使用,在這篇簡短的文章中,我將向大家解釋並說明Storm(0.8或0.9版本)的一個工作進程(worker process)和與其相關的多個執行器線程是如何完成內部通信的。

Storm工作進程(Worker processes)的內部消息機製

在以下各章節中,我會交替地使用消息(message)和元組(tuple)兩個關鍵字。

 

本文中當我提到“內部消息”時,它指的是發生在Storm的單個工作進程內部的消息通信,這類通信隻在Storm集群的單台主機(節點)上展開。Storm使用由LMAX Disruptor提供的很多消息隊列來完成通信,LMAX Disruptor是一個高性能的線程間消息通信庫。

 

需要注意的是單個工作進程內多個線程之間的通信不同於Storm的多個工作進程之間的通信,工作進程間的通信通常都需要跨主機通過網絡完成,對於工作進程間的通信Storm默認采用ZeroMQ作為基礎通信組件(在Storm 0.9版本中開始實驗性地支持Netty),也就是說,當一個工作進程中的任務(Task)想要發送數據到集群中另外一台主機的某個工作進程的某個任務時,Storm將使用ZeroMQ或Netty進行通信。
所以有如下結論供大家參考:
  • Storm工作進程內部的通信(同一個Storm工作節點(主機)的線程之間):LMAX Disruptor
  • 工作進程之間的通信(通過網絡的工作節點(主機)之間):ZeroMQ或者Netty
  • 計算拓撲之間的通信:Storm並不原生支持,需要自己實現和維護,可以使用消息係統,比如:Kafka,RabbitMQ和數據庫等等。

圖片說明

在下一節討論具體細節前,我們先來看看如下這幅圖(參考原圖):

storm-internal-message-queues
圖1:Storm的單個工作進程的內部消息隊列概覽,與工作進程相關的隊列都被標記為紅色,與工作進程中的多個執行器線程相關的隊列都被標記為綠色,為了看起來清晰,這裏隻展示了一個工作進程(一般在Storm集群的單個節點上運行著多個工作進程)和這個工作進程中的一個執行器中的所有線程(一個工作進程中一般也存在多個執行器)。

詳細描述

現在我們對Storm的工作進程內部的消息機製有了一定了解,接下來可以深入討論細節了。

工作進程(Worker processes)

為了管理自己的輸入消息和輸出消息,每個工作進程都有一個消息接收線程和一個消息發送線程,消息接收線程監聽工作進程的TCP端口(通過supervisor.slots.ports參數進行配置),消息接收線程還會將收到的消息批量地發送到執行器線程的輸入隊列中(從recieve緩衝區中讀取消息進行發送),topology.receiver.buffer.size參數用於指定接收線程的緩衝區大小,同樣的,消息發送線程負責從所有執行器發送線程共享的transfer緩衝區中讀取消息,並將消息發送給網絡上的其他消息消費者,transfer緩衝區的大小通過topology.transfer.buffer.size參數控製。
  • “topology.receiver.buffer.size”參數是工作進程中的接收線程批量向執行線程的輸入隊列發送數據時的緩衝區內消息數的最大值(接收線程從網絡讀取消息),此參數如果設置過大可能會造成一些問題(心跳線程掛掉然後吞吐率直線下降),默認值是8條消息,設置的值必須是2的冪(N次方),這是為了兼容LMAX Disruptor組件。
    // 示例: 通過Java API配置 
    Config conf = new Config(); 
    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // 默認值為8

    topology.receiver.buffer.size參數不是配置LMAX Disruptor隊列的大小,它是配置的一個ArrayList的長度,這個List用來作為輸入消息的緩衝,它不需要被多個線程訪問,它僅僅是工作進程所專有的,但是因為這個List的元素最終被用來填充基於Disruptor的隊列(執行器輸入對列),所以這個參數必須是2的冪,參考backtype.storm.messaging.loader.clj的launch-receive-thread!詳細信息。

  • 使用“topology.transfer.buffer.size”參數配置的transfer緩衝區中的每一個元素實際上是一個tuple的列表,多個Executor中的消息發送線程將批量地把消息從outgoing發送到多個Executor共享的transfer緩衝區(Executor中包含用戶邏輯線程和消息發送線程),transfer緩衝區的大小默認是1024個元素。
    // 示例: 通過JavaAPI配置
    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); // 默認值為1024

執行器(Executors)

每個工作進程控製著一個或多個執行器線程,每個執行器都有一個輸入隊列 (incoming queue)和一個輸入隊列(outcoming queue),如上所述,工作進程有它自己專有的消息接收線程,該線程將(從網絡接收到的)消息發送到執行器線程的輸入消息隊列(在選擇發送到哪個執行器線程的輸入消息隊列時,應該是會存在一定的算法進行選擇)。同樣的,每個執行器都有它自己專有的發送線程,該線程負責將本執行器的輸出消息從輸出消息隊列發送到執行器所在的工作進程的transfer緩衝區,執行器的輸入隊列和輸出隊列的大小分別通過“topology.executor.receive.buffer.size”和“topology.executor.send.buffer.size”參數進行配置。
每個執行器都有一個單獨的線程用於處理Spout或Bolt中的用戶邏輯,另外還有一個發送線程將消息從執行器的消息輸出隊列批量發送到工作進程的transfer緩衝區。
  • topology.executor.receive.buffer.size,該參數是執行器的輸入消息隊列的配置參數,隊列的每個元素是一個tuple列表,tuple被批量地加入到隊列元素中,此參數的默認配置是1024個元素,修改配置時的值必須是2的冪(適配於LMAX Disruptor)。
    // 示例: 通過Java API配置
    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); // 批量加入tuple; 默認值是1024
  • topology.executor.send.buffer.size,該參數是執行器輸入消息隊列的配置參數,這個隊列的每個元素是一個單獨的tuple,默認值為1024,修改配置時的值必須是2的冪(適配於LMAX Disruptor)。
    // 示例: 通過Java API配置
    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384); // 單獨的tuple; 默認值是1024

更進一步

如何配置Storm的內部消息緩衝

上麵提到的各種默認參數配置都可以在conf/defaults.yaml中找到,你可以通過配置Storm集群的conf/storm.yaml文件全部覆蓋這些默認配置信息,你也可以在創建一個Topology時通過Java API中的backtype.storm.Config 類單獨地配置這些參數。

如何配置Storm的parallelism(並行)參數(實際上就是配置集群中工作進程數,各個Spout和Bolt的實例數和線程數)

正確配置Storm集群的消息緩衝與集群的工作負載模式以及Storm集群的並行配置緊密相關,關於Storm集群的並行配置可以看看這篇文章:Understanding the Parallelism of a Storm Topology

搞明白Storm的拓撲內部在做什麼

Storm UI是你觀察運行中集群的各項關鍵指標的一個好的入口,比如:它向你說明了一個Spout或Bolt所謂的“容量”,這各項運行指標可以幫助你決定是否需要修改本文提到的各項與緩衝相關的參數的配置,這些參數的變動將會影響集群的計算性能,這篇文章可以看看:Running a Multi-Node Storm Cluster
除了這些,你還可以使用像Graphite這樣的工具生成各項運行指標並監控這些指標,可以看看下麵兩篇我寫的文章:

Sending Metrics From Storm to Graphite
Installing and Running Graphite via RPM and Supervisord

ooyala放在Github上的metrics_storm項目也是值得參考的(但我還沒有用過這個工具)。

性能優化的建議

可以看看Nathan Marz(Storm作者)的演講:Tuning and Productionization of Storm.
最開始,可以試試如下參數配置,看看是否能夠提升Storm集群的性能。
1 conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8);
2 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32);
3 conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
4 conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
轉載自 並發編程網 - ifeve.com

最後更新:2017-05-19 14:32:21

  上一篇:go  《ZooKeeper官方指南》一致性保障
  下一篇:go  職業轉換:從金融工程師到數據科學家