《Flink官方文檔》Python 編程指南測試版(二)
為元組定義keys
最簡單的情形是對一個數據集中的元組按照一個或多個域進行分組:
reduced = data \
.group_by(0) \
.reduce_group(<do something>)
數據集中的元組被按照第一個域分組。對於接下來的group-reduce函數,輸入的數據組中,每個元組的第一個域都有相同的值。
grouped = data \
.group_by(0,1) \
.reduce(/*do something*/)
在上麵的例子中,數據集的分組基於第一個和第二個域形成的複合關鍵字,因此,reduce函數輸入數據組中,每個元組兩個域的值均相同。
關於嵌套元組需要注意:如果你有一個使用了嵌套元組的數據集,指定group_by(<index of tuple>)操作,係統將把整個元組作為關鍵字使用。
向Flink傳遞函數
一些特定的操作需要采用用戶自定義的函數,因此它們都接受lambda表達式和rich functions作為輸入參數。
data.filter(lambda x: x > 5)
class Filter(FilterFunction):
def filter(self, value):
return value > 5
data.filter(Filter())
Rich functions可以將函數作為輸入參數,允許使用broadcast-variables(廣播變量),能夠由init()函數參數化,是複雜函數的一個可考慮的實現方式。它們也是在reduce操作中,定義一個可選的combine function的唯一方式。
Lambda表達式可以讓函數在一行代碼上實現,非常便捷。需要注意的是,如果某個操作會返回多個數值,則其使用的lambda表達式應當返回一個迭代器。(所有函數將接收一個collector輸入 參數)。
數據類型
Flink的Python API目前僅支持python中的基本數據類型(int,float,bool,string)以及byte arrays。
運行環境對數據類型的支持,包括序列化器serializer,反序列化器deserializer,以及自定義類型的類。
class MyObj(object):
def __init__(self, i):
self.value = i
class MySerializer(object):
def serialize(self, value):
return struct.pack(">i", value.value)
class MyDeserializer(object):
def _deserialize(self, read):
i = struct.unpack(">i", read(4))[0]
return MyObj(i)
env.register_custom_type(MyObj, MySerializer(), MyDeserializer())
Tuples/Lists
你可以使用元組(或列表)來表示複雜類型。Python中的元組可以轉換為Flink中的Tuple類型,它們包含數量固定的不同類型的域(最多25個)。每個域的元組可以是基本數據類型,也可以是其他的元組類型,從而形成嵌套元組類型。
word_counts = env.from_elements(("hello", 1), ("world",2))
counts = word_counts.map(lambda x: x[1])
當進行一些要求指定關鍵字的操作時,例如對數據記錄進行分組或配對。通過設定關鍵字,可以非常便捷地指定元組中各個域的位置。你可以指定多個位置,從而實現複合關鍵字(更多信息,查閱Section Data Transformations)。
wordCounts \
.group_by(0) \
.reduce(MyReduceFunction())
數據源
數據源創建了初始的數據集,包括來自文件,以及來自數據接口/集合兩種方式。
基於文件的:
- read_text(path) – 按行讀取文件,並將每一行以String形式返回。
- read_csv(path,type) – 解析以逗號(或其他字符)劃分數據域的文件。
返回一個包含若幹元組的數據集。支持基本的java數據類型作為字段類型。
基於數據集合的:
- from_elements(*args) – 基於一係列數據創建一個數據集,包含所有元素。
- generate_sequence(from, to) – 按照指定的間隔,生成一係列數據。
Examples
env = get_environment
\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")
\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")
\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)
數據池
數據池可以接收數據集,並被用來存儲或返回它們:
- write_text() – 按行以String形式寫入數據。可通過對每個數據項調用str()函數獲取String。
- write_csv(…) – 將元組寫入逗號分隔數值文件。行數和數據字段均可配置。每個字段的值可通過對數據項調用str()方法得到。
- output() – 在標準輸出上打印每個數據項的str()字符串。
一個數據集可以同時作為多個操作的輸入數據。程序可以在寫入或打印一個數據集的同時,對其進行其他的變換操作。
Examples
標準數據池相關方法示例如下:
write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")
write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")
廣播變量
使用廣播變量,能夠在使用普通輸入參數的基礎上,使得一個數據集同時被多個並行的操作所使用。這對於實現輔助數據集,或者是基於數據的參數化法非常有用。這樣,數據集就可以以集合的形式被訪問。
- 注冊廣播變量:廣播數據集可通過調用with_broadcast_set(DataSet,String)函數,按照名字注冊廣播變量。
- 訪問廣播變量:通過對調用self.context.get_broadcast_variable(String)可獲取廣播變量。
class MapperBcv(MapFunction):
def map(self, value):
factor = self.context.get_broadcast_variable("bcv")[0][0]
return value * factor
# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")
# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)
確保在進行廣播變量的注冊和訪問時,應當采用相同的名字(示例中的”bcv”)。
注意:由於廣播變量的內容被保存在每個節點的內部存儲中,不適合包含過多內容。一些簡單的參數,例如標量值,可簡單地通過參數化rich function來實現。
並行執行
該章節將描述如何在Flink中配置程序的並行執行。一個Flink程序可以包含多個任務(操作,數據源和數據池)。一個任務可以被劃分為多個可並行運行的部分,每個部分處理輸入數據的一個子集。並行運行的實例數量被稱作它的並行性或並行度degree of parallelism (DOP)。
在Flink中可以為任務指定不同等級的並行度。
運行環境級
Flink程序可在一個運行環境execution environment的上下文中運行。一個運行環境為其中運行的所有操作,數據源和數據池定義了一個默認的並行度。運行環境的並行度可通過對某個操作的並行度進行配置來修改。
一個運行環境的並行度可通過調用set_parallelism()方法來指定。例如,為了將WordCount示例程序中的所有操作,數據源和數據池的並行度設置為3,可以通過如下方式設置運行環境的默認並行度。
env = get_environment()
env.set_parallelism(3)
text.flat_map(lambda x,c: x.lower().split()) \
.group_by(1) \
.reduce_group(Adder(), combinable=True) \
.output()
env.execute()
係統級
通過設置位於./conf/flink-conf.yaml.文件的parallelism.default屬性,改變係統級的默認並行度,可設置所有運行環境的默認並行度。具體細節可查閱Configuration文檔。
執行方法
為了在Flink中運行計劃任務,到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對於python2.7版本,運行pyflink2.sh;對於python3.4版本,運行pyflink3.sh。包含計劃任務的腳本應當作為第一個輸入參數,其後可添加一些另外的python包,最後,在“-”之後,輸入其他附加參數。
./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
最後更新:2017-05-18 20:31:43