402
技術社區[雲棲]
Apache Storm 官方文檔 —— Trident Spouts
與一般的 Storm API 一樣,spout 也是 Trident 拓撲的數據來源。不過,為了實現更複雜的功能服務,Trident Spout 在普通的 Storm Spout 之上另外提供了一些 API 接口。
數據源、數據流以及基於數據流更新 state(比如數據庫)的操作,他們之間的耦合關係是不可避免的。Trident State 一文中有這方麵的詳細解釋,理解他們之間的這種聯係對於理解 spout 的運作方式非常重要。
Trident 拓撲中的大部分 spout 都是非事務型 spout。在 Trident 拓撲中可以使用普通的 IRichSpout
接口來創建數據流:
TridentTopology topology = new TridentTopology();
topology.newStream("myspoutid", new MyRichSpout());
Trident 拓撲中的所有 spout 都必須有一個唯一的標識,而且這個標識必須在整個 Storm 集群中都是唯一的。Trident 需要使用這個標識來存儲 spout 從 ZooKeeper 中消費的元數據(metadata),包括 txid 以及其他相關的 spout 元數據。
你可以使用以下配置項來設置用於存儲 spout 元數據的 ZooKeeper 地址(一般情況下不需要設置以下選項,因為 Storm 默認會直接使用集群的 ZooKeeper 服務器來存儲數據 —— 譯者注):
-
transactional.zookeeper.servers
:ZooKeeper 的服務器列表 -
transactional.zookeeper.port
:ZooKeeper 集群的端口 -
transactional.zookeeper.root
:元數據在 ZooKeeper 中存儲的根目錄。元數據會直接存儲在該設置目錄下。
管道
默認情況下,Trident 每次處理隻一個 batch,知道該 batch 處理成功或者失敗之後才會開始處理其他的 batch。你可以通過將 batch 管道化來提高吞吐率,降低每個 batch 的處理延時。同時處理的 batch 的最大數量可以通過topology.max.spout.pending
來進行配置。
不過,即使在同時處理多個 batch 的情況下,Trident 也會按照 batch 的順序來更新 state。例如,假如你正在處理一個將全局計數結果整合並更新到數據庫中的任務,那麼在你向數據庫中更新 batch1 的計數結果時,你同時可以繼續處理 batch2、batch3 甚至 batch10 的計數工作。不過,Trident 隻會在 batch1 的 state 更新結束之後才會處理後續 batch 的 state 更新操作。這是實現恰好一次處理的語義的必要基礎,我們已經在 Trident State 一文中討論了這一點。
Trident spout 類型
下麵列出了一些可用的 spout API 接口:
- ITridentSpout:這是最常用的 API,支持事務型和模煳事務型的語義實現。不過一般會根據需要使用它的某個已有的實現,而不是直接實現該接口。
- IBatchSpout:非事務型 spout,每次會輸出一個 batch 的 tuple。
- IPartitionedTridentSpout:可以從分布式數據源(比如一個集群或者 Kafka 服務器)讀取數據的事務型 spout。
- OpaquePartitionedTridentSpout:可以從分布式數據源讀取數據的模煳事務型 spout。
當然,正如這篇教程的開頭提到的,除了這些 API 之外,你還可以使用普通的 IRichSpout
。
最後更新:2017-05-22 14:33:01