StreamingPro添加Scala script 模塊支持
SQL 在解析字符串方麵,能力還是有限,因為支持的算子譬如substring,split等有限,且不具備複雜的流程表達能力。我們內部有個通過JSON描述的DSL引擎方便配置化解析,然而也有一定的學習時間成本。我們當然可以通過SQL的 UDF函數等來完成字符串解析,在streamingpro中也很簡單,隻要注冊下你的UDF函數庫即可:
"udf_register": {
"desc": "測試",
"strategy": "....SparkStreamingRefStrategy",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "...SQLUDFCompositor",
"params": [
{
"analysis": "streaming.core.compositor.spark.udf.func.MLFunctions"
}
]
}
]
}
這樣你就可以在SQL中使用MLfunctions裏麵所有的udf函數了。然而為此專門提供一個jar包也是略顯麻煩。
這個時候如果能直接寫腳本解析就好了,最好是能支持各種腳本,比如groovy,javascript,python,scala,java等。任何一個會編程的人都可以實現一個比較複雜的解析邏輯。
核心是ScriptCompositor模塊:
{
"name": "...ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3"
},
{
"raw": [
"val Array(a,b)=rawLine.split(\"\t\");",
"Map(\"a\"->a,\"b\"->b)"
]
}
]
}
如果我想在代碼裏直接處理所有的列,則如下:{
"name": "streaming.core.compositor.spark.transformation.ScriptCompositor",
"params": [
{
"inputTableName": "test2",
"outputTableName": "test3",
"useDocMap": true
},
{
"anykey": "val Array(a,b)=doc(\"raw\").toString.split(\"\t\");Map(\"a\"->a,\"b\"->b)"
}
]
}
通過添加useDocMap為true,則你在代碼裏可以通過doc(doc是個Map[String,Any]) 來獲取你想要的任何字段,然後形成一個新的Map。如果你隻要新生成Map裏的字段,忽略掉舊的,則設置ignoreOldColumns=true 即可。
你可以把代碼放到一個文件裏,如下:
{
"name": "....ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3"
},
{
"raw": "file:///tmp/raw_process.scala"
}
]
}
通過inputTableName指定輸入的表,outputTableName作為輸出結果表。 raw代表inputTableName中你需要解析的字段,然後通過你的scala腳本進行解析。在腳本中 rawLine 是固定的,對應raw字段(其他字段也是一樣)的值。腳本隻有一個要求,最後的返回結果暫時需要是個Map[String,Any]。這裏,你隻是提供了一個map作為返回值,作為一行,然後以outputTableName指定的名字輸出,作為下一條SQL的輸入,所以StreamingPro需要推測出你的Schema。 數據量大到一定程度,推測Schema的效率就得不到保證,這個時候,你可以通過配置schema來提升性能:
{
"name": "....ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3",
"schema": "file:///tmp/schema.scala",
"useDocMap": true
},
{
"raw": "file:///tmp/raw_process.scala"
}
]
}
schema.scala的內容大致如下:Some(
StructType(
Array(
StructField("a", StringType, true),
StructField("b", StringType, true)))
)
後續roadmap是:- 支持外部腳本,比如放在hdfs或者http服務器上。
- 支持java 腳本
- 支持javascript腳本
- 支持 python 腳本
- 支持 ruby腳本
- 支持 groovy 腳本
舉個案例,從HDFS讀取一個文件,並且映射為隻有一個raw字段的表,接著通過ScriptCompositor配置的scala代碼解析raw字段,展開成a,b兩個字段,然後繼續用SQL繼續處理,最後輸出。
{
"convert_data_parquet": {
"desc": "測試",
"strategy": "...SparkStreamingStrategy",
"algorithm": [],
"ref": [],
"compositor": [
{
"name": "...SQLSourceCompositor",
"params": [
{
"path": "file:///tmp/hdfsfile",
"format": "org.apache.spark.sql.execution.datasources.hdfs",
"fieldName": "raw"
}
]
},
{
"name": "...JSONTableCompositor",
"params": [
{
"tableName": "test"
}
]
},
{
"name": "...ScriptCompositor",
"params": [
{
"inputTableName": "test",
"outputTableName": "test3"
},
{
"raw": [
"val Array(a,b)=rawLine.split(\"\t\");",
"Map(\"a\"->a,\"b\"->b)"
]
}
]
},
{
"name": "...transformation.SQLCompositor",
"params": [
{
"sql": "select a,b from test3 "
}
]
},
{
"name": "...streaming.core.compositor.spark.output.SQLUnitTestCompositor",
"params": [
{
}
]
}
],
"configParams": {
}
}
}
體驗地址: https://github.com/allwefantasy/streamingpro/blob/master/README.md最後更新:2017-04-01 17:13:51
上一篇:
運維之殤
下一篇:
用更少的錢看更清晰的視頻——詳談阿裏雲窄帶高清
Ceph實驗室:第八課:查看Ceph CRUSH map
對org.springframework.beans.CachedIntrospectionResults的再次解讀
mac下麵的secureCRT默認保存不上密碼
未能正確加載“VSTS for Database Professionals Sql Server Data-tier Application”包。
Intent對象中內置的Flag總結
吳京、趙薇都被吸引來了~~1億人次參與的奇跡,就差你了!
android繼承View實現畫圓
POJ-1088-滑雪
玩轉Android 之 絢麗的自定義Gallery
《Log4j2官方文檔》從Log4j 1.x遷移