閱讀177 返回首頁    go 技術社區[雲棲]


《Flink官方文檔》Python 編程指南測試版(一)

Flink中的分析程序實現了對數據集的某些操作 (例如,數據過濾,映射,合並,分組)。這些數據最初來源於特定的數據源(例如來自於讀文件或數據集合)。操作執行的結果通過數據池以寫入數據到(分布式)文件係統或標準輸出(例如命令行終端)的形式返回。Flink程序可以運行在不同的環境中,既能夠獨立運行,也可以嵌入到其他程序中運行。程序可以運行在本地的JVM上,也可以運行在服務器集群中。

為了創建你自己的Flink程序,我們鼓勵你從program skeleton(程序框架)開始,並逐漸增加你自己的transformations(變化)。以下是更多的用法和高級特性的索引。

示例程序

以下程序是一段完整可運行的WordCount示例程序。你可以複製粘貼這些代碼並在本地運行。

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
 "I think I hear them. Stand, ho! Who's there?")

data \
  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  .group_by(1) \
  .reduce_group(Adder(), combinable=True) \
  .output()

env.execute(local=True)

程序框架

從示例程序可以看出,Flink程序看起來就像普通的python程序一樣。每個程序都包含相同的基本組成部分:

1.獲取一個運行環境
2.加載/創建初始數據
3.指定對這些數據的操作
4.指定計算結果的存放位置
5.運行程序

接下來,我們將對每個步驟給出概述,更多細節可以參考與之對應的小節。
Environment(運行環境)是所有Flink程序的基礎。你可以通過調用Environment類中的一些靜態方法來建立一個環境:

get_environment()

運行環境可通過多種讀文件的方式來指定數據源。如果是簡單的按行讀取文本文件,你可以采用:

env = get_environment()
text = env.read_text("file:///path/to/file")

這樣,你就獲得了可以進行操作(apply transformations)的數據集。關於數據源和輸入格式的更多信息,請參考 Data Sources
一旦你獲得了一個數據集DataSet,你就可以通過transformations來創建一個新的數據集,並把它寫入到文件,再次transform,或者與其他數據集相結合。你可以通過對數據集調用自己個性化定製的函數來進行數據操作。例如,一個類似這樣的數據映射操作:

data.map(lambda x: x*2)

這將會創建一個新的數據集,其中的每個數據都是原來數據集中的2倍。若要獲取關於所有transformations的更多信息,及所有數據操作的列表,請參考Transformations

當你需要將所獲得的數據集寫入到磁盤時,調用下麵三種函數的其中一個即可。

data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
output()

其中,最後一種方法僅適用於在本機上進行開發/調試,它會將數據集的內容輸出到標準輸出。(請注意,當函數在集群上運行時,結果將會輸出到整個集群節點的標準輸出流,即輸出到workers的.out文件。)前兩種方法,能夠將數據集寫入到對應的文件中。關於寫入到文件的更多信息,請參考Data Sinks

當你設計好了程序之後,你需要在環境中執行execute命令來運行程序。可以選擇在本機運行,也可以提交到集群運行,這取決於Flink的創建方式。你可以通過設置execute(local=True)強製程序在本機運行。

創建項目

除了搭建好Flink運行環境,就無需進行其他準備工作了。Python包可以從你的Flink版本對應的/resource文件夾找到。在執行工作任務時,Flink 包,plan包和optional包均可以通過HDFS自動分發。

Python API已經在安裝了Python2.7或3.4的Linux/Windows係統上測試過。

默認情況下,Flink通過調用”python”或”python3″來啟動python進程,這取決於使用了哪種啟動腳本。通過在 flink-conf.yaml 中設置 “python.binary.python[2/3]”對應的值,來設定你所需要的啟動方式。

延遲(惰性)求值

所有的Flink程序都是延遲執行的。當程序的主函數執行時,數據的載入和操作並沒有在當時發生。與此相反,每一個被創建出來的操作都被加入到程序的計劃中。當程序環境中的某個對象調用了execute()函數時,這些操作才會被真正的執行。不論該程序是在本地運行還是集群上運行。

