顛覆大數據分析之Storm簡介
之前我們已經極為簡單的介紹了Storm。現在我們要對它做一個更詳細的了解。Storm是一個複雜事件處理引擎(CEP),最初由Twitter實現。在實時計算與分析領域,Storm正在得到日益廣泛的應用。Storm可以輔助基本的流式處理,例如聚合數據流,以及基於數據流的機器學習(譯者注:原文是ML,根據上下文判斷,此處應是指機器學習,下文相同不再綴述)。通常情況,數據分析(譯者注:原文為prestorage analytics,意義應是保存分析結果之前的分析計算)在Storm之上進行,然後把結果保存在NOSQL或關係數據庫管理係統(RDBMSs)。以氣象頻道為例,使用Storm以並行方式處理大數據集(譯者注:原文用到munging,意義應是洗數據)並為離線計算持久化它們。
下麵是一些公司使用Storm的有趣方式:
- Storm用於持續計算,p並把處理過的數據傳輸給一個可視化引擎。Data Salt,一個先行者,使用Storm處理大容量數據源。Twitter采用相同的方式,將Storm作為它的發布者分析產品的基礎。
- Groupon也采用Storm實現了低延遲、高吞吐量的數據處理。
- Yahoo采用Storm作為CEP每天處理數以億計的事件。他們還把Storm整合進了0和Hadoop YARN,以便Storm能夠彈性的使用集群資源,以及更易於使用HBase和Hadoop生態係統中的其它組件。
- Infochimps采用Storm-Kafka加強他們的數據交付雲服務。
- Storm還被Cerner公司用於醫療領域,用來處理增量更新,並低延遲的把它們保存在HBase,有效的運用Storm作為流式處理引擎和Hadoop作為批處理引擎。
- Impetus將Storm與Kafka結合,運行機器學習算法,探索製造業的故障模式。他們的客戶是一家大型的電子一站式服務商。他們運行分類算法,依據日誌實時探測故障,識別故障根源。這是一個更一般的用例:日誌實時分析。
- Impetus還利用Storm在一個分布式係統中構建實時索引。這個係統非常強大,因為它搜索過程幾乎是瞬時的。
數據流
Storm的一個基本概念是數據流,它可以被定義為無級的無界序列。Storm隻提供多種去中心化且容錯的數據流轉換方式。流的模式可以指定它的數據類型為以下幾種之一:整型、布爾型、字符串、短整型、長整型、字節、字節數組等等。類OutputFieldsDeclarer用來指定流的模式。還可以使用用戶自定義類型,這種情況下,用戶可能需要提供自定義序列化程序。一旦聲明了一個數據流,它就有一個ID,並有一個默認類型的默認值。
拓撲
在Storm內部,數據流的處理由Storm拓撲完成。拓撲包含一個spout,數據源;bolt,負責處理來自spout和其它bolt的數據。目前已經有各種spout,包括從Kafka讀取數據的spout(LinkedIn貢獻的分布式發布-訂閱係統),Twitter API的spout,Kestrel隊列的spout,甚至還有從像Oracle這樣的關係數據庫讀取數據的spout。spout可以是可靠的,一旦數據處理失敗,它會重新發送數據流。不可靠的spout不跟蹤流的狀態,不會在失敗時重新發送數據。Spout的一個重要方法是nextTuple——它返回下一條待處理的元組。還有兩個分別是ack和fail,分別在流被處理成功或處理不成功時調用。Storm的每個spout必須實現IRichSpout接口。Spout可能會分發多個數據流作為輸出。
拓撲中的另一個重要的實體是bolt。bolt執行數據流轉換,包括比如計算、過濾、聚合、連接。一個拓撲可以有多個bolt,用來完成複雜的轉換和聚合。在聲明一個bolt的輸入流時,必須訂閱其它組件(要麼是spout要麼是其它bolt)的特定數據流。通過InputDeclarer類和基於數據流組的適當方法完成訂閱,這個方法針對數據流組做了簡短說明。
execute方法是bolt的一個重要方法,通過調用它處理數據。它從參數接收一個新的數據流,通過OutputCollector分發新的元組。這個方法是線程安全的,這意味著bolt可以是多線程的。bolt必須實現IBasicBolt接口,這個接口提供了ack方法的聲明,用來發送確認通知。
Storm集群
一個Storm集群由主節點和從節點構成。主節點通常運行著Nimbus守護進程。Storm已經實現了在Hadoop YARN之上運行——它可以請求YARN的資源管理器額外啟動一個應用主節點的守護進程。Nimbus守護進程負責在集群中傳輸代碼,分派任務,監控集群健康狀態。在YARN之上實現的Storm可以與YARN的資源管理器配合完成監控及分派任務的工作。
每個從節點運行一個叫做supervisor的守護進程。這是一個工人進程,負責執行拓撲的一部分工作。一個典型的拓撲由運行在多個集群節點中的進程組成。supervisor接受主節點分派的任務後啟動工人進程處理。
主從節點之間的協調通訊由ZooKeeper集群完成。(ZooKeeper是一個apache的分布式協作項目,被廣泛應用於諸如Storm,Hadoop YARN,以及Kafka等多個項目中。)集群狀態由ZooKeeper集群維護,確保集群可恢複性,故障發生時可選舉出新的主節點,並繼續執行拓撲。
拓撲本身是由spouts、bolts,以及它們連接在一起的方式構成的圖結構。它與Map-Reduce任務的主要區別在於,MR任務是短命的,而Storm拓撲一直運行。Storm提供了殺死與重啟拓撲的方法。
簡單的實時計算例子
一個Kafka spout就是下麵展示的樣子:
Kafka Spout的open()方法:
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){ _collector = collector; _rand = new Random(); }
Kafka Spout的nextTuple()方法:
public void nextTuple() { KafkaConsumer consumer = new KafkaConsumer(kafkaServerURL, kafkaTopic); ArrayList<String> input_data = consumer.getKafkaStreamData(); while(true) { for(String inputTuple : input_data){ _collector.emit(new Values(inputTuple)); } } }
KafkaConsumer類來自開源項目storm-kafka:https:// github.com/nathanmarz/ storm-contrib/tree/master/storm-kafka。
public void prepare(Map stormConf, TopologyContext context){ //創建輸出日誌文件,記錄輸出結果日誌 try{ String logFileName = logFileLocation; //"file"與"outputFile"已作為類屬性定義 file = new FileWriter(logFileName); outputFile = new PrintWriter(file); outputFile.println("In the prepare() method of bolt"); } catch (IOException e){ System.out.println("an exception has occured"); e.printStackTrace(); } } public void execute(Tuple input, BasicOutputCollector collector){ //從元組取得要處理的字符串 String inputMsg=input.getString(0); inputMsg=inputMsg = "I am a bolt"; outputFile.println("接收的消息:" + inputMsg); outputFile.flush(); collector.emit(tuple(inputMsg)); }
前麵創建的spout與這個bolt連接,這個bolt向數據流的字符串域添加一條消息:我是一個bolt。前文顯示的就是這個bolt的代碼。接下來的代碼是構建拓撲的最後一步。它顯示了spout和bolts連接在一起構成拓撲,並運行在集群中。
public static void main(String[] args){ int numberOfWorkers = 2; //拜年中的工人進程數量 int numberOfExecutorsSpout = 1; //spout 執行者數量 int numberOfExecutorsBolt = 1; //bolt執行者數量 String numbersHost = "192.168.0.0"; // Storm集群中運行Nimbus的節點IP TopologyBuilder builder = new TopologyBuilder(); Config conf = new Config(); builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout); //set the spout for the topology builder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout"); //set the bolt for the topology //啟動遠程 Storm集群的配置 conf.setNumWorkers(numberOfWorkers); conf.put(Config.NIMBUS_HOST,nimbusHost); conf.put(Config.NIMBUS_THRIFT_PORT, 6627L); //遠程Storm集群配置 try{ StormSubmitter.submitTopology("testing_topology", conf, builder.createTopology()); } catch (AlreadyAliveException e){ System.out.println("Topology with the Same name is already running on the cluster."); e.printStackTrace(); } catch (InvalidTopologyException e) { System.out.println("Topology seems to be invalid."); e.printStackTrace(); } }
數據流組
spout和bolt都可能並行執行多個任務。必須有一種方法指定哪個數據流路由到哪個spout/bolt。數據流組用來指定一個拓撲內必須遵守的路由過程。下麵是Storm內建數據流組:
- 隨機數據流組:隨機分發數據流,不過它確保所有任務都可得到相同數量的數據流。
- 域數據流組:基於元組中域的數據流組。比如,有一個machine_id域,擁有相同machine_id域的元組由相同的任務處理。
- 全部數據流組:它向所有任務分發元組——它可能導致處理衝突。
- 直接數據流組:一種特殊的數據流組,實現動態路由。元組生產者決定哪個消費者應該接收這個元組。可能是基於運行時的任務ID。bolt可以通過TopologyContext類得到消費者的任務ID,或OutputCollector的emit方法也可使用直接直接數據流組。
- 本地數據流組:如果目標bolt在相同進程中有一個以上的任務,元組將被隨機分配(就像隨機數據流組),但是隻分配相同進程中的那些任務。
- 全局數據流組:所有元組到達擁有最小ID的bolt。
- 不分組:目前與隨機數據流組一樣。
Storm的消息處理擔保
從spout生成的元組能夠觸發進一步的元組分發,基於拓撲和所應用的轉換。這意味著可能是整個消息樹。Storm擔保每個元組被完整的處理了——樹上的每個節點已被處理過了。這一擔保不能沒有程序員的支持。每當消息樹中創建了一個新的節點或者一個節點被處理了,程序員都必須向Storm指明。第一點通過錨定實現,也就是將處理完成的元組作為OutputCollector的emit方法的第一個參數。這就保證了消息被錨定到了合適的元組。消息也可以錨定到多個元組,這樣就構成了一個消息的非循環有向圖(DAG),而不隻是一棵樹。即使在消息的循環有向圖存在的情況下,Storm也可以擔保消息處理。
在每條消息被處理後,程序員可通過調用ack或fail方法,告訴Storm這條消息已被成功處理或處理失敗。Storm會在失敗時重新發送數據流——這裏滿足至少處理一次的語義。Storm也會在發送數據流時采用超時機製——這是一個storm.yaml的配置參數(config.TOPOLOGY_MESSAGE_TIMEOUT_ SECS)。
在Storm內部,有一組“ackeer”任務持續追蹤來自每條元組消息的DAG。這些任務的數量可通過storm.yaml中的TOPOLOGY_ACKERS參數設定。在處理大量消息時,可能將不得不增大這個數字。每個消息元組得到一個64-bit ID,用於ackers追蹤。元組的DAG狀態由一個叫做ack val的64-bit值維護,隻是簡單的把樹中每個確認過(譯者注:原諒是acked)的ID執行異或運算。當ack val成為0時,acker任務就認為這棵元組樹被完全處理了。
在某些情況下,當性能至關重要,而可靠性又不是問題時,可靠性也可以關閉。在這些情況下,程序員可以指定TOPOLOGY_ACKERS為0,並在分發新元組時,不指定輸入元組的非錨定消息(譯者注:原文為unanchor messages)。這樣就跳過了確認消息,節省了帶寬,提高了吞吐量。到目前為止我們已經討論且隻討論了至少處理一次數據流的語義(譯者注:原文為at-least-once stream semantics)。
僅處理一次數據流的語義可以采用事務性拓撲實現。Storm通過為每條元組提供相關聯的事務ID為數據流處理提供事務性語義(僅一次,不完全等同於關係數據庫的ACID語義)。對於重新發送數據流來說,相同的事務ID也會被發送並擔保這個元組不會被重複處理。這方麵牽涉到對於消息處理的嚴格順序,就像是在處理一個元組。由於這樣做的低效率,Storm允許批量處理由一個事務ID關聯的元組。不像早先的情況 ,程序不得不將消息錨定到輸入元組,事務性拓撲對程序員是透明的。Storm內部將元組的處理分為兩階段——第一階段為處理階段,可以並行處理多個批次,第二階段為提交階段,強製嚴格按照批次ID提交。
事務性拓撲已經過時了——它已被整合進了一個叫做Trident的更大的框架。Trident允許對流數據進行查詢,包括聚合、連接、分組函數,還有過濾器。Trident構建於事務性拓撲之上並提供了一致的一次性語義。更多關於Trident的細節請參考
最後更新:2017-05-22 18:01:49