

StreamingPro Supports Multiple Inputs and Multiple Outputs

Preface
I recently had a requirement to pull data from several different databases and tables, process it, and load it into ES for querying, ideally also writing a copy to Parquet to support more complex SQL. StreamingPro previously only allowed a single data source to be configured, so I made some changes to make it easy to configure multiple sources as well as multiple outputs.

Latest download: https://pan.baidu.com/s/1eRO5Wga . As before, the file is fairly large, because it now also supports Thrift JDBC / Rest SQL; see: Using StreamingPro to quickly build Spark SQL on CarbonData.


Input configuration

{
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
Previously the component was batch.source; with multiple input sources you need the batch.sources component instead. Each source must be given an outputTable, i.e. a name for the source, so it can be referenced later.
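
For intuition, each batch.sources entry behaves roughly like a plain Spark 1.6 read followed by a temp-table registration. A minimal sketch of that equivalence (my own illustration, not StreamingPro's actual internals):

// Rough equivalent of the first source entry above (sketch only)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("file:///tmp/sample.csv")
// outputTable simply names the result so later batch.sql steps can query it
df.registerTempTable("test")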

If the source is a database, you can write it like this:
{
        "name": "batch.sources",
        "params": [
          {
             url:"jdbc:mysql://localhost/test?user=fred&password=secret",
            "dbtable":"table1",
            "driver":"com.mysql...",
            "path": "-",
            "format": "jdbc",
            "outputTable": "test",

          },
          {
            "path": "-",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
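
This maps onto Spark's standard jdbc data source. A hedged sketch of the equivalent direct call (the driver class here is the usual MySQL Connector/J one, filled in only for illustration):

// Rough equivalent of the jdbc source entry above (sketch only)
val jdbcDF = sqlContext.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost/test?user=fred&password=secret")
  .option("dbtable", "table1")
  .option("driver", "com.mysql.jdbc.Driver") // illustrative driver class
  .load()
jdbcDF.registerTempTable("test")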


Output

{
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
Here I write the output in both json and parquet formats at the same time.
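
Each batch.outputs entry corresponds to a DataFrame write on the named input table. Conceptually (a sketch, assuming the default save mode):

// Rough equivalent of the two output entries (sketch only)
val result = sqlContext.table("finalOutputTable")
result.write.format("json").save("file:///tmp/kk2")
result.write.format("parquet").save("file:///tmp/kk3")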


A simple example that nevertheless covers quite a few points

{
  "convert-multi-csv-to-json": {
    "desc": "測試",
    "strategy": "spark",
    "algorithm": [],
    "ref": [],
    "compositor": [
      {
        "name": "batch.sources",
        "params": [
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test",
            "header": "true"
          },
          {
            "path": "file:///tmp/sample.csv",
            "format": "com.databricks.spark.csv",
            "outputTable": "test2",
            "header": "true"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select city as tp  from test limit 100",
            "outputTableName": "sqlTable"
          }
        ]
      },
      {
        "name": "batch.script",
        "params": [
          {
            "inputTableName": "sqlTable",
            "outputTableName": "scriptTable",
            "useDocMap": true
          },
          {
            "-": "val count = doc(\"tp\").toString.length;Map(\"count\"->count)"
          }
        ]
      },
      {
        "name": "batch.sql",
        "params": [
          {
            "sql": "select scriptTable.tp,scriptTable.count,test2.city,test2.name  from scriptTable,test2 limit 100",
            "outputTableName": "finalOutputTable"
          }
        ]
      },
      {
        "name": "batch.outputs",
        "params": [
          {
            "format": "json",
            "path": "file:///tmp/kk2",
            "inputTableName": "finalOutputTable"
          },
          {
            "format": "parquet",
            "path": "file:///tmp/kk3",
            "inputTableName": "finalOutputTable"
          }
        ]
      }
    ],
    "configParams": {
    }
  }
}
In batch.sql you can reference the table of any source, or any outputTable declared by an earlier batch.sql step; the same applies to batch.script. In batch.outputs, you can then write any of these tables to MySQL, ES, HDFS, or other storage systems.
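
To make the batch.script step concrete: with useDocMap enabled, each input row is handed to the script as a map named doc, and the Map the script returns contributes the new columns. An illustration of that per-row contract (my own sketch, not the real implementation):

// Sketch of the batch.script per-row contract
// (each row of sqlTable arrives as a Map[String, Any] called `doc`)
def runScript(doc: Map[String, Any]): Map[String, Any] = {
  val count = doc("tp").toString.length // the script body from the config above
  Map("count" -> count)
}

val row = Map[String, Any]("tp" -> "hangzhou") // a hypothetical input row
val enriched = row ++ runScript(row)           // Map(tp -> hangzhou, count -> 8)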

Save the configuration file, and then you can launch the job:
SHome=/Users/allwefantasy/streamingpro
./bin/spark-submit --class streaming.core.StreamingApp \
--master local[2] \
--name test \
$SHome/streamingpro-0.4.8-SNAPSHOT-online-1.6.1.jar \
-streaming.name test \
-streaming.platform spark \
-streaming.job.file.path file://$SHome/batch.json
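
When the job finishes, the results can be sanity-checked from spark-shell, for example (paths as configured above):

// Quick check of the two outputs (Spark 1.6)
sqlContext.read.json("file:///tmp/kk2").show()
sqlContext.read.parquet("file:///tmp/kk3").printSchema()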

Last updated: 2017-04-01 17:13:52
