158
阿裏雲
Python SDK__SDK_大數據計算服務-阿裏雲
PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了對 MaxCompute 對象的基本操作;並提供了DataFrame框架,能輕鬆在 MaxCompute 上進行數據分析。
安裝
PyOdps支持Python 2.6以上包括Python 3。係統安裝了pip後,隻需運行:
pip install pyodps
PyOdps的相關依賴會自動安裝。
快速開始
首先,我們需要阿裏雲的帳號來初始化一個 MaxCompute 的入口:
from odps import ODPSodps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',endpoint='**your-end-point**')
這樣就已經初始化,就可以對表、資源、函數等進行操作了。
項目空間
項目空間是 MaxCompute 的基本組織單元, 有點類似於Database的概念。
我們通過 get_project 來取到某個項目空間。
project = odps.get_project('my_project') # 取到某個項目project = odps.get_project() # 取到默認項目
如果不提供參數,則取到默認項目空間。
exist_project 方法能檢驗某個項目空間是否存在。
表是 MaxCompute 的數據存儲單元。
表操作
我們可以用 list_tables 來列出項目空間下的所有表。
for table in odps.list_tables():# 處理每個表
通過調用 exist_table 來判斷表是否存在。通過調用 get_table 來獲取表。
>>> t = odps.get_table('dual')>>> t.schemaodps.Schema {c_int_a bigintc_int_b bigintc_double_a doublec_double_b doublec_string_a stringc_string_b stringc_bool_a booleanc_bool_b booleanc_datetime_a datetimec_datetime_b datetime}>>> t.lifecycle-1>>> print(t.creation_time)2014-05-15 14:58:43>>> t.is_virtual_viewFalse>>> t.size1408>>> t.schema.columns[<column c_int_a, type bigint>,<column c_int_b, type bigint>,<column c_double_a, type double>,<column c_double_b, type double>,<column c_string_a, type string>,<column c_string_b, type string>,<column c_bool_a, type boolean>,<column c_bool_b, type boolean>,<column c_datetime_a, type datetime>,<column c_datetime_b, type datetime>]
創建表的Schema
有兩種方法來初始化。第一種方式通過表的列、以及可選的分區來初始化。
>>> from odps.models import Schema, Column, Partition>>> columns = [Column(name='num', type='bigint', comment='the column')]>>> partitions = [Partition(name='pt', type='string', comment='the partition')]>>> schema = Schema(columns=columns, partitions=partitions)>>> schema.columns[<column num, type bigint>, <partition pt, type string>]
第二種方法是使用 Schema.from_lists,這種方法更容易調用,但顯然無法直接設置列和分區的注釋了。
>>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])>>> schema.columns[<column num, type bigint>, <partition pt, type string>]
創建表
現在知道怎麼創建表的Schema,創建一個表就很容易了。
>>> table = odps.create_table('my_new_table', schema)>>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 隻有不存在表時才創建
其他還可以設置lifecycle等參數。
獲取表數據
有若幹種方法能夠獲取表數據。首先,如果隻是查看每個表的開始的小於1萬條數據,則可以使用 head 方法。
>>> t = odps.get_table('dual')>>> for record in t.head(3):>>> print(record[0]) # 取第0個位置的值>>> print(record['c_double_a']) # 通過字段取值>>> print(record[0: 3]) # 切片操作>>> print(record[0, 2, 3]) # 取多個位置的值>>> print(record['c_int_a', 'c_double_a']) # 通過多個字段取值
其次,在table上可以執行 open_reader 操作來打一個reader來讀取數據。記住這裏需要使用 with表達式。
>>> with t.open_reader(partition='pt=test') as reader:>>> count = reader.count>>> for record in reader[5:10] # 可以執行多次,直到將count數量的record讀完,這裏可以改造成並行操作>>> # 處理一條記錄
最後,可以使用Tunnel API來進行讀取操作,open_reader 操作其實也是對Tunnel API的封裝。
向表寫數據
類似於 open_reader,table對象同樣能執行 open_writer 來打開writer,並寫數據。同樣記住使用 with表達式。
>>> with t.open_writer(partition='pt=test') as writer:>>> writer.write(records) # 這裏records可以是任意可迭代的records,默認寫到block 0>>>>>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 這裏同是打開兩個block>>> writer.write(0, gen_records(block=0))>>> writer.write(1, gen_records(block=1)) # 這裏兩個寫操作可以多線程並行,各個block間是獨立的
同樣,向表寫數據也是對Tunnel API的封裝,詳細參考 [數據上傳下載通道] 。
刪除表
>>> odps.delete_table('my_table_name', if_exists=True) # 隻有表存在時刪除>>> t.drop() # Table對象存在的時候可以直接執行drop函數
表分區
基本操作
遍曆表全部分區:
>>> for partition in table.partitions:>>> print(partition.name)>>> for partition in table.iterate_partitions(spec='pt=test'):>>> # 遍曆二級分區
判斷分區是否存在:
>>> table.exist_partition('pt=test,sub=2015')
獲取分區:
>>> partition = table.get_partition('pt=test')>>> print(partition.creation_time)2015-11-18 22:22:27>>> partition.size0
創建分區
>>> t.create_partition('pt=test', if_not_exists=True) # 不存在的時候才創建
刪除分區
>>> t.delete_partition('pt=test', if_exists=True) # 存在的時候才刪除>>> partition.drop() # Partition對象存在的時候直接drop
SQL
PyOdps支持MaxCompute SQL的查詢,並可以讀取執行的結果。
執行SQL
>>> odps.execute_sql('select * from dual') # 同步的方式執行,會阻塞直到SQL執行完成>>> instance = odps.run_sql('select * from dual') # 異步的方式執行>>> instance.wait_for_success() # 阻塞直到完成
讀取SQL執行結果
運行SQL的instance能夠直接執行 open_reader 的操作,一種情況是SQL返回了結構化的數據。
>>> with odps.execute_sql('select * from dual').open_reader() as reader:>>> for record in reader:>>> # 處理每一個record
另一種情況是SQL可能執行的比如 desc,這時通過 reader.raw 屬性取到原始的SQL執行結果。
>>> with odps.execute_sql('desc dual').open_reader() as reader:>>> print(reader.raw)
Resource
資源在 MaxCompute 上常用在UDF和MapReduce中。
列出所有資源還是可以使用 list_resources,判斷資源是否存在使用 exist_resource。 刪除資源時,可以調用 delete_resource,或者直接對於Resource對象調用 drop 方法。
在PyOdps中,主要支持兩種資源類型,一種是文件,另一種是表。下麵分別說明。
文件資源
文件資源包括基礎的 file 類型、以及 py、jar、archive。
創建文件資源
創建文件資源可以通過給定資源名、文件類型、以及一個file-like的對象(或者是字符串對象)來創建,比如
resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的對象resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串
讀取和修改文件資源
對文件資源調用 open 方法,或者在odps入口調用 open_resource 都能打開一個資源, 打開後的對象會是file-like的對象。 類似於Python內置的 open 方法,文件資源也支持打開的模式。我們看例子:
>>> with resource.open('r') as fp: # 以讀模式打開>>> content = fp.read() # 讀取全部的內容>>> fp.seek(0) # 回到資源開頭>>> lines = fp.readlines() # 讀成多行>>> fp.write('Hello World') # 報錯,讀模式下無法寫資源>>>>>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 讀寫模式打開>>> fp.read()>>> fp.tell() # 當前位置>>> fp.seek(10)>>> fp.truncate() # 截斷後麵的內容>>> fp.writelines(['Hellon', 'Worldn']) # 寫入多行>>> fp.write('Hello World')>>> fp.flush() # 手動調用會將更新提交到ODPS
所有支持的打開類型包括:
r,讀模式,隻能打開不能寫w,寫模式,隻能寫入而不能讀文件,注意用寫模式打開,文件內容會被先清空a,追加模式,隻能寫入內容到文件末尾r+,讀寫模式,能任意讀寫內容w+,類似於r+,但會先清空文件內容a+,類似於r+,但寫入時隻能寫入文件末尾
同時,PyOdps中,文件資源支持以二進製模式打開,打開如說一些壓縮文件等等就需要以這種模式, 因此 rb 就是指以二進製讀模式打開文件,r+b 是指以二進製讀寫模式打開。
表資源
創建表資源
>>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
更新表資源
>>> table_resource = odps.get_resource('test_table_resource')>>> table_resource.update(partition='pt=test2', project_name='my_project2')
Function
MaxCompute 用戶可以編寫自定義函數用在MaxCompute SQL中。
基本操作
同樣的,可以調用 list_functions 來獲取項目空間下的所有函數,exist_function 能判斷是否存在函數, get_function 能獲取函數。
創建函數
>>> resource = odps.get_resource('my_udf.py')>>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])
刪除函數
>>> odps.delete_function('test_function')>>> function.drop() # Function對象存在時直接調用drop
DataFrame
PyOdps提供了DataFrame API,它提供了類似pandas的接口,但是能充分利用 MaxCompute 的計算能力。這裏我們給出 DataFrame 的一個例子,完整的 DataFrame 文檔可參考 這裏。
在所有步驟開始前,需要創建 ODPS 對象。
>>> o = ODPS('**your-access-id**', '**your-secret-access-key**',project='**your-project**', endpoint='**your-end-point**'))
我們拿movielens 100K來做例子。現在我們已經有三張表了,分別是pyodps_ml_100k_movies(電影相關的數據),pyodps_ml_100k_users(用戶相關的數據),pyodps_ml_100k_ratings(評分有關的數據)。
創建一個DataFrame對象十分容易,隻需傳入Table對象即可。
>>> from odps.df import DataFrame
>>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))
我們可以通過dtypes屬性來查看這個DataFrame有哪些字段,分別是什麼類型
>>> users.dtypes
通過head方法,我們能取前N條數據,這讓我們能快速預覽數據。
>>> users.head(10)
| user_id | age | sex | occupation | zip_code | |
|---|---|---|---|---|---|
| 0 | 1 | 24 | M | technician | 85711 |
| 1 | 2 | 53 | F | other | 94043 |
| 2 | 3 | 23 | M | writer | 32067 |
| 3 | 4 | 24 | M | technician | 43537 |
| 4 | 5 | 33 | F | other | 15213 |
| 5 | 6 | 42 | M | executive | 98101 |
| 6 | 7 | 57 | M | administrator | 91344 |
| 7 | 8 | 36 | M | administrator | 05201 |
| 8 | 9 | 29 | M | student | 01002 |
| 9 | 10 | 53 | M | lawyer | 90703 |
有時候,我們並不需要都看到所有字段,我們可以從中篩選出一部分。
>>> users[['user_id', 'age']].head(5)
| user_id | age | |
|---|---|---|
| 0 | 1 | 24 |
| 1 | 2 | 53 |
| 2 | 3 | 23 |
| 3 | 4 | 24 |
| 4 | 5 | 33 |
有時候我們隻是排除個別字段。
>>> users.exclude('zip_code', 'age').head(5)
| user_id | sex | occupation | |
|---|---|---|---|
| 0 | 1 | M | technician |
| 1 | 2 | F | other |
| 2 | 3 | M | writer |
| 3 | 4 | M | technician |
| 4 | 5 | F | other |
又或者,排除掉一些字段的同時,得通過計算得到一些新的列,比如我想將sex為M的置為True,否則為False,並取名叫sex_bool。
>>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
| user_id | age | occupation | sex_bool | |
|---|---|---|---|---|
| 0 | 1 | 24 | technician | True |
| 1 | 2 | 53 | other | False |
| 2 | 3 | 23 | writer | True |
| 3 | 4 | 24 | technician | True |
| 4 | 5 | 33 | other | False |
現在,讓我們看看年齡在20到25歲之間的人有多少個
>>> users.age.between(20, 25).count().rename('count')943
接下來,我們看看男女用戶分別有多少。
>>> users.groupby(users.sex).count()
| sex | count | |
|---|---|---|
| 0 | F | 273 |
| 1 | M | 670 |
用戶按職業劃分,從高到底,人數最多的前10職業是哪些呢?
>>> df = users.groupby('occupation').agg(count=users['occupation'].count())>>> df.sort(df['count'], ascending=False)[:10]
| occupation | count | |
|---|---|---|
| 0 | student | 196 |
| 1 | other | 105 |
| 2 | educator | 95 |
| 3 | administrator | 79 |
| 4 | engineer | 67 |
| 5 | programmer | 66 |
| 6 | librarian | 51 |
| 7 | writer | 45 |
| 8 | executive | 32 |
| 9 | scientist | 31 |
DataFrame API提供了value_counts這個方法來快速達到同樣的目的。
>>> users.occupation.value_counts()[:10]
| occupation | count | |
|---|---|---|
| 0 | student | 196 |
| 1 | other | 105 |
| 2 | educator | 95 |
| 3 | administrator | 79 |
| 4 | engineer | 67 |
| 5 | programmer | 66 |
| 6 | librarian | 51 |
| 7 | writer | 45 |
| 8 | executive | 32 |
| 9 | scientist | 31 |
讓我們用更直觀的圖來看這份數據。
>>> %matplotlib inline
我們可以用個橫向的柱狀圖來可視化
>>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

