

StreamingPro: Simplified Streaming Computation Configuration

Preface

A few days ago StreamingPro made batch-processing configuration more elegant by supporting multiple inputs and multiple outputs; now streaming computation supports the same configuration style.

In addition, once another project stabilizes it will be released to work alongside StreamingPro. It makes reading and writing HBase convenient: you can add a mapping to an HBase table, similar to what ES does; or go without a mapping and let the system create columns for you automatically (using family:column as the column name); or merge all columns into a single field for you to process.


Configuration

First, configure the sources:

{
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"
          }
        ]
      }

Here we configure one Kafka stream and one plain CSV file. StreamingPro currently allows only a single Kafka stream to be configured, but it supports multiple topics, separated by commas. You can configure any number of other non-streaming sources, for example reading from MySQL, Parquet, and CSV at the same time and mapping each to a table.
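The two rules above (at most one Kafka source, comma-separated topics) can be sketched in plain Python. This is an illustrative parser, not part of StreamingPro; it only assumes the field names shown in the config, where a Kafka source is recognizable by its "topics" field:

```python
import json

def parse_sources(params):
    """Split source params into Kafka topics and non-streaming sources."""
    kafka, others = [], []
    for p in params:
        if "topics" in p:  # Kafka sources carry a "topics" field
            kafka.append(p)
        else:
            others.append(p)
    if len(kafka) > 1:
        raise ValueError("only one Kafka stream may be configured")
    # "topics" may name several topics, separated by commas
    topics = kafka[0]["topics"].split(",") if kafka else []
    return topics, others

params = json.loads("""
[
  {"path": "file:///tmp/sample.csv", "format": "com.databricks.spark.csv",
   "outputTable": "test", "header": "true"},
  {"topics": "test,test2", "zk": "127.0.0.1", "groupId": "kk3",
   "outputTable": "abc"}
]
""")
topics, others = parse_sources(params)
print(topics)  # ['test', 'test2']
```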

After that you can write SQL to process the data.

{
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },

Here I perform a simple join.
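The semantics of that left join can be checked standalone with plain SQLite (not Spark); rows from abc are kept even when no matching content exists in test:

```python
import sqlite3

# In-memory stand-in for the two tables registered by stream.sources.kafka
conn = sqlite3.connect(":memory:")
conn.execute("create table test (content text)")
conn.execute("create table abc (content text)")
conn.executemany("insert into test values (?)", [("a",), ("b",)])
conn.executemany("insert into abc values (?)", [("a",), ("c",)])

# The same query as in the stream.sql step
rows = conn.execute(
    "select abc.content, 'abc' as dd "
    "from abc left join test on test.content = abc.content"
).fetchall()
print(rows)  # [('a', 'abc'), ('c', 'abc')]
```

Note that 'c' survives the join with no partner row in test, which is exactly the left-join behavior the configuration relies on.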

{
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
This appends the data to MySQL. You can in fact configure multiple outputs as well.
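As a sketch, and assuming that stream.outputs forwards format, path, and mode to Spark's DataFrame writer (as the jdbc entry above suggests), a hypothetical second output writing the same table as Parquet could be added as another object in the same params array:

```json
{
  "format": "parquet",
  "path": "file:///tmp/finalOutputTable",
  "inputTableName": "finalOutputTable",
  "mode": "Append"
}
```

The path and format here are illustrative and not taken from the StreamingPro documentation.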


Complete configuration

{
  "example": {
    "desc": "test",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "stream.sources.kafka",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "topics":"test",
            "zk":"127.0.0.1",
            "groupId":"kk3",
            "outputTable": "abc"
          }
        ]
      },
      {
        "name": "stream.sql",
        "params": [
          {
            "sql": "select abc.content,'abc' as dd from abc left join test on test.content = abc.content",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "stream.outputs",
        "params": [
          {
            "format": "jdbc",
            "path": "-",
            "driver":"com.mysql.jdbc.Driver",
            "url":"jdbc:mysql://127.0.0.1/~?characterEncoding=utf8",
            "inputTableName": "finalOutputTable",
            "user":"~",
            "password":"~",
            "dbtable":"aaa",
            "mode":"Append"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
You can download the package from the StreamingPro-0.4.11 release, then start it with:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit   --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.11-SNAPSHOT-online-1.6.1-jar-with-dependencies.jar    \
-streaming.name test    \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json

Last updated: 2017-04-01 17:13:51
