閱讀219 返回首頁    go 京東網上商城


Apache Storm 官方文檔 —— 多語言接口協議

本文描述了 Storm (0.7.1 版本以上)的多語言接口協議。

Storm 多語言協議

Shell 組件

Storm 的多語言支持主要通過 ShellBolt,ShellSpout 和 ShellProcess 類來實現。這些類實現了 IBolt 接口、ISpout 接口,並通過使用 Java 的 ProcessBuilder 類調用 shell 進程實現了執行腳本的接口協議。

輸出域

輸出域是拓撲的 Thrift 定義的一部分。也就是說,如果你在 Java 中使用了多語言接口,那麼你就需要創建一個繼承自 ShellBolt 並實現 IRichBolt 接口的 bolt,這個 bolt 還需要在 declareOutputFields 方法中聲明輸出域(ShellSpout 也有類似的問題)。

你可以在基礎概念一文中了解更多相關信息。

協議報頭

最簡單的協議是通過執行腳本或程序的標準輸入輸出(STDIN/STDOUT)來實現的。在這個過程中傳輸的數據都是以 JSON 格式編碼的,這樣可以支持很多種語言。

打包

為了在集群上運行殼組件,執行的外殼腳本必須和待提交的 jar 包一起置於 resources/ 目錄下。

但是,在本地開發測試時,resources 目錄隻需要保持在 classpath 中即可。

協議

注意:

  • 輸入輸出協議的結尾都使用行讀機製,所以,必須要修剪掉輸入中的新行並將他們添加到輸出中。
  • 所有的 JSON 輸入輸出都由一個包含 “end” 的行結束標誌。注意,這個定界符並不是 JSON 的一部分。
  • 下麵的幾個標題就是從腳本作者的 STDIN 與 STDOUT 的角度出發的。

初始握手

兩種類型殼組件的初始握手過程都是相同的:

  • STDIN: 設置信息。這是一個包含 Storm 配置、PID 目錄、拓撲上下文的 JSON 對象:
{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "pidDir": "...",
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt1",
            "4": "example-bolt2"
        },
        "taskid": 3,
        // 以下內容僅支持 Storm 0.10.0 以上版本
        "componentid": "example-bolt"
        "stream->target->grouping": {
            "default": {
                "example-bolt2": {
                    "type": "SHUFFLE"}}},
        "streams": ["default"],
        "stream->outputfields": {"default": ["word"]},
        "source->stream->grouping": {
            "example-spout": {
                "default": {
                    "type": "FIELDS",
                    "fields": ["word"]
                }
            }
        }
        "source->stream->fields": {
            "example-spout": {
                "default": ["word"]
            }
        }
    }
}

你的腳本應該在這個目錄下創建一個以 PID 命名的空文件。比如,PID 是 1234 的時候,在目錄中創建一個名為 1234 的空文件。這個文件可以讓 supervisor 了解到進程的 PID,這樣,supervisor 在需要的時候就可以關閉該進程。

Storm 0.10.0 加強了發送到殼組件的上下文的功能,現在的上下文中包含了兼容 JVM 組件的拓撲上下文中的所有內容。新增的一個關鍵因素是確定拓撲中某個殼組件的源與目標(也就是輸入與輸出)的功能,這是通過 stream->target->grouping 和 source->stream->grouping字典實現的。在這些關聯字典的底層,分組是以字典的形式表示的,至少包含有一個 type 鍵,並且也可以含有一個 fields 鍵,該鍵可以用於指定在 FIELDS 分組中所涉及的域。

  • STDOUT: 你的 PID,以 JSON 對象的形式展現,比如 {"pid": 1234}。這個殼組件將會把 PID 記錄到它自己的日誌中。

接下來怎麼做就要取決於組件的具體類型了。

Spouts

Shell Spouts 都是同步的。以下內容是在一個 while(true) 循環中實現的:

  • STDIN: 一個 next、ack 或者 fail 命令。

“next” 與 ISpout 的 nextTuple 等價,可以這樣定義 “next”:

{"command": "next"}

可以這樣定義 “ack”:

{"command": "ack", "id": "1231231"}

可以這樣定義 “fail”:

{"command": "fail", "id": "1231231"}
  • STDOUT: 前麵的命令對你的 spout 作用產生的結果。這個結果可以是一組 emits 和 logs。

emit 大概是這樣的:

