閱讀158 返回首頁    go 群英


Python SDK__SDK_大數據計算服務-阿裏雲

PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了對 MaxCompute 對象的基本操作;並提供了DataFrame框架,能輕鬆在 MaxCompute 上進行數據分析。

安裝

PyOdps支持Python 2.6以上包括Python 3。係統安裝了pip後,隻需運行:

  1. pip install pyodps

PyOdps的相關依賴會自動安裝。

快速開始

首先,我們需要阿裏雲的帳號來初始化一個 MaxCompute 的入口:

  1. from odps import ODPS
  2. odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
  3. endpoint='**your-end-point**')

這樣就已經初始化,就可以對表、資源、函數等進行操作了。

項目空間

項目空間是 MaxCompute 的基本組織單元, 有點類似於Database的概念。

我們通過 get_project 來取到某個項目空間。

  1. project = odps.get_project('my_project') # 取到某個項目
  2. project = odps.get_project() # 取到默認項目

如果不提供參數,則取到默認項目空間。

exist_project 方法能檢驗某個項目空間是否存在。

表是 MaxCompute 的數據存儲單元。

表操作

我們可以用 list_tables 來列出項目空間下的所有表。

  1. for table in odps.list_tables():
  2. # 處理每個表

通過調用 exist_table 來判斷表是否存在。通過調用 get_table 來獲取表。

  1. >>> t = odps.get_table('dual')
  2. >>> t.schema
  3. odps.Schema {
  4. c_int_a bigint
  5. c_int_b bigint
  6. c_double_a double
  7. c_double_b double
  8. c_string_a string
  9. c_string_b string
  10. c_bool_a boolean
  11. c_bool_b boolean
  12. c_datetime_a datetime
  13. c_datetime_b datetime
  14. }
  15. >>> t.lifecycle
  16. -1
  17. >>> print(t.creation_time)
  18. 2014-05-15 14:58:43
  19. >>> t.is_virtual_view
  20. False
  21. >>> t.size
  22. 1408
  23. >>> t.schema.columns
  24. [<column c_int_a, type bigint>,
  25. <column c_int_b, type bigint>,
  26. <column c_double_a, type double>,
  27. <column c_double_b, type double>,
  28. <column c_string_a, type string>,
  29. <column c_string_b, type string>,
  30. <column c_bool_a, type boolean>,
  31. <column c_bool_b, type boolean>,
  32. <column c_datetime_a, type datetime>,
  33. <column c_datetime_b, type datetime>]

創建表的Schema

有兩種方法來初始化。第一種方式通過表的列、以及可選的分區來初始化。

  1. >>> from odps.models import Schema, Column, Partition
  2. >>> columns = [Column(name='num', type='bigint', comment='the column')]
  3. >>> partitions = [Partition(name='pt', type='string', comment='the partition')]
  4. >>> schema = Schema(columns=columns, partitions=partitions)
  5. >>> schema.columns
  6. [<column num, type bigint>, <partition pt, type string>]

第二種方法是使用 Schema.from_lists,這種方法更容易調用,但顯然無法直接設置列和分區的注釋了。

  1. >>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
  2. >>> schema.columns
  3. [<column num, type bigint>, <partition pt, type string>]

創建表

現在知道怎麼創建表的Schema,創建一個表就很容易了。

  1. >>> table = odps.create_table('my_new_table', schema)
  2. >>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 隻有不存在表時才創建

其他還可以設置lifecycle等參數。

獲取表數據

有若幹種方法能夠獲取表數據。首先,如果隻是查看每個表的開始的小於1萬條數據,則可以使用 head 方法。

  1. >>> t = odps.get_table('dual')
  2. >>> for record in t.head(3):
  3. >>> print(record[0]) # 取第0個位置的值
  4. >>> print(record['c_double_a']) # 通過字段取值
  5. >>> print(record[0: 3]) # 切片操作
  6. >>> print(record[0, 2, 3]) # 取多個位置的值
  7. >>> print(record['c_int_a', 'c_double_a']) # 通過多個字段取值