延遲求值能夠讓你建立複雜的程序,並在Flink上以一個整體的計劃單元來運行。

數據變換

數據變換(Data transformations)可以將一個或多個數據集映射為一個新的數據集。程序能夠將多種變換結合到一起來進行複雜的整合變換。

該小節將概述各種可以實現的數據變換。transformations documentation數據變換文檔中,有關於所有數據變換和示例的全麵介紹。

Transformation  Description    變換描述
Map 輸入一個元素,輸出一個元素

data.map(lambda x: x * 2)
FlatMap 輸入一個元素,輸出0,1,或多個元素

data.flat_map(
  lambda x,c: [(1,word) for word in line.lower().split() for line 
in x])
MapPartition 通過一次函數調用實現並行的分割操作。該函數將分割變換作為一個”迭代器”,並且能夠產生任意數量的輸出值。每次分割變換的元素數量取決於變換的並行性和之前的操作結果。

data.map_partition(lambda x,c: [value * 2 for value in x])
Filter 對每一個元素,計算一個布爾表達式的值,保留函數計算結果為true的元素。

data.filter(lambda x: x > 1000)
Reduce 通過不斷的將兩個元素組合為一個,來將一組元素結合為一個單一的元素。這種縮減變換可以應用於整個數據集,也可以應用於已分組的數據集。

data.reduce(lambda x,y : x + y)
ReduceGroup 將一組元素縮減為1個或多個元素。縮減分組變換可以被應用於一個完整的數據集,或者一個分組數據集。

lass Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator)      
    collector.collect((count, word))

data.reduce_group(Adder())
Aggregate 對一個數據集包含所有元組的一個域,或者數據集的每個數據組,執行某項built-in操作(求和,求最小值,求最大值)。聚集變換可以被應用於一個完整的數據集,或者一個分組數據集。

# This code finds the sum of all of the values in the first field
 and the maximum of all of the values in the second field
data.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)

# min(), max(), and sum() syntactic sugar functions are also available
data.sum(0).and_agg(Aggregation.Max, 1)
Join 對兩個數據集進行聯合變換,將得到一個新的數據集,其中包含在兩個數據集中擁有相等關鍵字的所有元素對。也可通過JoinFunction來把成對的元素變為單獨的元素。關於join keys的更多信息請查看 keys 。

# In this case tuple fields are used as keys.
# "0" is the join field on the first tuple
# "1" is the join field on the second tuple.
result = input1.join(input2).where(0).equal_to(1)
CoGroup 是Reduce變換在二維空間的一個變體。將來自一個或多個域的數據加入數據組。變換函數transformation function將被每一對數據組調用。關於定義coGroup keys的更多信息,請查看 keys 。

data1.co_group(data2).where(0).equal_to(1)
Cross 計算兩個輸入數據集的笛卡爾乘積(向量叉乘),得到所有元素對。也可通過CrossFunction實現將一對元素轉變為一個單獨的元素。

result = data1.cross(data2)
Union 將兩個數據集進行合並。

data.union(data2)
ZipWithIndex 為數據組中的元素逐個分配連續的索引。了解更多信息,請參考 [Zip Elements Guide](zip_elements_guide.html#zip-with-a-dense-index).

data.zip_with_index()

指定Keys

一些變換(例如Join和CoGroup),需要在進行變換前,為作為輸入參數的數據集指定一個關鍵字,而另一些變換(例如Reduce和GroupReduce),則允許在變換操作之前,對數據集根據某個關鍵字進行分組。

數據集可通過如下方式分組

reduced = data \
  .group_by(<define key here>) \
  .reduce_group(<do something>)

Flink中的數據模型並不是基於鍵-值對。你無需將數據集整理為keys和values的形式。鍵是”虛擬的”:它們被定義為在真實數據之上,引導分組操作的函數。


轉載自 並發編程網 - ifeve.com

最後更新:2017-05-18 20:31:42

  上一篇:go  我寫的這些opensource項目
  下一篇:go  緊急發布xmemcached 1.3.5