158
人物
Python SDK__SDK_大數據計算服務-阿裏雲
PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了對 MaxCompute 對象的基本操作;並提供了DataFrame框架,能輕鬆在 MaxCompute 上進行數據分析。
安裝
PyOdps支持Python 2.6以上包括Python 3。係統安裝了pip後,隻需運行:
pip install pyodps
PyOdps的相關依賴會自動安裝。
快速開始
首先,我們需要阿裏雲的帳號來初始化一個 MaxCompute 的入口:
from odps import ODPS
odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
endpoint='**your-end-point**')
這樣就已經初始化,就可以對表、資源、函數等進行操作了。
項目空間
項目空間是 MaxCompute 的基本組織單元, 有點類似於Database的概念。
我們通過 get_project
來取到某個項目空間。
project = odps.get_project('my_project') # 取到某個項目
project = odps.get_project() # 取到默認項目
如果不提供參數,則取到默認項目空間。
exist_project
方法能檢驗某個項目空間是否存在。
表是 MaxCompute 的數據存儲單元。
表操作
我們可以用 list_tables
來列出項目空間下的所有表。
for table in odps.list_tables():
# 處理每個表
通過調用 exist_table
來判斷表是否存在。通過調用 get_table
來獲取表。
>>> t = odps.get_table('dual')
>>> t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
>>> t.lifecycle
-1
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
1408
>>> t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
創建表的Schema
有兩種方法來初始化。第一種方式通過表的列、以及可選的分區來初始化。
>>> from odps.models import Schema, Column, Partition
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = Schema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
第二種方法是使用 Schema.from_lists
,這種方法更容易調用,但顯然無法直接設置列和分區的注釋了。
>>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
創建表
現在知道怎麼創建表的Schema,創建一個表就很容易了。
>>> table = odps.create_table('my_new_table', schema)
>>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 隻有不存在表時才創建
其他還可以設置lifecycle等參數。
獲取表數據
有若幹種方法能夠獲取表數據。首先,如果隻是查看每個表的開始的小於1萬條數據,則可以使用 head
方法。
>>> t = odps.get_table('dual')
>>> for record in t.head(3):
>>> print(record[0]) # 取第0個位置的值
>>> print(record['c_double_a']) # 通過字段取值
>>> print(record[0: 3]) # 切片操作
>>> print(record[0, 2, 3]) # 取多個位置的值
>>> print(record['c_int_a', 'c_double_a']) # 通過多個字段取值
其次,在table上可以執行 open_reader
操作來打一個reader來讀取數據。記住這裏需要使用 with表達式。
>>> with t.open_reader(partition='pt=test') as reader:
>>> count = reader.count
>>> for record in reader[5:10] # 可以執行多次,直到將count數量的record讀完,這裏可以改造成並行操作
>>> # 處理一條記錄
最後,可以使用Tunnel API來進行讀取操作,open_reader
操作其實也是對Tunnel API的封裝。
向表寫數據
類似於 open_reader
,table對象同樣能執行 open_writer
來打開writer,並寫數據。同樣記住使用 with表達式。
>>> with t.open_writer(partition='pt=test') as writer:
>>> writer.write(records) # 這裏records可以是任意可迭代的records,默認寫到block 0
>>>
>>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 這裏同是打開兩個block
>>> writer.write(0, gen_records(block=0))
>>> writer.write(1, gen_records(block=1)) # 這裏兩個寫操作可以多線程並行,各個block間是獨立的
同樣,向表寫數據也是對Tunnel API的封裝,詳細參考 [數據上傳下載通道] 。
刪除表
>>> odps.delete_table('my_table_name', if_exists=True) # 隻有表存在時刪除
>>> t.drop() # Table對象存在的時候可以直接執行drop函數
表分區
基本操作
遍曆表全部分區:
>>> for partition in table.partitions:
>>> print(partition.name)
>>> for partition in table.iterate_partitions(spec='pt=test'):
>>> # 遍曆二級分區
判斷分區是否存在:
>>> table.exist_partition('pt=test,sub=2015')
獲取分區:
>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size
0
創建分區
>>> t.create_partition('pt=test', if_not_exists=True) # 不存在的時候才創建
刪除分區
>>> t.delete_partition('pt=test', if_exists=True) # 存在的時候才刪除
>>> partition.drop() # Partition對象存在的時候直接drop
SQL
PyOdps支持MaxCompute SQL的查詢,並可以讀取執行的結果。
執行SQL
>>> odps.execute_sql('select * from dual') # 同步的方式執行,會阻塞直到SQL執行完成
>>> instance = odps.run_sql('select * from dual') # 異步的方式執行
>>> instance.wait_for_success() # 阻塞直到完成
讀取SQL執行結果
運行SQL的instance能夠直接執行 open_reader
的操作,一種情況是SQL返回了結構化的數據。
>>> with odps.execute_sql('select * from dual').open_reader() as reader:
>>> for record in reader:
>>> # 處理每一個record
另一種情況是SQL可能執行的比如 desc
,這時通過 reader.raw
屬性取到原始的SQL執行結果。
>>> with odps.execute_sql('desc dual').open_reader() as reader:
>>> print(reader.raw)
Resource
資源在 MaxCompute 上常用在UDF和MapReduce中。
列出所有資源還是可以使用 list_resources
,判斷資源是否存在使用 exist_resource
。 刪除資源時,可以調用 delete_resource
,或者直接對於Resource對象調用 drop
方法。
在PyOdps中,主要支持兩種資源類型,一種是文件,另一種是表。下麵分別說明。
文件資源
文件資源包括基礎的 file
類型、以及 py
、jar
、archive
。
創建文件資源
創建文件資源可以通過給定資源名、文件類型、以及一個file-like的對象(或者是字符串對象)來創建,比如
resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的對象
resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串
讀取和修改文件資源
對文件資源調用 open
方法,或者在odps入口調用 open_resource
都能打開一個資源, 打開後的對象會是file-like的對象。 類似於Python內置的 open
方法,文件資源也支持打開的模式。我們看例子:
>>> with resource.open('r') as fp: # 以讀模式打開
>>> content = fp.read() # 讀取全部的內容
>>> fp.seek(0) # 回到資源開頭
>>> lines = fp.readlines() # 讀成多行
>>> fp.write('Hello World') # 報錯,讀模式下無法寫資源
>>>
>>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 讀寫模式打開
>>> fp.read()
>>> fp.tell() # 當前位置
>>> fp.seek(10)
>>> fp.truncate() # 截斷後麵的內容
>>> fp.writelines(['Hellon', 'Worldn']) # 寫入多行
>>> fp.write('Hello World')
>>> fp.flush() # 手動調用會將更新提交到ODPS
所有支持的打開類型包括:
r
,讀模式,隻能打開不能寫w
,寫模式,隻能寫入而不能讀文件,注意用寫模式打開,文件內容會被先清空a
,追加模式,隻能寫入內容到文件末尾r+
,讀寫模式,能任意讀寫內容w+
,類似於r+
,但會先清空文件內容a+
,類似於r+
,但寫入時隻能寫入文件末尾
同時,PyOdps中,文件資源支持以二進製模式打開,打開如說一些壓縮文件等等就需要以這種模式, 因此 rb
就是指以二進製讀模式打開文件,r+b
是指以二進製讀寫模式打開。
表資源
創建表資源
>>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
更新表資源
>>> table_resource = odps.get_resource('test_table_resource')
>>> table_resource.update(partition='pt=test2', project_name='my_project2')
Function
MaxCompute 用戶可以編寫自定義函數用在MaxCompute SQL中。
基本操作
同樣的,可以調用 list_functions
來獲取項目空間下的所有函數,exist_function
能判斷是否存在函數, get_function
能獲取函數。
創建函數
>>> resource = odps.get_resource('my_udf.py')
>>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])
刪除函數
>>> odps.delete_function('test_function')
>>> function.drop() # Function對象存在時直接調用drop
DataFrame
PyOdps提供了DataFrame API,它提供了類似pandas的接口,但是能充分利用 MaxCompute 的計算能力。這裏我們給出 DataFrame 的一個例子,完整的 DataFrame 文檔可參考 這裏。
在所有步驟開始前,需要創建 ODPS 對象。
>>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
project='**your-project**', endpoint='**your-end-point**'))
我們拿movielens 100K來做例子。現在我們已經有三張表了,分別是pyodps_ml_100k_movies
(電影相關的數據),pyodps_ml_100k_users
(用戶相關的數據),pyodps_ml_100k_ratings
(評分有關的數據)。
創建一個DataFrame對象十分容易,隻需傳入Table對象即可。
>>> from odps.df import DataFrame
>>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))
我們可以通過dtypes屬性來查看這個DataFrame有哪些字段,分別是什麼類型
>>> users.dtypes
通過head方法,我們能取前N條數據,這讓我們能快速預覽數據。
>>> users.head(10)
user_id | age | sex | occupation | zip_code | |
---|---|---|---|---|---|
0 | 1 | 24 | M | technician | 85711 |
1 | 2 | 53 | F | other | 94043 |
2 | 3 | 23 | M | writer | 32067 |
3 | 4 | 24 | M | technician | 43537 |
4 | 5 | 33 | F | other | 15213 |
5 | 6 | 42 | M | executive | 98101 |
6 | 7 | 57 | M | administrator | 91344 |
7 | 8 | 36 | M | administrator | 05201 |
8 | 9 | 29 | M | student | 01002 |
9 | 10 | 53 | M | lawyer | 90703 |
有時候,我們並不需要都看到所有字段,我們可以從中篩選出一部分。
>>> users[['user_id', 'age']].head(5)
user_id | age | |
---|---|---|
0 | 1 | 24 |
1 | 2 | 53 |
2 | 3 | 23 |
3 | 4 | 24 |
4 | 5 | 33 |
有時候我們隻是排除個別字段。
>>> users.exclude('zip_code', 'age').head(5)
user_id | sex | occupation | |
---|---|---|---|
0 | 1 | M | technician |
1 | 2 | F | other |
2 | 3 | M | writer |
3 | 4 | M | technician |
4 | 5 | F | other |
又或者,排除掉一些字段的同時,得通過計算得到一些新的列,比如我想將sex為M的置為True,否則為False,並取名叫sex_bool。
>>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
user_id | age | occupation | sex_bool | |
---|---|---|---|---|
0 | 1 | 24 | technician | True |
1 | 2 | 53 | other | False |
2 | 3 | 23 | writer | True |
3 | 4 | 24 | technician | True |
4 | 5 | 33 | other | False |
現在,讓我們看看年齡在20到25歲之間的人有多少個
>>> users.age.between(20, 25).count().rename('count')
943
接下來,我們看看男女用戶分別有多少。
>>> users.groupby(users.sex).count()
sex | count | |
---|---|---|
0 | F | 273 |
1 | M | 670 |
用戶按職業劃分,從高到底,人數最多的前10職業是哪些呢?
>>> df = users.groupby('occupation').agg(count=users['occupation'].count())
>>> df.sort(df['count'], ascending=False)[:10]
occupation | count | |
---|---|---|
0 | student | 196 |
1 | other | 105 |
2 | educator | 95 |
3 | administrator | 79 |
4 | engineer | 67 |
5 | programmer | 66 |
6 | librarian | 51 |
7 | writer | 45 |
8 | executive | 32 |
9 | scientist | 31 |
DataFrame API提供了value_counts這個方法來快速達到同樣的目的。
>>> users.occupation.value_counts()[:10]
occupation | count | |
---|---|---|
0 | student | 196 |
1 | other | 105 |
2 | educator | 95 |
3 | administrator | 79 |
4 | engineer | 67 |
5 | programmer | 66 |
6 | librarian | 51 |
7 | writer | 45 |
8 | executive | 32 |
9 | scientist | 31 |
讓我們用更直觀的圖來看這份數據。
>>> %matplotlib inline
我們可以用個橫向的柱狀圖來可視化
>>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')
我們將年齡分成30組,來看個年齡分布的直方圖
>>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')
好了,現在我們把這三張表聯合起來,隻需要使用join就可以了。join完成後我們把它保存成一張新的表。
>>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
>>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
>>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
>>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
>>> lens.dtypes
odps.Schema {
movie_id int64
title string
release_date string
video_release_date string
imdb_url string
user_id int64
rating int64
unix_timestamp int64
age int64
sex string
occupation string
zip_code string
}
現在我們把年齡分成從0到80歲,分成8個年齡段,
>>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
>>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年齡分組')]
我們取分組和年齡唯一的前10條看看。
>>> cut_lens['年齡分組', 'age'].distinct()[:10]
年齡分組 | age | |
---|---|---|
0 | 0-9 | 7 |
1 | 10-19 | 10 |
2 | 10-19 | 11 |
3 | 10-19 | 13 |
4 | 10-19 | 14 |
5 | 10-19 | 15 |
6 | 10-19 | 16 |
7 | 10-19 | 17 |
8 | 10-19 | 18 |
9 | 10-19 | 19 |
最後,我們來看看在各個年齡分組下,用戶的評分總數和評分均值分別是多少。
>>> cut_lens.groupby('年齡分組').agg(cut_lens.rating.count().rename('評分總數'), cut_lens.rating.mean().rename('評分均值'))
年齡分組 | 評分均值 | 評分總數 | |
---|---|---|---|
0 | 0-9 | 3.767442 | 43 |
1 | 10-19 | 3.486126 | 8181 |
2 | 20-29 | 3.467333 | 39535 |
3 | 30-39 | 3.554444 | 25696 |
4 | 40-49 | 3.591772 | 15021 |
5 | 50-59 | 3.635800 | 8704 |
6 | 60-69 | 3.648875 | 2623 |
7 | 70-79 | 3.649746 | 197 |
Configuration
配置選項
PyODPS 提供了一係列的配置選項,可通過 odps.options
獲得。下麵列出了可配的 MaxCompute 選項。
通用配置
選項 | 說明 | 默認值 |
---|---|---|
access_id | ODPS Access ID | None |
access_key | ODPS Access Key | None |
end_point | ODPS Endpoint | None |
default_project | 默認 Project No | ne |
log_view_host | LogView 主機名 Non | e |
log_view_hours | LogView 保持時間(小時) | 24 |
tunnel_endpoint | Tunnel Endpoint | None |
lifecycle | 所有表生命周期 | None |
temp_lifecycle | 臨時表生命周期 | 1 |
biz_id | 用戶 ID | None |
chunk_size | 寫入緩衝區大小 | 1496 |
retry_times | 請求重試次數 | 4 |
connect_timeout | 連接超時 | 5 |
read_timeout | 讀取超時 | 120 |
DataFrame 配置
選項 | 說明 | 默認值 |
---|---|---|
verbose | 是否打印日誌 | False |
verbose_log | 日誌接收器 | None |
df.analyze | 是否啟用非 ODPS 內置函數 | True |
最後更新:2016-11-23 17:16:08
上一篇:
項目空間的權限管理__安全相關語句匯總_安全指南_大數據計算服務-阿裏雲
下一篇:
客戶端___工具_大數據計算服務-阿裏雲
如何配置ECS支持負載均衡__後端 ECS 服務器常見問題_常見問題_負載均衡-阿裏雲
附錄:SQL Server 2008 R2/2012 功能差異__快速入門(SQL Server)_雲數據庫 RDS 版-阿裏雲
視頻雲服務哪家強?阿裏雲上線299創業版套餐受熱捧
查詢應用加固結果接口__應用加固API_API手冊_移動安全-阿裏雲
釋放實例、關閉自動釋放__實例_用戶指南_雲服務器 ECS-阿裏雲
單個負載均衡如何配置多站點__常見問題_負載均衡-阿裏雲
亮度和對比度__圖片效果_老版圖片服務手冊_對象存儲 OSS-阿裏雲
運行離線任務__應用管理_用戶指南_容器服務-阿裏雲
雲服務器如何通過內網訪問RDS?__MYSQL使用_技術運維問題_雲數據庫 RDS 版-阿裏雲
數據可靠性高達9個9!阿裏雲推出雲數據庫HBase版
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