Tech Community [Yunqi]
Stream Processing for Everyone with SQL and Apache Flink
Where did we come from?
With the 0.9.0-milestone1 release, Apache Flink added an API to process relational data with SQL-like expressions called the Table API. The central concept of this API is a Table, a structured data set or stream on which relational operations can be applied. The Table API is tightly integrated with the DataSet and DataStream API. A Table can be easily created from a DataSet or DataStream and can also be converted back into a DataSet or DataStream, as the following example shows:
Starting with version 0.9, Flink introduced the Table API to support relational operations:
val execEnv = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// obtain a DataSet from somewhere (the source is left unspecified in the original)
val tempData: DataSet[(String, Long, Double)] = ???

// convert the DataSet to a Table
val tempTable: Table = tempData.toTable(tableEnv, 'location, 'time, 'tempF)

// compute your result
val avgTempCTable: Table = tempTable
  .where('location.like("room%"))
  .select(
    ('time / (3600 * 24)) as 'day,
    'location as 'room,
    (('tempF - 32) * 0.556) as 'tempC
  )
  .groupBy('day, 'room)
  .select('day, 'room, 'tempC.avg as 'avgTempC)

// convert result Table back into a DataSet and print it
avgTempCTable.toDataSet[Row].print()
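As the example shows, a DataSet is easily converted into a Table; you only need to specify its metadata (the field names).
Relational operations of all kinds can then be applied to the Table.
Finally, the Table can be converted back into a DataSet.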
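To make the semantics of the query above concrete, here is a minimal sketch that computes the same result with plain Scala collections instead of Flink. The object name `AvgTempSketch` and the sample readings are invented for illustration, and `startsWith("room")` stands in for `LIKE 'room%'`; this is not how Flink executes the query.

```scala
// Plain Scala collections stand in for DataSet/Table; there is no Flink dependency.
object AvgTempSketch {
  // (location, time in seconds, temperature in Fahrenheit); made-up sample data
  val tempData: Seq[(String, Long, Double)] = Seq(
    ("room1", 0L, 68.0),
    ("room1", 3600L, 86.0),
    ("room2", 90000L, 50.0),
    ("hall", 100L, 32.0)
  )

  // Mirrors where -> select -> groupBy -> select; returns (day, room) -> avgTempC
  def avgTempC(data: Seq[(String, Long, Double)]): Map[(Long, String), Double] =
    data
      .filter { case (location, _, _) => location.startsWith("room") } // LIKE 'room%'
      .map { case (location, time, tempF) =>
        (time / (3600 * 24), location, (tempF - 32) * 0.556)
      }
      .groupBy { case (day, room, _) => (day, room) }
      .map { case (key, rows) => key -> rows.map(_._3).sum / rows.size }

  def main(args: Array[String]): Unit =
    avgTempC(tempData).foreach(println)
}
```

The "hall" reading is dropped by the filter, and the two "room1" readings fall on the same day and are averaged together.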
Although the example shows Scala code, there is also an equivalent Java version of the Table API. The following picture depicts the original architecture of the Table API.
Relational operations on a Table are ultimately translated, via code generation, into DataSet logic.
Table API joining forces with SQL
The community was also well aware of the multitude of dedicated “SQL-on-Hadoop” solutions in the open source landscape (Apache Hive, Apache Drill, Apache Impala, Apache Tajo, just to name a few).
Given these alternatives, we figured that time would be better spent improving Flink in other ways than implementing yet another SQL-on-Hadoop solution.
What we came up with was a revised architecture for a Table API that supports SQL (and Table API) queries on streaming and static data sources.
We did not want to reinvent the wheel and decided to build the new Table API on top of Apache Calcite, a popular SQL parser and optimizer framework. Apache Calcite is used by many projects including Apache Hive, Apache Drill, Cascading, and many more. Moreover, the Calcite community put SQL on streams on their roadmap which makes it a perfect fit for Flink’s SQL interface.
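The community already has many SQL-on-Hadoop solutions, and Flink would rather spend its time on more valuable work than implement yet another one.
However, the demand for SQL is currently very strong, so SQL support is being built on top of a revised Table API.
SQL support relies on Calcite, and Calcite has already put SQL on streams on its roadmap, so it has a chance of becoming the standard for streaming SQL.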
Calcite is central in the new design as the following architecture sketch shows:
The new architecture features two integrated APIs to specify relational queries, the Table API and SQL.
Queries of both APIs are validated against a catalog of registered tables and converted into Calcite’s representation for logical plans.
In this representation, stream and batch queries look exactly the same.
Next, Calcite’s cost-based optimizer applies transformation rules and optimizes the logical plans.
Depending on the nature of the sources (streaming or static) we use different rule sets.
Finally, the optimized plan is translated into a regular Flink DataStream or DataSet program. This step involves again code generation to compile relational expressions into Flink functions.
Here, Table API and SQL queries are both converted into Calcite's logical plans, which the Calcite optimizer then optimizes; finally, code generation turns them into Flink functions.
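The rule-based optimization step can be sketched with a toy logical plan. Everything below is hypothetical: the `Plan` classes, the two rules, and the split into `batchRules` and `streamRules` are invented for illustration and are not Calcite's or Flink's classes or rule sets. The sketch only shows the mechanism of applying a source-dependent set of transformation rules until the plan stops changing.

```scala
object PlannerSketch {
  // A tiny logical plan; stream and batch queries share this representation.
  sealed trait Plan
  case class Scan(table: String) extends Plan
  case class Filter(predicate: String, input: Plan) extends Plan
  case class Project(fields: List[String], input: Plan) extends Plan

  // A transformation rule rewrites a plan node when it matches.
  type Rule = PartialFunction[Plan, Plan]

  // Merge two adjacent filters into a single conjunctive filter.
  val mergeFilters: Rule = {
    case Filter(outer, Filter(inner, in)) => Filter(s"($inner) AND ($outer)", in)
  }

  // Remove a filter whose predicate is trivially true.
  val dropTrueFilter: Rule = {
    case Filter("TRUE", in) => in
  }

  // Which rule goes into which set is an arbitrary choice made for this sketch.
  val batchRules: List[Rule] = List(dropTrueFilter, mergeFilters)
  val streamRules: List[Rule] = List(mergeFilters)

  // One bottom-up pass: rewrite the children first, then try each rule on the node.
  def rewriteOnce(plan: Plan, rules: List[Rule]): Plan = {
    val withNewChildren = plan match {
      case Filter(p, in)  => Filter(p, rewriteOnce(in, rules))
      case Project(f, in) => Project(f, rewriteOnce(in, rules))
      case s: Scan        => s
    }
    rules.foldLeft(withNewChildren) { (p, rule) =>
      if (rule.isDefinedAt(p)) rule(p) else p
    }
  }

  // Repeat passes until no rule changes the plan any more.
  @annotation.tailrec
  def optimize(plan: Plan, rules: List[Rule]): Plan = {
    val next = rewriteOnce(plan, rules)
    if (next == plan) plan else optimize(next, rules)
  }

  def main(args: Array[String]): Unit = {
    val plan = Project(List("time", "room"),
      Filter("location LIKE 'room%'", Filter("TRUE", Scan("sensorData"))))
    println(optimize(plan, batchRules))
    println(optimize(plan, streamRules))
  }
}
```

Both rules shrink the plan, so the fixpoint loop terminates. A real cost-based optimizer also compares alternative plans by estimated cost, which this sketch leaves out.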
With this effort, we are adding SQL support for both streaming and static data to Flink.
However, we do not want to see this as a competing solution to dedicated, high-performance SQL-on-Hadoop solutions, such as Impala, Drill, and Hive.
Instead, we see the sweet spot of Flink’s SQL integration primarily in providing access to streaming analytics to a wider audience.
In addition, it will facilitate integrated applications that use Flink’s APIs as well as SQL while being executed on a single runtime engine.
To repeat: the point of supporting SQL is not to build yet another dedicated SQL-on-Hadoop solution but to let more people use Flink. Put bluntly, this is not the current strategic focus.
What will Flink’s SQL on streams look like?
So far we discussed the motivation for and architecture of Flink’s stream SQL interface, but what will it actually look like?
// get environments
val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(execEnv)

// configure Kafka connection
val kafkaProps = ...

// define a JSON encoded Kafka topic as external table
val sensorSource = new KafkaJsonSource[(String, Long, Double)](
  "sensorTopic",
  kafkaProps,
  ("location", "time", "tempF"))

// register external table
tableEnv.registerTableSource("sensorData", sensorSource)

// define query in external table
val roomSensors: Table = tableEnv.sql(
  "SELECT STREAM time, location AS room, (tempF - 32) * 0.556 AS tempC " +
  "FROM sensorData " +
  "WHERE location LIKE 'room%'"
)

// define a JSON encoded Kafka topic as external sink
val roomSensorSink = new KafkaJsonSink(...)

// define sink for room sensor data and execute query
roomSensors.toSink(roomSensorSink)
execEnv.execute()
Compared with the Table API, the same operations can be expressed in pure SQL.
Flink's SQL does not currently support window aggregation,
but Calcite's streaming SQL does, for example:
SELECT STREAM
  TUMBLE_END(time, INTERVAL '1' DAY) AS day,
  location AS room,
  AVG((tempF - 32) * 0.556) AS avgTempC
FROM sensorData
WHERE location LIKE 'room%'
GROUP BY TUMBLE(time, INTERVAL '1' DAY), location
The same query can be implemented with the Table API:
val avgRoomTemp: Table = tableEnv.ingest("sensorData")
  .where('location.like("room%"))
  .partitionBy('location)
  .window(Tumbling every Days(1) on 'time as 'w)
  .select('w.end, 'location, (('tempF - 32) * 0.556).avg as 'avgTempC)
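What a tumbling window computes can be shown without Flink or Calcite. In the sketch below, each reading is assigned to exactly one fixed-size, non-overlapping window based on its timestamp, and the average is taken per window and location. The names `Reading`, `windowStart`, and `avgTempCPerWindow` are invented for this sketch, timestamps are assumed to be in seconds, and the input is a finite `Seq`, whereas a streaming engine would emit each window's result incrementally.

```scala
object TumblingWindowSketch {
  val DaySeconds: Long = 24L * 60 * 60

  // Hypothetical record type; timestamps are assumed to be in seconds.
  case class Reading(location: String, time: Long, tempF: Double)

  // Start of the tumbling window of length `size` that contains `time`.
  def windowStart(time: Long, size: Long): Long =
    time - java.lang.Math.floorMod(time, size)

  // Average Celsius temperature per (window end, location) for "room%" locations.
  def avgTempCPerWindow(readings: Seq[Reading], size: Long): Map[(Long, String), Double] =
    readings
      .filter(_.location.startsWith("room"))                         // LIKE 'room%'
      .groupBy(r => (windowStart(r.time, size) + size, r.location))  // key by window end
      .map { case (key, rs) =>
        key -> rs.map(r => (r.tempF - 32) * 0.556).sum / rs.size
      }

  def main(args: Array[String]): Unit = {
    val readings = Seq(
      Reading("room1", 10L, 68.0),
      Reading("room1", 86399L, 86.0),
      Reading("room1", 86400L, 50.0),
      Reading("hall", 5L, 32.0)
    )
    avgTempCPerWindow(readings, DaySeconds).foreach(println)
  }
}
```

The readings at 86399 and 86400 seconds land in different windows even though they are one second apart, which is the defining property of tumbling windows: boundaries are fixed by the clock, not by the data.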
What’s up next?
The Flink community is actively working on SQL support for the next minor version, Flink 1.1.0. In the first version, SQL (and Table API) queries on streams will be limited to selection, filter, and union operators. Compared to Flink 1.0.0, the revised Table API will support many more scalar functions and be able to read tables from external sources and write them back to external sinks. A lot of work went into reworking the architecture of the Table API and integrating Apache Calcite.
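Selection, filter, and union have one thing in common: each handles one record at a time and keeps no state across records, so they work on an unbounded stream without any windowing. The sketch below illustrates this with plain Scala iterators; the names (`Reading`, `filterRooms`, `toCelsius`, `union`) are invented for illustration, and the round-robin interleaving in `union` is only one possible merge order, not a description of Flink's behavior.

```scala
object StatelessOpsSketch {
  case class Reading(location: String, time: Long, tempF: Double)

  // Filter: keep a record or drop it, looking only at that record.
  def filterRooms(in: Iterator[Reading]): Iterator[Reading] =
    in.filter(_.location.startsWith("room"))

  // Selection: compute the output fields from a single input record.
  def toCelsius(in: Iterator[Reading]): Iterator[(Long, String, Double)] =
    in.map(r => (r.time, r.location, (r.tempF - 32) * 0.556))

  // Union: merge two streams by alternating between them while both have data.
  def union[A](a: Iterator[A], b: Iterator[A]): Iterator[A] = new Iterator[A] {
    private var takeFromA = true
    def hasNext: Boolean = a.hasNext || b.hasNext
    def next(): A = {
      val useA = (takeFromA && a.hasNext) || !b.hasNext
      takeFromA = !takeFromA
      if (useA) a.next() else b.next()
    }
  }

  // An unbounded stream of made-up readings, alternating between two locations.
  def endlessReadings: Iterator[Reading] =
    Iterator.iterate(0L)(_ + 1).map { t =>
      Reading(if (t % 2 == 0) "room1" else "hall", t, 68.0)
    }

  def main(args: Array[String]): Unit =
    // Only the first three results are pulled; the source itself never ends.
    toCelsius(filterRooms(endlessReadings)).take(3).foreach(println)
}
```

Window aggregates and joins, by contrast, have to remember earlier records, which is why they need additional machinery.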
In Flink 1.2.0, the feature set of SQL on streams will be significantly extended. Among other things, we plan to support different types of window aggregates and maybe also streaming joins. For this effort, we want to closely collaborate with the Apache Calcite community and help extend Calcite’s support for relational operations on streaming data when necessary.
Flink 1.2 will bring window aggregates and streaming joins, which is something to look forward to. Last updated: 2017-04-07 21:23:50