{
    "command": "emit",
    // tuple 的 id,如果是不可靠 emit 可以省略此值,該 id 可以為字符串或者數字
    "id": "1231231",
    // tuple 將要發送到的流 id,如果發送到默認流,將該值留空
    "stream": "1",
    // 如果是一個直接型 emit,需要定義 tuple 將要發送到的任務 id
    "task": 9,
    // 這個 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你會立即在 STDIN 上收到一條表示 tuple 發送到的任務的 id 的消息,這個消息是以 JSON 數組形式展現的。

“log” 會將消息記錄到 worker log 中,“log” 大概是這樣的:

{
    "command": "log",
    // 待記錄的消息
    "msg": "hello world!"
}
  • STDOUT: “sync” 命令會結束 emits 與 logs 的隊列,“sync” 是這樣使用的:
{"command": "sync"}

在 sync 之後, ShellSpout 不會繼續讀取你的輸出,直到它發送出新的 next,ack 或者 fail。

注意,與 ISpout 類似,worker 中的所有 spouts 都會在調用 next,ack 或者 fail 之後鎖定,直到你調用 sync。同樣,如果沒有需要發送的 tuple,你也應該在 sync 之前 sleep 一小段時間。ShellSpout 不會自動 sleep。

Bolts

Shell Bolts 的協議是異步的。你會在有 tuple 可用時立即從 STDIN 中獲取到 tuple,同時你需要像下麵的示例這樣調用 emit,ack,fail,log 等操作寫入 STDOUT:

  • STDIN: 就是一個 tuple!這是一個 JSON 編碼的結構:
{
    // tuple 的 id,為了兼容缺少 64 位數據類型的語言,這裏使用了字符串
    "id": "-6955786537413359385",
    // 創建該 tuple 的 id
    "comp": "1",
    // tuple 將要發往的流 id
    "stream": "1",
    // 創建該 tuple 的任務
    "task": 9,
    // tuple 中的所有值
    "tuple": ["snow white and the seven dwarfs", "field2", 3]
}
  • STDOUT: 一個 ack,fail,emit 或者 log。例如,emit 是這樣的:
{
    "command": "emit",
    // 標記這個輸出 tuple 的 tuples 的 ids
    "anchors": ["1231231", "-234234234"],
    // tuple 將要發送到的流 id,如果發送到默認流,將該值留空
    "stream": "1",
    // 如果是一個直接型 emit,需要定義 tuple 將要發送到的任務 id
    "task": 9,
    // 這個 tuple 中的所有值
    "tuple": ["field1", 2, 3]
}

如果不是直接型 emit,你會立即在 STDIN 上收到一條表示 tuple 發送到的任務的 id 的消息,這個消息是以 JSON 數組形式展現的。注意,由於 shell bolt 協議的異步特性,如果你在 emit 之後立即接收數據,有可能不會收到對應的任務 id,而是收到上一個 emit 的任務 id,或者是一個待處理的新 tuple。然而,最終接收到的任務 id 序列仍然是和 emit 的順序完全一致的。

ack 是這樣的:

{
    "command": "ack",
    // 待 ack 的 tuple
    "id": "123123"
}

fail 是這樣的:

{
    "command": "fail",
    // 待 fail 的 tuple
    "id": "123123"
}

“log” 會將消息記錄到 worker log 中,“log” 是這樣的:

{
    "command": "log",
    // 待記錄的消息
    "msg": "hello world!"
}
  • 注意:對於 0.7.1 版本,shell bolt 不再需要進行“同步”。

處理心跳(0.9.3 及以上版本適用)

Storm 0.9.3 通過在 ShellSpout/ShellBolt 與他們的多語言子進程之間使用心跳來檢測子進程是否處於掛起或僵死狀態。所有通過多語言接口與 Storm 交互的庫都必須使用以下步驟來……

Spout

Shell Spouts 是同步的,所有子進程會在 next() 的結尾發送 sync 命令。因此,你不需要為 spouts 做過多的處理。也就是說,在 next()過程中不能夠讓子進程的 sleep 時間超過 worker 的延時時間。

Bolt

Shell Bolts 是異步的,所以 ShellBolt 會定期向它的子進程發送心跳 tuple。心跳 tuple 是這樣的:

{
    "id": "-6955786537413359385",
    "comp": "1",
    "stream": "__heartbeat",
    // 這個 shell bolt 的係統任務 id
    "task": -1,
    "tuple": []
}

在子進程收到心跳 tuple 之後,它必須向 ShellBolt 發送一個 sync 命令。

最後更新:2017-05-22 13:32:16

  上一篇:go  Apache Storm 官方文檔 —— 定義 Storm 的非 JVM 語言 DSL
  下一篇:go  圖解 & 深入淺出 JavaWeb:Servlet 再說幾句