其次,在table上可以執行 open_reader 操作來打一個reader來讀取數據。記住這裏需要使用 with表達式

  1. >>> with t.open_reader(partition='pt=test') as reader:
  2. >>> count = reader.count
  3. >>> for record in reader[5:10] # 可以執行多次,直到將count數量的record讀完,這裏可以改造成並行操作
  4. >>> # 處理一條記錄

最後,可以使用Tunnel API來進行讀取操作,open_reader 操作其實也是對Tunnel API的封裝。

向表寫數據

類似於 open_reader,table對象同樣能執行 open_writer 來打開writer,並寫數據。同樣記住使用 with表達式

  1. >>> with t.open_writer(partition='pt=test') as writer:
  2. >>> writer.write(records) # 這裏records可以是任意可迭代的records,默認寫到block 0
  3. >>>
  4. >>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 這裏同是打開兩個block
  5. >>> writer.write(0, gen_records(block=0))
  6. >>> writer.write(1, gen_records(block=1)) # 這裏兩個寫操作可以多線程並行,各個block間是獨立的

同樣,向表寫數據也是對Tunnel API的封裝,詳細參考 [數據上傳下載通道] 。

刪除表

  1. >>> odps.delete_table('my_table_name', if_exists=True) # 隻有表存在時刪除
  2. >>> t.drop() # Table對象存在的時候可以直接執行drop函數

表分區

基本操作

遍曆表全部分區:

  1. >>> for partition in table.partitions:
  2. >>> print(partition.name)
  3. >>> for partition in table.iterate_partitions(spec='pt=test'):
  4. >>> # 遍曆二級分區

判斷分區是否存在:

  1. >>> table.exist_partition('pt=test,sub=2015')

獲取分區:

  1. >>> partition = table.get_partition('pt=test')
  2. >>> print(partition.creation_time)
  3. 2015-11-18 22:22:27
  4. >>> partition.size
  5. 0

創建分區

  1. >>> t.create_partition('pt=test', if_not_exists=True) # 不存在的時候才創建

刪除分區

  1. >>> t.delete_partition('pt=test', if_exists=True) # 存在的時候才刪除
  2. >>> partition.drop() # Partition對象存在的時候直接drop

SQL

PyOdps支持MaxCompute SQL的查詢,並可以讀取執行的結果。

執行SQL

  1. >>> odps.execute_sql('select * from dual') # 同步的方式執行,會阻塞直到SQL執行完成
  2. >>> instance = odps.run_sql('select * from dual') # 異步的方式執行
  3. >>> instance.wait_for_success() # 阻塞直到完成

讀取SQL執行結果

運行SQL的instance能夠直接執行 open_reader 的操作,一種情況是SQL返回了結構化的數據。

  1. >>> with odps.execute_sql('select * from dual').open_reader() as reader:
  2. >>> for record in reader:
  3. >>> # 處理每一個record

另一種情況是SQL可能執行的比如 desc,這時通過 reader.raw 屬性取到原始的SQL執行結果。

  1. >>> with odps.execute_sql('desc dual').open_reader() as reader:
  2. >>> print(reader.raw)

Resource

資源在 MaxCompute 上常用在UDF和MapReduce中。

列出所有資源還是可以使用 list_resources,判斷資源是否存在使用 exist_resource。 刪除資源時,可以調用 delete_resource,或者直接對於Resource對象調用 drop 方法。

在PyOdps中,主要支持兩種資源類型,一種是文件,另一種是表。下麵分別說明。

文件資源

文件資源包括基礎的 file 類型、以及 pyjararchive

創建文件資源

創建文件資源可以通過給定資源名、文件類型、以及一個file-like的對象(或者是字符串對象)來創建,比如

  1. resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的對象
  2. resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串

讀取和修改文件資源

