閱讀801 返回首頁    go 阿裏雲 go 技術社區[雲棲]


《Flink官方文檔》Python 編程指南測試版(二)

為元組定義keys

最簡單的情形是對一個數據集中的元組按照一個或多個域進行分組:

reduced = data \
  .group_by(0) \
  .reduce_group(<do something>)

數據集中的元組被按照第一個域分組。對於接下來的group-reduce函數,輸入的數據組中,每個元組的第一個域都有相同的值。

grouped = data \
  .group_by(0,1) \
  .reduce(/*do something*/)

在上麵的例子中,數據集的分組基於第一個和第二個域形成的複合關鍵字,因此,reduce函數輸入數據組中,每個元組兩個域的值均相同。
關於嵌套元組需要注意:如果你有一個使用了嵌套元組的數據集,指定group_by(<index of tuple>)操作,係統將把整個元組作為關鍵字使用。

向Flink傳遞函數

一些特定的操作需要采用用戶自定義的函數,因此它們都接受lambda表達式和rich functions作為輸入參數。

data.filter(lambda x: x > 5)
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data.filter(Filter())

Rich functions可以將函數作為輸入參數,允許使用broadcast-variables(廣播變量),能夠由init()函數參數化,是複雜函數的一個可考慮的實現方式。它們也是在reduce操作中,定義一個可選的combine function的唯一方式。
Lambda表達式可以讓函數在一行代碼上實現,非常便捷。需要注意的是,如果某個操作會返回多個數值,則其使用的lambda表達式應當返回一個迭代器。(所有函數將接收一個collector輸入 參數)。

數據類型

Flink的Python API目前僅支持python中的基本數據類型(int,float,bool,string)以及byte arrays。
運行環境對數據類型的支持,包括序列化器serializer,反序列化器deserializer,以及自定義類型的類。

class MyObj(object):
    def __init__(self, i):
        self.value = i


class MySerializer(object):
    def serialize(self, value):
        return struct.pack(">i", value.value)


class MyDeserializer(object):
    def _deserialize(self, read):
        i = struct.unpack(">i", read(4))[0]
        return MyObj(i)


env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

Tuples/Lists

你可以使用元組(或列表)來表示複雜類型。Python中的元組可以轉換為Flink中的Tuple類型,它們包含數量固定的不同類型的域(最多25個)。每個域的元組可以是基本數據類型,也可以是其他的元組類型,從而形成嵌套元組類型。

word_counts = env.from_elements(("hello", 1), ("world",2))

counts = word_counts.map(lambda x: x[1])

當進行一些要求指定關鍵字的操作時,例如對數據記錄進行分組或配對。通過設定關鍵字,可以非常便捷地指定元組中各個域的位置。你可以指定多個位置,從而實現複合關鍵字(更多信息,查閱Section Data Transformations)。

wordCounts \
    .group_by(0) \
    .reduce(MyReduceFunction())

數據源

數據源創建了初始的數據集,包括來自文件,以及來自數據接口/集合兩種方式。

基於文件的:

  • read_text(path) – 按行讀取文件,並將每一行以String形式返回。
  • read_csv(path,type) – 解析以逗號(或其他字符)劃分數據域的文件。
    返回一個包含若幹元組的數據集。支持基本的java數據類型作為字段類型。

基於數據集合的:

  • from_elements(*args) – 基於一係列數據創建一個數據集,包含所有元素。
  • generate_sequence(from, to) – 按照指定的間隔,生成一係列數據。

Examples

env  = get_environment

\# read text file from local files system
localLiens = env.read_text("file:#/path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))

\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)

 

數據池

數據池可以接收數據集,並被用來存儲或返回它們:

  • write_text() – 按行以String形式寫入數據。可通過對每個數據項調用str()函數獲取String。
  • write_csv(…) – 將元組寫入逗號分隔數值文件。行數和數據字段均可配置。每個字段的值可通過對數據項調用str()方法得到。
  • output() – 在標準輸出上打印每個數據項的str()字符串。

一個數據集可以同時作為多個操作的輸入數據。程序可以在寫入或打印一個數據集的同時,對其進行其他的變換操作。

Examples

標準數據池相關方法示例如下:

write DataSet to a file on the local file system
textData.write_text("file:///my/result/on/localFS")

 write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataSet to a file and overwrite the file if it exists
textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 tuples as lines with pipe as the separator "a|b|c"
values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_text("file:///path/to/the/result/file")

 

廣播變量

使用廣播變量,能夠在使用普通輸入參數的基礎上,使得一個數據集同時被多個並行的操作所使用。這對於實現輔助數據集,或者是基於數據的參數化法非常有用。這樣,數據集就可以以集合的形式被訪問。

  • 注冊廣播變量:廣播數據集可通過調用with_broadcast_set(DataSet,String)函數,按照名字注冊廣播變量。
  • 訪問廣播變量:通過對調用self.context.get_broadcast_variable(String)可獲取廣播變量。
class MapperBcv(MapFunction):
    def map(self, value):
        factor = self.context.get_broadcast_variable("bcv")[0][0]
        return value * factor

# 1. The DataSet to be broadcasted
toBroadcast = env.from_elements(1, 2, 3)
data = env.from_elements("a", "b")

# 2. Broadcast the DataSet
data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

確保在進行廣播變量的注冊和訪問時,應當采用相同的名字(示例中的”bcv”)。
注意:由於廣播變量的內容被保存在每個節點的內部存儲中,不適合包含過多內容。一些簡單的參數,例如標量值,可簡單地通過參數化rich function來實現。

並行執行

該章節將描述如何在Flink中配置程序的並行執行。一個Flink程序可以包含多個任務(操作,數據源和數據池)。一個任務可以被劃分為多個可並行運行的部分,每個部分處理輸入數據的一個子集。並行運行的實例數量被稱作它的並行性或並行度degree of parallelism (DOP)。
在Flink中可以為任務指定不同等級的並行度。

運行環境級

Flink程序可在一個運行環境execution environment的上下文中運行。一個運行環境為其中運行的所有操作,數據源和數據池定義了一個默認的並行度。運行環境的並行度可通過對某個操作的並行度進行配置來修改。
一個運行環境的並行度可通過調用set_parallelism()方法來指定。例如,為了將WordCount示例程序中的所有操作,數據源和數據池的並行度設置為3,可以通過如下方式設置運行環境的默認並行度。

env = get_environment()
env.set_parallelism(3)

text.flat_map(lambda x,c: x.lower().split()) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()

係統級

通過設置位於./conf/flink-conf.yaml.文件的parallelism.default屬性,改變係統級的默認並行度,可設置所有運行環境的默認並行度。具體細節可查閱Configuration文檔。

執行方法

為了在Flink中運行計劃任務,到Flink目錄下,運行/bin文件夾下的pyflink.sh腳本。對於python2.7版本,運行pyflink2.sh;對於python3.4版本,運行pyflink3.sh。包含計劃任務的腳本應當作為第一個輸入參數,其後可添加一些另外的python包,最後,在“-”之後,輸入其他附加參數。

./bin/pyflink<2/3>.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]

轉載自 並發編程網 - ifeve.com

最後更新:2017-05-18 20:31:43

  上一篇:go  clj-xmemcached: memcached client for clojure
  下一篇:go  我寫的這些opensource項目