158
群英
Python SDK__SDK_大数据计算服务-阿里云
PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了对 MaxCompute 对象的基本操作;并提供了DataFrame框架,能轻松在 MaxCompute 上进行数据分析。
安装
PyOdps支持Python 2.6以上包括Python 3。系统安装了pip后,只需运行:
pip install pyodps
PyOdps的相关依赖会自动安装。
快速开始
首先,我们需要阿里云的帐号来初始化一个 MaxCompute 的入口:
from odps import ODPS
odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
endpoint='**your-end-point**')
这样就已经初始化,就可以对表、资源、函数等进行操作了。
项目空间
项目空间是 MaxCompute 的基本组织单元, 有点类似于Database的概念。
我们通过 get_project
来取到某个项目空间。
project = odps.get_project('my_project') # 取到某个项目
project = odps.get_project() # 取到默认项目
如果不提供参数,则取到默认项目空间。
exist_project
方法能检验某个项目空间是否存在。
表是 MaxCompute 的数据存储单元。
表操作
我们可以用 list_tables
来列出项目空间下的所有表。
for table in odps.list_tables():
# 处理每个表
通过调用 exist_table
来判断表是否存在。通过调用 get_table
来获取表。
>>> t = odps.get_table('dual')
>>> t.schema
odps.Schema {
c_int_a bigint
c_int_b bigint
c_double_a double
c_double_b double
c_string_a string
c_string_b string
c_bool_a boolean
c_bool_b boolean
c_datetime_a datetime
c_datetime_b datetime
}
>>> t.lifecycle
-1
>>> print(t.creation_time)
2014-05-15 14:58:43
>>> t.is_virtual_view
False
>>> t.size
1408
>>> t.schema.columns
[<column c_int_a, type bigint>,
<column c_int_b, type bigint>,
<column c_double_a, type double>,
<column c_double_b, type double>,
<column c_string_a, type string>,
<column c_string_b, type string>,
<column c_bool_a, type boolean>,
<column c_bool_b, type boolean>,
<column c_datetime_a, type datetime>,
<column c_datetime_b, type datetime>]
创建表的Schema
有两种方法来初始化。第一种方式通过表的列、以及可选的分区来初始化。
>>> from odps.models import Schema, Column, Partition
>>> columns = [Column(name='num', type='bigint', comment='the column')]
>>> partitions = [Partition(name='pt', type='string', comment='the partition')]
>>> schema = Schema(columns=columns, partitions=partitions)
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
第二种方法是使用 Schema.from_lists
,这种方法更容易调用,但显然无法直接设置列和分区的注释了。
>>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
>>> schema.columns
[<column num, type bigint>, <partition pt, type string>]
创建表
现在知道怎么创建表的Schema,创建一个表就很容易了。
>>> table = odps.create_table('my_new_table', schema)
>>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建
其他还可以设置lifecycle等参数。
获取表数据
有若干种方法能够获取表数据。首先,如果只是查看每个表的开始的小于1万条数据,则可以使用 head
方法。
>>> t = odps.get_table('dual')
>>> for record in t.head(3):
>>> print(record[0]) # 取第0个位置的值
>>> print(record['c_double_a']) # 通过字段取值
>>> print(record[0: 3]) # 切片操作
>>> print(record[0, 2, 3]) # 取多个位置的值
>>> print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
其次,在table上可以执行 open_reader
操作来打一个reader来读取数据。记住这里需要使用 with表达式。
>>> with t.open_reader(partition='pt=test') as reader:
>>> count = reader.count
>>> for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
>>> # 处理一条记录
最后,可以使用Tunnel API来进行读取操作,open_reader
操作其实也是对Tunnel API的封装。
向表写数据
类似于 open_reader
,table对象同样能执行 open_writer
来打开writer,并写数据。同样记住使用 with表达式。
>>> with t.open_writer(partition='pt=test') as writer:
>>> writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0
>>>
>>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block
>>> writer.write(0, gen_records(block=0))
>>> writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的
同样,向表写数据也是对Tunnel API的封装,详细参考 [数据上传下载通道] 。
删除表
>>> odps.delete_table('my_table_name', if_exists=True) # 只有表存在时删除
>>> t.drop() # Table对象存在的时候可以直接执行drop函数
表分区
基本操作
遍历表全部分区:
>>> for partition in table.partitions:
>>> print(partition.name)
>>> for partition in table.iterate_partitions(spec='pt=test'):
>>> # 遍历二级分区
判断分区是否存在:
>>> table.exist_partition('pt=test,sub=2015')
获取分区:
>>> partition = table.get_partition('pt=test')
>>> print(partition.creation_time)
2015-11-18 22:22:27
>>> partition.size
0
创建分区
>>> t.create_partition('pt=test', if_not_exists=True) # 不存在的时候才创建
删除分区
>>> t.delete_partition('pt=test', if_exists=True) # 存在的时候才删除
>>> partition.drop() # Partition对象存在的时候直接drop
SQL
PyOdps支持MaxCompute SQL的查询,并可以读取执行的结果。
执行SQL
>>> odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
>>> instance = odps.run_sql('select * from dual') # 异步的方式执行
>>> instance.wait_for_success() # 阻塞直到完成
读取SQL执行结果
运行SQL的instance能够直接执行 open_reader
的操作,一种情况是SQL返回了结构化的数据。
>>> with odps.execute_sql('select * from dual').open_reader() as reader:
>>> for record in reader:
>>> # 处理每一个record
另一种情况是SQL可能执行的比如 desc
,这时通过 reader.raw
属性取到原始的SQL执行结果。
>>> with odps.execute_sql('desc dual').open_reader() as reader:
>>> print(reader.raw)
Resource
资源在 MaxCompute 上常用在UDF和MapReduce中。
列出所有资源还是可以使用 list_resources
,判断资源是否存在使用 exist_resource
。 删除资源时,可以调用 delete_resource
,或者直接对于Resource对象调用 drop
方法。
在PyOdps中,主要支持两种资源类型,一种是文件,另一种是表。下面分别说明。
文件资源
文件资源包括基础的 file
类型、以及 py
、jar
、archive
。
创建文件资源
创建文件资源可以通过给定资源名、文件类型、以及一个file-like的对象(或者是字符串对象)来创建,比如
resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象
resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串
读取和修改文件资源
对文件资源调用 open
方法,或者在odps入口调用 open_resource
都能打开一个资源, 打开后的对象会是file-like的对象。 类似于Python内置的 open
方法,文件资源也支持打开的模式。我们看例子:
>>> with resource.open('r') as fp: # 以读模式打开
>>> content = fp.read() # 读取全部的内容
>>> fp.seek(0) # 回到资源开头
>>> lines = fp.readlines() # 读成多行
>>> fp.write('Hello World') # 报错,读模式下无法写资源
>>>
>>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开
>>> fp.read()
>>> fp.tell() # 当前位置
>>> fp.seek(10)
>>> fp.truncate() # 截断后面的内容
>>> fp.writelines(['Hellon', 'Worldn']) # 写入多行
>>> fp.write('Hello World')
>>> fp.flush() # 手动调用会将更新提交到ODPS
所有支持的打开类型包括:
r
,读模式,只能打开不能写w
,写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空a
,追加模式,只能写入内容到文件末尾r+
,读写模式,能任意读写内容w+
,类似于r+
,但会先清空文件内容a+
,类似于r+
,但写入时只能写入文件末尾
同时,PyOdps中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此 rb
就是指以二进制读模式打开文件,r+b
是指以二进制读写模式打开。
表资源
创建表资源
>>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
更新表资源
>>> table_resource = odps.get_resource('test_table_resource')
>>> table_resource.update(partition='pt=test2', project_name='my_project2')
Function
MaxCompute 用户可以编写自定义函数用在MaxCompute SQL中。
基本操作
同样的,可以调用 list_functions
来获取项目空间下的所有函数,exist_function
能判断是否存在函数, get_function
能获取函数。
创建函数
>>> resource = odps.get_resource('my_udf.py')
>>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])
删除函数
>>> odps.delete_function('test_function')
>>> function.drop() # Function对象存在时直接调用drop
DataFrame
PyOdps提供了DataFrame API,它提供了类似pandas的接口,但是能充分利用 MaxCompute 的计算能力。这里我们给出 DataFrame 的一个例子,完整的 DataFrame 文档可参考 这里。
在所有步骤开始前,需要创建 ODPS 对象。
>>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
project='**your-project**', endpoint='**your-end-point**'))
我们拿movielens 100K来做例子。现在我们已经有三张表了,分别是pyodps_ml_100k_movies
(电影相关的数据),pyodps_ml_100k_users
(用户相关的数据),pyodps_ml_100k_ratings
(评分有关的数据)。
创建一个DataFrame对象十分容易,只需传入Table对象即可。
>>> from odps.df import DataFrame
>>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))
我们可以通过dtypes属性来查看这个DataFrame有哪些字段,分别是什么类型
>>> users.dtypes
通过head方法,我们能取前N条数据,这让我们能快速预览数据。
>>> users.head(10)
user_id | age | sex | occupation | zip_code | |
---|---|---|---|---|---|
0 | 1 | 24 | M | technician | 85711 |
1 | 2 | 53 | F | other | 94043 |
2 | 3 | 23 | M | writer | 32067 |
3 | 4 | 24 | M | technician | 43537 |
4 | 5 | 33 | F | other | 15213 |
5 | 6 | 42 | M | executive | 98101 |
6 | 7 | 57 | M | administrator | 91344 |
7 | 8 | 36 | M | administrator | 05201 |
8 | 9 | 29 | M | student | 01002 |
9 | 10 | 53 | M | lawyer | 90703 |
有时候,我们并不需要都看到所有字段,我们可以从中筛选出一部分。
>>> users[['user_id', 'age']].head(5)
user_id | age | |
---|---|---|
0 | 1 | 24 |
1 | 2 | 53 |
2 | 3 | 23 |
3 | 4 | 24 |
4 | 5 | 33 |
有时候我们只是排除个别字段。
>>> users.exclude('zip_code', 'age').head(5)
user_id | sex | occupation | |
---|---|---|---|
0 | 1 | M | technician |
1 | 2 | F | other |
2 | 3 | M | writer |
3 | 4 | M | technician |
4 | 5 | F | other |
又或者,排除掉一些字段的同时,得通过计算得到一些新的列,比如我想将sex为M的置为True,否则为False,并取名叫sex_bool。
>>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
user_id | age | occupation | sex_bool | |
---|---|---|---|---|
0 | 1 | 24 | technician | True |
1 | 2 | 53 | other | False |
2 | 3 | 23 | writer | True |
3 | 4 | 24 | technician | True |
4 | 5 | 33 | other | False |
现在,让我们看看年龄在20到25岁之间的人有多少个
>>> users.age.between(20, 25).count().rename('count')
943
接下来,我们看看男女用户分别有多少。
>>> users.groupby(users.sex).count()
sex | count | |
---|---|---|
0 | F | 273 |
1 | M | 670 |
用户按职业划分,从高到底,人数最多的前10职业是哪些呢?
>>> df = users.groupby('occupation').agg(count=users['occupation'].count())
>>> df.sort(df['count'], ascending=False)[:10]
occupation | count | |
---|---|---|
0 | student | 196 |
1 | other | 105 |
2 | educator | 95 |
3 | administrator | 79 |
4 | engineer | 67 |
5 | programmer | 66 |
6 | librarian | 51 |
7 | writer | 45 |
8 | executive | 32 |
9 | scientist | 31 |
DataFrame API提供了value_counts这个方法来快速达到同样的目的。
>>> users.occupation.value_counts()[:10]
occupation | count | |
---|---|---|
0 | student | 196 |
1 | other | 105 |
2 | educator | 95 |
3 | administrator | 79 |
4 | engineer | 67 |
5 | programmer | 66 |
6 | librarian | 51 |
7 | writer | 45 |
8 | executive | 32 |
9 | scientist | 31 |
让我们用更直观的图来看这份数据。
>>> %matplotlib inline
我们可以用个横向的柱状图来可视化
>>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')
我们将年龄分成30组,来看个年龄分布的直方图
>>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')
好了,现在我们把这三张表联合起来,只需要使用join就可以了。join完成后我们把它保存成一张新的表。
>>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
>>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
>>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
>>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
>>> lens.dtypes
odps.Schema {
movie_id int64
title string
release_date string
video_release_date string
imdb_url string
user_id int64
rating int64
unix_timestamp int64
age int64
sex string
occupation string
zip_code string
}
现在我们把年龄分成从0到80岁,分成8个年龄段,
>>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
>>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')]
我们取分组和年龄唯一的前10条看看。
>>> cut_lens['年龄分组', 'age'].distinct()[:10]
年龄分组 | age | |
---|---|---|
0 | 0-9 | 7 |
1 | 10-19 | 10 |
2 | 10-19 | 11 |
3 | 10-19 | 13 |
4 | 10-19 | 14 |
5 | 10-19 | 15 |
6 | 10-19 | 16 |
7 | 10-19 | 17 |
8 | 10-19 | 18 |
9 | 10-19 | 19 |
最后,我们来看看在各个年龄分组下,用户的评分总数和评分均值分别是多少。
>>> cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))
年龄分组 | 评分均值 | 评分总数 | |
---|---|---|---|
0 | 0-9 | 3.767442 | 43 |
1 | 10-19 | 3.486126 | 8181 |
2 | 20-29 | 3.467333 | 39535 |
3 | 30-39 | 3.554444 | 25696 |
4 | 40-49 | 3.591772 | 15021 |
5 | 50-59 | 3.635800 | 8704 |
6 | 60-69 | 3.648875 | 2623 |
7 | 70-79 | 3.649746 | 197 |
Configuration
配置选项
PyODPS 提供了一系列的配置选项,可通过 odps.options
获得。下面列出了可配的 MaxCompute 选项。
通用配置
选项 | 说明 | 默认值 |
---|---|---|
access_id | ODPS Access ID | None |
access_key | ODPS Access Key | None |
end_point | ODPS Endpoint | None |
default_project | 默认 Project No | ne |
log_view_host | LogView 主机名 Non | e |
log_view_hours | LogView 保持时间(小时) | 24 |
tunnel_endpoint | Tunnel Endpoint | None |
lifecycle | 所有表生命周期 | None |
temp_lifecycle | 临时表生命周期 | 1 |
biz_id | 用户 ID | None |
chunk_size | 写入缓冲区大小 | 1496 |
retry_times | 请求重试次数 | 4 |
connect_timeout | 连接超时 | 5 |
read_timeout | 读取超时 | 120 |
DataFrame 配置
选项 | 说明 | 默认值 |
---|---|---|
verbose | 是否打印日志 | False |
verbose_log | 日志接收器 | None |
df.analyze | 是否启用非 ODPS 内置函数 | True |
最后更新:2016-11-23 17:16:08
上一篇:
项目空间的权限管理__安全相关语句汇总_安全指南_大数据计算服务-阿里云
下一篇:
客户端___工具_大数据计算服务-阿里云
如何配置ECS支持负载均衡__后端 ECS 服务器常见问题_常见问题_负载均衡-阿里云
附录:SQL Server 2008 R2/2012 功能差异__快速入门(SQL Server)_云数据库 RDS 版-阿里云
视频云服务哪家强?阿里云上线299创业版套餐受热捧
查询应用加固结果接口__应用加固API_API手册_移动安全-阿里云
释放实例、关闭自动释放__实例_用户指南_云服务器 ECS-阿里云
单个负载均衡如何配置多站点__常见问题_负载均衡-阿里云
亮度和对比度__图片效果_老版图片服务手册_对象存储 OSS-阿里云
运行离线任务__应用管理_用户指南_容器服务-阿里云
云服务器如何通过内网访问RDS?__MYSQL使用_技术运维问题_云数据库 RDS 版-阿里云
数据可靠性高达9个9!阿里云推出云数据库HBase版
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云