對文件資源調用 open 方法,或者在odps入口調用 open_resource 都能打開一個資源, 打開後的對象會是file-like的對象。 類似於Python內置的 open 方法,文件資源也支持打開的模式。我們看例子:

  1. >>> with resource.open('r') as fp: # 以讀模式打開
  2. >>> content = fp.read() # 讀取全部的內容
  3. >>> fp.seek(0) # 回到資源開頭
  4. >>> lines = fp.readlines() # 讀成多行
  5. >>> fp.write('Hello World') # 報錯,讀模式下無法寫資源
  6. >>>
  7. >>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 讀寫模式打開
  8. >>> fp.read()
  9. >>> fp.tell() # 當前位置
  10. >>> fp.seek(10)
  11. >>> fp.truncate() # 截斷後麵的內容
  12. >>> fp.writelines(['Hellon', 'Worldn']) # 寫入多行
  13. >>> fp.write('Hello World')
  14. >>> fp.flush() # 手動調用會將更新提交到ODPS

所有支持的打開類型包括:

  • r,讀模式,隻能打開不能寫
  • w,寫模式,隻能寫入而不能讀文件,注意用寫模式打開,文件內容會被先清空
  • a,追加模式,隻能寫入內容到文件末尾
  • r+,讀寫模式,能任意讀寫內容
  • w+,類似於 r+,但會先清空文件內容
  • a+,類似於 r+,但寫入時隻能寫入文件末尾

同時,PyOdps中,文件資源支持以二進製模式打開,打開如說一些壓縮文件等等就需要以這種模式, 因此 rb 就是指以二進製讀模式打開文件,r+b 是指以二進製讀寫模式打開。

表資源

創建表資源

  1. >>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')

更新表資源

  1. >>> table_resource = odps.get_resource('test_table_resource')
  2. >>> table_resource.update(partition='pt=test2', project_name='my_project2')

Function

MaxCompute 用戶可以編寫自定義函數用在MaxCompute SQL中。

基本操作

同樣的,可以調用 list_functions 來獲取項目空間下的所有函數,exist_function 能判斷是否存在函數, get_function 能獲取函數。

創建函數

  1. >>> resource = odps.get_resource('my_udf.py')
  2. >>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])

刪除函數

  1. >>> odps.delete_function('test_function')
  2. >>> function.drop() # Function對象存在時直接調用drop

DataFrame

PyOdps提供了DataFrame API,它提供了類似pandas的接口,但是能充分利用 MaxCompute 的計算能力。這裏我們給出 DataFrame 的一個例子,完整的 DataFrame 文檔可參考 這裏

在所有步驟開始前,需要創建 ODPS 對象。

  1. >>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
  2. project='**your-project**', endpoint='**your-end-point**'))

我們拿movielens 100K來做例子。現在我們已經有三張表了,分別是pyodps_ml_100k_movies(電影相關的數據),pyodps_ml_100k_users(用戶相關的數據),pyodps_ml_100k_ratings(評分有關的數據)。

創建一個DataFrame對象十分容易,隻需傳入Table對象即可。

  1. >>> from odps.df import DataFrame
  1. >>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))

我們可以通過dtypes屬性來查看這個DataFrame有哪些字段,分別是什麼類型

  1. >>> users.dtypes

通過head方法,我們能取前N條數據,這讓我們能快速預覽數據。

  1. >>> users.head(10)
  user_id age sex occupation zip_code
0 1 24 M technician 85711
1 2 53 F other 94043
2 3 23 M writer 32067
3 4 24 M technician 43537
4 5 33 F other 15213
5 6 42 M executive 98101
6 7 57 M administrator 91344
7 8 36 M administrator 05201
8 9 29 M student 01002
9 10 53 M lawyer 90703

有時候,我們並不需要都看到所有字段,我們可以從中篩選出一部分。

  1. >>> users[['user_id', 'age']].head(5)
  user_id age
0 1 24
1 2 53
2 3 23
3 4 24
4 5 33

有時候我們隻是排除個別字段。

  1. >>> users.exclude('zip_code', 'age').head(5)
  user_id sex occupation
0 1 M technician
1 2 F other
2 3 M writer
3 4 M technician
4 5 F other

