閱讀590 返回首頁    go 阿裏雲 go 技術社區[雲棲]


StreamingPro 再次支持 Structured Streaming

前言

之前已經寫過一篇文章,StreamingPro 支持Spark Structured Streaming,不過當時隻是玩票性質的,因為對Spark 2.0+ 版本其實也隻是嚐試性質的,重點還是放在了spark 1.6 係列的。不過時間在推移,Spark 2.0+ 版本還是大勢所趨。所以這一版對底層做了很大的重構,StreamingPro目前支持Flink,Spark 1.6+, Spark 2.0+ 三個引擎了。


準備工作
下載streamingpro for spark 2.0的包,然後下載spark 2.1 的安裝包。

你也可以在 streamingpro目錄 找到spark 1.6+ 或者 flink的版本。最新的大體會按如下格式統一格式了:

streamingpro-spark-0.4.14-SNAPSHOT.jar  適配  spark 1.6+,scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar  適配  spark 2.0+,scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar 適配 flink 1.2.0, scala 2.10

測試例子
寫一個json文件ss.json,內容如下:

{
  "scalamaptojson": {
    "desc": "測試",
    "strategy": "spark",
    "algorithm": [],
    "ref": [
    ],
    "compositor": [
      {
        "name": "ss.sources",
        "params": [
          {
            "format": "socket",
            "outputTable": "test",
            "port":"9999",
            "host":"localhost",
            "path": "-"
          },
          {
            "format": "com.databricks.spark.csv",
            "outputTable": "sample",
            "header":"true",
            "path": "/Users/allwefantasy/streamingpro/sample.csv"
          }
        ]
      },
      {
        "name": "ss.sql",
        "params": [
          {
            "sql": "select city from test left join sample on test.value == sample.name",
            "outputTableName": "test3"
          }
        ]
      },
      {
        "name": "ss.outputs",
        "params": [
          {
            "mode": "append",
            "format": "console",
            "inputTableName": "test3",
            "path": "-"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
大體是一個socket源,一個sample文件。socket源是流式的,sample文件則是批處理的。sample.csv內容如下:
id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
然後你在終端執行 nc -lk 9999 就好了。

然後運行spark程序:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar    \
-streaming.name test    \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json
在nc 那個終端輸入比如eason ,然後回車,馬上就可以看到spark終端接受到了數據。

最後更新:2017-04-01 17:13:52

  上一篇:go MongoDB journal 與 oplog,究竟誰先寫入?
  下一篇:go StreamingPro 支持多輸入,多輸出配置