我們將年齡分成30組,來看個年齡分布的直方圖
>>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

好了,現在我們把這三張表聯合起來,隻需要使用join就可以了。join完成後我們把它保存成一張新的表。
>>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))>>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))>>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)>>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')>>> lens.dtypes
odps.Schema {movie_id int64title stringrelease_date stringvideo_release_date stringimdb_url stringuser_id int64rating int64unix_timestamp int64age int64sex stringoccupation stringzip_code string}
現在我們把年齡分成從0到80歲,分成8個年齡段,
>>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']>>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年齡分組')]
我們取分組和年齡唯一的前10條看看。
>>> cut_lens['年齡分組', 'age'].distinct()[:10]
| 年齡分組 | age | |
|---|---|---|
| 0 | 0-9 | 7 |
| 1 | 10-19 | 10 |
| 2 | 10-19 | 11 |
| 3 | 10-19 | 13 |
| 4 | 10-19 | 14 |
| 5 | 10-19 | 15 |
| 6 | 10-19 | 16 |
| 7 | 10-19 | 17 |
| 8 | 10-19 | 18 |
| 9 | 10-19 | 19 |
最後,我們來看看在各個年齡分組下,用戶的評分總數和評分均值分別是多少。
>>> cut_lens.groupby('年齡分組').agg(cut_lens.rating.count().rename('評分總數'), cut_lens.rating.mean().rename('評分均值'))
| 年齡分組 | 評分均值 | 評分總數 | |
|---|---|---|---|
| 0 | 0-9 | 3.767442 | 43 |
| 1 | 10-19 | 3.486126 | 8181 |
| 2 | 20-29 | 3.467333 | 39535 |
| 3 | 30-39 | 3.554444 | 25696 |
| 4 | 40-49 | 3.591772 | 15021 |
| 5 | 50-59 | 3.635800 | 8704 |
| 6 | 60-69 | 3.648875 | 2623 |
| 7 | 70-79 | 3.649746 | 197 |
Configuration
配置選項
PyODPS 提供了一係列的配置選項,可通過 odps.options 獲得。下麵列出了可配的 MaxCompute 選項。
通用配置
| 選項 | 說明 | 默認值 |
|---|---|---|
| access_id | ODPS Access ID | None |
| access_key | ODPS Access Key | None |
| end_point | ODPS Endpoint | None |
| default_project | 默認 Project No | ne |
| log_view_host | LogView 主機名 Non | e |
| log_view_hours | LogView 保持時間(小時) | 24 |
| tunnel_endpoint | Tunnel Endpoint | None |
| lifecycle | 所有表生命周期 | None |
| temp_lifecycle | 臨時表生命周期 | 1 |
| biz_id | 用戶 ID | None |
| chunk_size | 寫入緩衝區大小 | 1496 |
| retry_times | 請求重試次數 | 4 |
| connect_timeout | 連接超時 | 5 |
| read_timeout | 讀取超時 | 120 |
DataFrame 配置
| 選項 | 說明 | 默認值 |
|---|---|---|
| verbose | 是否打印日誌 | False |
| verbose_log | 日誌接收器 | None |
| df.analyze | 是否啟用非 ODPS 內置函數 | True |
最後更新:2016-11-23 17:16:08
上一篇:
項目空間的權限管理__安全相關語句匯總_安全指南_大數據計算服務-阿裏雲
下一篇:
客戶端___工具_大數據計算服務-阿裏雲
如何配置ECS支持負載均衡__後端 ECS 服務器常見問題_常見問題_負載均衡-阿裏雲
附錄:SQL Server 2008 R2/2012 功能差異__快速入門(SQL Server)_雲數據庫 RDS 版-阿裏雲
視頻雲服務哪家強?阿裏雲上線299創業版套餐受熱捧
查詢應用加固結果接口__應用加固API_API手冊_移動安全-阿裏雲
釋放實例、關閉自動釋放__實例_用戶指南_雲服務器 ECS-阿裏雲
單個負載均衡如何配置多站點__常見問題_負載均衡-阿裏雲
亮度和對比度__圖片效果_老版圖片服務手冊_對象存儲 OSS-阿裏雲
運行離線任務__應用管理_用戶指南_容器服務-阿裏雲
雲服務器如何通過內網訪問RDS?__MYSQL使用_技術運維問題_雲數據庫 RDS 版-阿裏雲
數據可靠性高達9個9!阿裏雲推出雲數據庫HBase版
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