又或者,排除掉一些字段的同時,得通過計算得到一些新的列,比如我想將sex為M的置為True,否則為False,並取名叫sex_bool。

  1. >>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
  user_id age occupation sex_bool
0 1 24 technician True
1 2 53 other False
2 3 23 writer True
3 4 24 technician True
4 5 33 other False

現在,讓我們看看年齡在20到25歲之間的人有多少個

  1. >>> users.age.between(20, 25).count().rename('count')
  2. 943

接下來,我們看看男女用戶分別有多少。

  1. >>> users.groupby(users.sex).count()
  sex count
0 F 273
1 M 670

用戶按職業劃分,從高到底,人數最多的前10職業是哪些呢?

  1. >>> df = users.groupby('occupation').agg(count=users['occupation'].count())
  2. >>> df.sort(df['count'], ascending=False)[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

DataFrame API提供了value_counts這個方法來快速達到同樣的目的。

  1. >>> users.occupation.value_counts()[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

讓我們用更直觀的圖來看這份數據。

  1. >>> %matplotlib inline

我們可以用個橫向的柱狀圖來可視化

  1. >>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

image

我們將年齡分成30組,來看個年齡分布的直方圖

  1. >>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

image

好了,現在我們把這三張表聯合起來,隻需要使用join就可以了。join完成後我們把它保存成一張新的表。

  1. >>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
  2. >>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
  3. >>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
  4. >>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
  5. >>> lens.dtypes
  1. odps.Schema {
  2. movie_id int64
  3. title string
  4. release_date string
  5. video_release_date string
  6. imdb_url string
  7. user_id int64
  8. rating int64
  9. unix_timestamp int64
  10. age int64
  11. sex string
  12. occupation string
  13. zip_code string
  14. }

現在我們把年齡分成從0到80歲,分成8個年齡段,

  1. >>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
  2. >>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年齡分組')]

我們取分組和年齡唯一的前10條看看。

  1. >>> cut_lens['年齡分組', 'age'].distinct()[:10]
  年齡分組 age
0 0-9 7
1 10-19 10
2 10-19 11
3 10-19 13
4 10-19 14
5 10-19 15
6 10-19 16
7 10-19 17
8 10-19 18
9 10-19 19

最後,我們來看看在各個年齡分組下,用戶的評分總數和評分均值分別是多少。

  1. >>> cut_lens.groupby('年齡分組').agg(cut_lens.rating.count().rename('評分總數'), cut_lens.rating.mean().rename('評分均值'))
  年齡分組 評分均值 評分總數
0 0-9 3.767442 43
1 10-19 3.486126 8181
2 20-29 3.467333 39535
3 30-39 3.554444 25696
4 40-49 3.591772 15021
5 50-59 3.635800 8704
6 60-69 3.648875 2623
7 70-79 3.649746 197

Configuration

配置選項

PyODPS 提供了一係列的配置選項,可通過 odps.options 獲得。下麵列出了可配的 MaxCompute 選項。

通用配置

選項 說明 默認值
access_id ODPS Access ID None
access_key ODPS Access Key None
end_point ODPS Endpoint None
default_project 默認 Project No ne
log_view_host LogView 主機名 Non e
log_view_hours LogView 保持時間(小時) 24
tunnel_endpoint Tunnel Endpoint None
lifecycle 所有表生命周期 None
temp_lifecycle 臨時表生命周期 1
biz_id 用戶 ID None
chunk_size 寫入緩衝區大小 1496
retry_times 請求重試次數 4
connect_timeout 連接超時 5
read_timeout 讀取超時 120

DataFrame 配置

選項 說明 默認值
verbose 是否打印日誌 False
verbose_log 日誌接收器 None
df.analyze 是否啟用非 ODPS 內置函數 True

最後更新:2016-11-23 17:16:08

  上一篇:go 項目空間的權限管理__安全相關語句匯總_安全指南_大數據計算服務-阿裏雲
  下一篇:go 客戶端___工具_大數據計算服務-阿裏雲