閱讀812 返回首頁    go 阿裏雲 go 技術社區[雲棲]


PyODPS開發中的最佳實踐

PyODPS 支持用 Python 來對 MaxCompute 對象進行操作,它提供了 DataFrame API 來用類似 pandas 的接口進行大規模數據分析以及預處理,並且可以用 ml 模塊來執行機器學習算法。

現在為了讓大家能更好地使用 PyODPS,我們總結開發過程中的最佳實踐,來讓大家更高效地開發 PyODPS 程序。當然,希望大家能一起來幫助我們來完善總結。

注:公共雲由於未支持 Python UDF,因此本文中提到的自定義函數功能包括 apply 和 map_reduce 等功能公共雲用戶均暫不可用。

除非數據量很小,否則不要試圖進行本地數據處理

我們 PyODPS 提供了多種方便拉取數據到本地的操作,因此,很多用戶會試圖把數據拉取到本地處理,然後再上傳到 ODPS 上。

很多時候,用戶其實根本不清楚這種操作的低效,拉取到本地徹底喪失了 MaxCompute 的大規模並行能力。而有的用戶僅僅是需要對單行數據應用一個 Python 函數,或者試圖做一行變多行的操作,這些操作,用 PyODPS DataFrame 都能輕鬆完成,並且完全利用到了 MaxCompute 的並行計算能力。

比如說現在我有一份數據,都是 json 串,現在我想把 json 串按 key-value 對展開成一行。則可以寫一個簡單的函數。

In [12]: df
               json
0  {"a": 1, "b": 2}
1  {"c": 4, "b": 3}

In [14]: from odps.df import output

In [16]: @output(['k', 'v'], ['string', 'int'])
    ...: def h(row):
    ...:     import json
    ...:     for k, v in json.loads(row.json).items():
    ...:         yield k, v
    ...:   

In [21]: df.apply(h, axis=1)
   k  v
0  a  1
1  b  2
2  c  4
3  b  3

** 而這些操作,幾乎全部都可以用 apply(axis=1)map_reduce 接口完成。**

使用 pandas 計算後端進行高效本地 debug

PyODPS DataFrame 能夠根據數據來源來決定如何執行,比如,通過 pandas DataFrame 創建的 PyODPS DataFrame 則可以使用 pandas 執行本地計算;而使用 MaxCompute 表創建的 DataFrame 則可以在 MaxCompute 上執行。**而這兩種方式,除了初始化不同,後續代碼完全一致**,因此,我們可以利用這點來進行本地 debug。

所以我們可以寫出如下的代碼:

df = o.get_table('movielens_ratings').to_df()
DEBUG = True
if DEBUG:
    df = df[:100].to_pandas(wrap=True)

to_pandas 是將數據下載,根據 wrap 參數來決定是否返回 PyODPS DataFrame,如果是 True,則返回 PyODPS DataFrame;否則,返回 pandas DataFrame。

當我們把所有後續代碼都編寫完成,本地的測試速度就非常快,當測試結束後,我們就可以把 debug 改為 False,這樣後續就能在 ODPS 上執行全量的計算。

使用本地調試還有個好處,就是能利用到 IDE 的如斷點和單步調試自定義函數的功能。要知道,在 ODPS 上執行,是把函數序列化到遠端去執行,所以本地是沒法斷點進入的。而使用本地進行調試時,則可以斷點進入自定義函數,方便進行調試。

推薦大家使用 MaxCompute studio 來本地調試 PyODPS 程序。

利用 Python 語言特性來實現豐富的功能

編寫 Python 函數

一個常見的例子就是,計算兩點之間的距離,有多種計算方法,比如歐氏距離、曼哈頓距離等等,我們可以定義一係列函數,在計算時就可以根據具體情況調用相應的函數即可。

def euclidean_distance(from_x, from_y, to_x, to_y):
    return ((from_x - to_x) ** 2 + (from_y - to_y) ** 2).sqrt()

def manhattan_distance(center_x, center_y, x, y):
   return (from_x - to_x).abs() + (from_y - to_y).abs()

調用則如下:

In [42]: df
     from_x    from_y      to_x      to_y
0  0.393094  0.427736  0.463035  0.105007
1  0.629571  0.364047  0.972390  0.081533
2  0.460626  0.530383  0.443177  0.706774
3  0.647776  0.192169  0.244621  0.447979
4  0.846044  0.153819  0.873813  0.257627
5  0.702269  0.363977  0.440960  0.639756
6  0.596976  0.978124  0.669283  0.936233
7  0.376831  0.461660  0.707208  0.216863
8  0.632239  0.519418  0.881574  0.972641
9  0.071466  0.294414  0.012949  0.368514

In [43]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
   distance
0  0.330221
1  0.444229
2  0.177253
3  0.477465
4  0.107458
5  0.379916
6  0.083565
7  0.411187
8  0.517280
9  0.094420

In [44]: manhattan_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance')
   distance
0  0.392670
1  0.625334
2  0.193841
3  0.658966
4  0.131577
5  0.537088
6  0.114198
7  0.575175
8  0.702558
9  0.132617

利用 Python 語言的條件和循環語句

一個常見的需求是,用戶有大概30張表,需要合成一張表,這個時候如果寫 SQL,需要寫 union all 30張表,如果表的數量更多,會更讓人崩潰。使用 PyODPS,隻需要一句話就搞定了。

table_names = ['table1', ..., 'tableN']
dfs = [o.get_table(tn).to_df() for tn in table_names]
reduce(lambda x, y: x.union(y), dfs)

大功告成。稍微解釋下,這裏的 reduce 這句等價於:

df = dfs[0]
for other_df in dfs[1:]:
    df = df.union(other_df)

稍微擴展下,經常有一些 case 是這樣,用戶要計算的表保存在某個地方,比如說數據庫,需要根據配置來對表的字段進行處理,然後對所有表進行 union 或者 join 操作。這個時候,用 SQL 實現可能是相當複雜的,但是用 DataFrame 進行處理會非常簡單,而實際上我們就有用戶用 PyODPS 解決了這樣的問題。

盡量使用內建算子,而不是自定義函數

比如上文提到的歐氏距離的計算,實際上,計算的過程都是使用的 DataFrame 的內建算子,比如說指數和 sqrt 等操作,如果我們對一行數據應用自定義函數,則會發現,速度會慢很多。

In [54]: euclidean_distance(df.from_x, df.from_y, df.to_x, df.to_y).rename('distance').mean()
|==========================================|   1 /  1  (100.00%)         7s
0.5216082314224464

In [55]: @output(['distance'], ['float'])
    ...: def euclidean_distance2(row):
    ...:     import math
    ...:     return math.sqrt((row.from_x - row.to_x) ** 2 + (row.from_y - row.to_y) ** 2)
    ...: 

In [56]: df.apply(euclidean_distance2, axis=1, reduce=True).mean()
|==========================================|   1 /  1  (100.00%)        27s
0.5216082314224464

可以看到,當我們對一行應用了自定義函數後,執行時間從7秒延長到了27秒,這個數據隻是1百萬行數據計算的結果,如果有更大的數據集,更複雜的操作,時間的差距可能會更長。

總結

利用 PyODPS,我們其實能挖掘更多更靈活、更高效操作 MaxCompute 數據的方式。最佳實踐可以不光是我們提供的一些建議,如果你有更多好玩有用的實踐,可以多多分享出來。

最後更新:2017-07-24 06:32:29

  上一篇:go  免費OA四大價值讓你工作得心應手
  下一篇:go  spark sql