StreamingPro 再次支持 Structured Streaming
前言之前已經寫過一篇文章,StreamingPro 支持Spark Structured Streaming,不過當時隻是玩票性質的,因為對Spark 2.0+ 版本其實也隻是嚐試性質的,重點還是放在了spark 1.6 係列的。不過時間在推移,Spark 2.0+ 版本還是大勢所趨。所以這一版對底層做了很大的重構,StreamingPro目前支持Flink,Spark 1.6+, Spark 2.0+ 三個引擎了。
下載streamingpro for spark 2.0的包,然後下載spark 2.1 的安裝包。
你也可以在 streamingpro目錄 找到spark 1.6+ 或者 flink的版本。最新的大體會按如下格式統一格式了:
streamingpro-spark-0.4.14-SNAPSHOT.jar 適配 spark 1.6+,scala 2.10
streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar 適配 spark 2.0+,scala 2.11
streamingpro.flink-0.4.14-SNAPSHOT-online-1.2.0.jar 適配 flink 1.2.0, scala 2.10
測試例子
寫一個json文件ss.json,內容如下:
{
"scalamaptojson": {
"desc": "測試",
"strategy": "spark",
"algorithm": [],
"ref": [
],
"compositor": [
{
"name": "ss.sources",
"params": [
{
"format": "socket",
"outputTable": "test",
"port":"9999",
"host":"localhost",
"path": "-"
},
{
"format": "com.databricks.spark.csv",
"outputTable": "sample",
"header":"true",
"path": "/Users/allwefantasy/streamingpro/sample.csv"
}
]
},
{
"name": "ss.sql",
"params": [
{
"sql": "select city from test left join sample on test.value == sample.name",
"outputTableName": "test3"
}
]
},
{
"name": "ss.outputs",
"params": [
{
"mode": "append",
"format": "console",
"inputTableName": "test3",
"path": "-"
}
]
}
],
"configParams": {
}
}
}
大體是一個socket源,一個sample文件。socket源是流式的,sample文件則是批處理的。sample.csv內容如下:id,name,city,age
1,david,shenzhen,31
2,eason,shenzhen,27
3,jarry,wuhan,35
然後你在終端執行 nc -lk 9999 就好了。然後運行spark程序:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-spark-2.0-0.4.14-SNAPSHOT.jar \
-streaming.name test \
-streaming.platform spark_structrued_streaming \
-streaming.job.file.path file://$SHome/ss.json
在nc 那個終端輸入比如eason ,然後回車,馬上就可以看到spark終端接受到了數據。最後更新:2017-04-01 17:13:52