751
王者榮耀
StreamingPro 支持Spark Structured Streaming
前言Structured Streaming 的文章參考這裏: Spark 2.0 Structured Streaming 分析。2.0的時候隻是把架子搭建起來了,當時也隻支持FileSource(監控目錄增量文件),到2.0.2後支持Kafka了,也就進入實用階段了,目前隻支持0.10的Kafka。Structured Streaming 采用dataframe API,並且對流式計算重新進行了抽象,個人認為Spark streaming 更靈活,Structured Streaming 在某些場景則更方便,但是在StreamingPro中他們之間則沒太大區別,唯一能夠體現出來的是,Structured Streaming 使得checkpoint真的進入實用階段。
假設我們都放在/tmp目錄下
新建一個文件,/tmp/ss-test.json,內容如下:
{
"scalamaptojson": {
"desc": "測試",
"strategy": "spark",
"algorithm": [],
"ref": [
],
"compositor": [
{
"name": "ss.source.mock",
"params": [{"duration1":["1","2","3"]}]
},
{
"name": "ss.table",
"params": [{"tableName": "test"}]
},
{
"name": "ss.sql",
"params": [
{
"sql": "select value + 100 from test",
"outputTableName": "test2"
}
]
},
{
"name": "ss.output",
"params": [
{
"mode": "append",
"format": "console"
}
]
}
],
"configParams": {
}
}
}
StreamingPro 現在支持短名稱了,不用寫那麼冗長的package名。
- ss 開頭指的是structrued streaming。
- batch 則是spark 批處理
- stream 則是 spark streaming
邏輯:
- 配置模擬數據
- 映射為表
- 使用SQL查詢
- 輸出(console)
如果是接的kafka,則配置如下即可:
{
"name": "ss.source",
"params": [{
"format":"kafka"
"kaka.bootstrap.servers":"host1:port1,host2:port2",
"subscribe":"topic1,topic2"
}]
}
運行
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
/tmp/streamingpro-0.4.7-SNAPSHOT-online-2.0.2.jar \
-streaming.name test \
-streaming.platform ss \
-streaming.checkpoint file:///tmp/ss \
-streaming.job.file.path file:///tmp/ss-test.json
最後更新:2017-04-01 17:13:51