177
技術社區[雲棲]
《Flink官方文檔》Python 編程指南測試版(一)
Flink中的分析程序實現了對數據集的某些操作 (例如,數據過濾,映射,合並,分組)。這些數據最初來源於特定的數據源(例如來自於讀文件或數據集合)。操作執行的結果通過數據池以寫入數據到(分布式)文件係統或標準輸出(例如命令行終端)的形式返回。Flink程序可以運行在不同的環境中,既能夠獨立運行,也可以嵌入到其他程序中運行。程序可以運行在本地的JVM上,也可以運行在服務器集群中。
為了創建你自己的Flink程序,我們鼓勵你從program skeleton(程序框架)開始,並逐漸增加你自己的transformations(變化)。以下是更多的用法和高級特性的索引。
- Example Program
- Program Skeleton
- Project setup
- Lazy Evaluation
- Transformations
- Specifying Keys
- Passing Functions to Flink
- Data Types
- Data Sources
- Data Sinks
- Broadcast Variables
- Parallel Execution
- Executing Plans
示例程序
以下程序是一段完整可運行的WordCount示例程序。你可以複製粘貼這些代碼並在本地運行。
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction
class Adder(GroupReduceFunction):
def reduce(self, iterator, collector):
count, word = iterator.next()
count += sum([x[0] for x in iterator])
collector.collect((count, word))
env = get_environment()
data = env.from_elements("Who's there?",
"I think I hear them. Stand, ho! Who's there?")
data \
.flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute(local=True)
程序框架
從示例程序可以看出,Flink程序看起來就像普通的python程序一樣。每個程序都包含相同的基本組成部分:
1.獲取一個運行環境
2.加載/創建初始數據
3.指定對這些數據的操作
4.指定計算結果的存放位置
5.運行程序
接下來,我們將對每個步驟給出概述,更多細節可以參考與之對應的小節。
Environment(運行環境)是所有Flink程序的基礎。你可以通過調用Environment類中的一些靜態方法來建立一個環境:
get_environment()
運行環境可通過多種讀文件的方式來指定數據源。如果是簡單的按行讀取文本文件,你可以采用:
env = get_environment()
text = env.read_text("file:///path/to/file")
這樣,你就獲得了可以進行操作(apply transformations)的數據集。關於數據源和輸入格式的更多信息,請參考 Data Sources。
一旦你獲得了一個數據集DataSet,你就可以通過transformations來創建一個新的數據集,並把它寫入到文件,再次transform,或者與其他數據集相結合。你可以通過對數據集調用自己個性化定製的函數來進行數據操作。例如,一個類似這樣的數據映射操作:
data.map(lambda x: x*2)
這將會創建一個新的數據集,其中的每個數據都是原來數據集中的2倍。若要獲取關於所有transformations的更多信息,及所有數據操作的列表,請參考Transformations。
當你需要將所獲得的數據集寫入到磁盤時,調用下麵三種函數的其中一個即可。
data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()
其中,最後一種方法僅適用於在本機上進行開發/調試,它會將數據集的內容輸出到標準輸出。(請注意,當函數在集群上運行時,結果將會輸出到整個集群節點的標準輸出流,即輸出到workers的.out文件。)前兩種方法,能夠將數據集寫入到對應的文件中。關於寫入到文件的更多信息,請參考Data Sinks。
當你設計好了程序之後,你需要在環境中執行execute命令來運行程序。可以選擇在本機運行,也可以提交到集群運行,這取決於Flink的創建方式。你可以通過設置execute(local=True)強製程序在本機運行。
創建項目
除了搭建好Flink運行環境,就無需進行其他準備工作了。Python包可以從你的Flink版本對應的/resource文件夾找到。在執行工作任務時,Flink 包,plan包和optional包均可以通過HDFS自動分發。
Python API已經在安裝了Python2.7或3.4的Linux/Windows係統上測試過。
默認情況下,Flink通過調用”python”或”python3″來啟動python進程,這取決於使用了哪種啟動腳本。通過在 flink-conf.yaml 中設置 “python.binary.python[2/3]”對應的值,來設定你所需要的啟動方式。
延遲(惰性)求值
所有的Flink程序都是延遲執行的。當程序的主函數執行時,數據的載入和操作並沒有在當時發生。與此相反,每一個被創建出來的操作都被加入到程序的計劃中。當程序環境中的某個對象調用了execute()函數時,這些操作才會被真正的執行。不論該程序是在本地運行還是集群上運行。
延遲求值能夠讓你建立複雜的程序,並在Flink上以一個整體的計劃單元來運行。
數據變換
數據變換(Data transformations)可以將一個或多個數據集映射為一個新的數據集。程序能夠將多種變換結合到一起來進行複雜的整合變換。
該小節將概述各種可以實現的數據變換。transformations documentation數據變換文檔中,有關於所有數據變換和示例的全麵介紹。
Transformation | Description 變換描述 |
Map | 輸入一個元素,輸出一個元素
|
FlatMap | 輸入一個元素,輸出0,1,或多個元素
|
MapPartition | 通過一次函數調用實現並行的分割操作。該函數將分割變換作為一個”迭代器”,並且能夠產生任意數量的輸出值。每次分割變換的元素數量取決於變換的並行性和之前的操作結果。
|
Filter | 對每一個元素,計算一個布爾表達式的值,保留函數計算結果為true的元素。
|
Reduce | 通過不斷的將兩個元素組合為一個,來將一組元素結合為一個單一的元素。這種縮減變換可以應用於整個數據集,也可以應用於已分組的數據集。
|
ReduceGroup | 將一組元素縮減為1個或多個元素。縮減分組變換可以被應用於一個完整的數據集,或者一個分組數據集。
|
Aggregate | 對一個數據集包含所有元組的一個域,或者數據集的每個數據組,執行某項built-in操作(求和,求最小值,求最大值)。聚集變換可以被應用於一個完整的數據集,或者一個分組數據集。
|
Join | 對兩個數據集進行聯合變換,將得到一個新的數據集,其中包含在兩個數據集中擁有相等關鍵字的所有元素對。也可通過JoinFunction來把成對的元素變為單獨的元素。關於join keys的更多信息請查看 keys 。
|
CoGroup | 是Reduce變換在二維空間的一個變體。將來自一個或多個域的數據加入數據組。變換函數transformation function將被每一對數據組調用。關於定義coGroup keys的更多信息,請查看 keys 。
|
Cross | 計算兩個輸入數據集的笛卡爾乘積(向量叉乘),得到所有元素對。也可通過CrossFunction實現將一對元素轉變為一個單獨的元素。
|
Union | 將兩個數據集進行合並。
|
ZipWithIndex | 為數據組中的元素逐個分配連續的索引。了解更多信息,請參考 [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).
|
指定Keys
一些變換(例如Join和CoGroup),需要在進行變換前,為作為輸入參數的數據集指定一個關鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對數據集根據某個關鍵字進行分組。
數據集可通過如下方式分組
reduced = data \
.group_by(<define key here>) \
.reduce_group(<do something>)
Flink中的數據模型並不是基於鍵-值對。你無需將數據集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實數據之上,引導分組操作的函數。
最後更新:2017-05-18 20:31:42