158
阿里云
Python SDK__SDK_大数据计算服务-阿里云
PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了对 MaxCompute 对象的基本操作;并提供了DataFrame框架,能轻松在 MaxCompute 上进行数据分析。
安装
PyOdps支持Python 2.6以上包括Python 3。系统安装了pip后,只需运行:
pip install pyodps
PyOdps的相关依赖会自动安装。
快速开始
首先,我们需要阿里云的帐号来初始化一个 MaxCompute 的入口:
from odps import ODPSodps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',endpoint='**your-end-point**')
这样就已经初始化,就可以对表、资源、函数等进行操作了。
项目空间
项目空间是 MaxCompute 的基本组织单元, 有点类似于Database的概念。
我们通过 get_project 来取到某个项目空间。
project = odps.get_project('my_project') # 取到某个项目project = odps.get_project() # 取到默认项目
如果不提供参数,则取到默认项目空间。
exist_project 方法能检验某个项目空间是否存在。
表是 MaxCompute 的数据存储单元。
表操作
我们可以用 list_tables 来列出项目空间下的所有表。
for table in odps.list_tables():# 处理每个表
通过调用 exist_table 来判断表是否存在。通过调用 get_table 来获取表。
>>> t = odps.get_table('dual')>>> t.schemaodps.Schema {c_int_a bigintc_int_b bigintc_double_a doublec_double_b doublec_string_a stringc_string_b stringc_bool_a booleanc_bool_b booleanc_datetime_a datetimec_datetime_b datetime}>>> t.lifecycle-1>>> print(t.creation_time)2014-05-15 14:58:43>>> t.is_virtual_viewFalse>>> t.size1408>>> t.schema.columns[<column c_int_a, type bigint>,<column c_int_b, type bigint>,<column c_double_a, type double>,<column c_double_b, type double>,<column c_string_a, type string>,<column c_string_b, type string>,<column c_bool_a, type boolean>,<column c_bool_b, type boolean>,<column c_datetime_a, type datetime>,<column c_datetime_b, type datetime>]
创建表的Schema
有两种方法来初始化。第一种方式通过表的列、以及可选的分区来初始化。
>>> from odps.models import Schema, Column, Partition>>> columns = [Column(name='num', type='bigint', comment='the column')]>>> partitions = [Partition(name='pt', type='string', comment='the partition')]>>> schema = Schema(columns=columns, partitions=partitions)>>> schema.columns[<column num, type bigint>, <partition pt, type string>]
第二种方法是使用 Schema.from_lists,这种方法更容易调用,但显然无法直接设置列和分区的注释了。
>>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])>>> schema.columns[<column num, type bigint>, <partition pt, type string>]
创建表
现在知道怎么创建表的Schema,创建一个表就很容易了。
>>> table = odps.create_table('my_new_table', schema)>>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建
其他还可以设置lifecycle等参数。
获取表数据
有若干种方法能够获取表数据。首先,如果只是查看每个表的开始的小于1万条数据,则可以使用 head 方法。
>>> t = odps.get_table('dual')>>> for record in t.head(3):>>> print(record[0]) # 取第0个位置的值>>> print(record['c_double_a']) # 通过字段取值>>> print(record[0: 3]) # 切片操作>>> print(record[0, 2, 3]) # 取多个位置的值>>> print(record['c_int_a', 'c_double_a']) # 通过多个字段取值
其次,在table上可以执行 open_reader 操作来打一个reader来读取数据。记住这里需要使用 with表达式。
>>> with t.open_reader(partition='pt=test') as reader:>>> count = reader.count>>> for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作>>> # 处理一条记录
最后,可以使用Tunnel API来进行读取操作,open_reader 操作其实也是对Tunnel API的封装。
向表写数据
类似于 open_reader,table对象同样能执行 open_writer 来打开writer,并写数据。同样记住使用 with表达式。
>>> with t.open_writer(partition='pt=test') as writer:>>> writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0>>>>>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block>>> writer.write(0, gen_records(block=0))>>> writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的
同样,向表写数据也是对Tunnel API的封装,详细参考 [数据上传下载通道] 。
删除表
>>> odps.delete_table('my_table_name', if_exists=True) # 只有表存在时删除>>> t.drop() # Table对象存在的时候可以直接执行drop函数
表分区
基本操作
遍历表全部分区:
>>> for partition in table.partitions:>>> print(partition.name)>>> for partition in table.iterate_partitions(spec='pt=test'):>>> # 遍历二级分区
判断分区是否存在:
>>> table.exist_partition('pt=test,sub=2015')
获取分区:
>>> partition = table.get_partition('pt=test')>>> print(partition.creation_time)2015-11-18 22:22:27>>> partition.size0
创建分区
>>> t.create_partition('pt=test', if_not_exists=True) # 不存在的时候才创建
删除分区
>>> t.delete_partition('pt=test', if_exists=True) # 存在的时候才删除>>> partition.drop() # Partition对象存在的时候直接drop
SQL
PyOdps支持MaxCompute SQL的查询,并可以读取执行的结果。
执行SQL
>>> odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成>>> instance = odps.run_sql('select * from dual') # 异步的方式执行>>> instance.wait_for_success() # 阻塞直到完成
读取SQL执行结果
运行SQL的instance能够直接执行 open_reader 的操作,一种情况是SQL返回了结构化的数据。
>>> with odps.execute_sql('select * from dual').open_reader() as reader:>>> for record in reader:>>> # 处理每一个record
另一种情况是SQL可能执行的比如 desc,这时通过 reader.raw 属性取到原始的SQL执行结果。
>>> with odps.execute_sql('desc dual').open_reader() as reader:>>> print(reader.raw)
Resource
资源在 MaxCompute 上常用在UDF和MapReduce中。
列出所有资源还是可以使用 list_resources,判断资源是否存在使用 exist_resource。 删除资源时,可以调用 delete_resource,或者直接对于Resource对象调用 drop 方法。
在PyOdps中,主要支持两种资源类型,一种是文件,另一种是表。下面分别说明。
文件资源
文件资源包括基础的 file 类型、以及 py、jar、archive。
创建文件资源
创建文件资源可以通过给定资源名、文件类型、以及一个file-like的对象(或者是字符串对象)来创建,比如
resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串
读取和修改文件资源
对文件资源调用 open 方法,或者在odps入口调用 open_resource 都能打开一个资源, 打开后的对象会是file-like的对象。 类似于Python内置的 open 方法,文件资源也支持打开的模式。我们看例子:
>>> with resource.open('r') as fp: # 以读模式打开>>> content = fp.read() # 读取全部的内容>>> fp.seek(0) # 回到资源开头>>> lines = fp.readlines() # 读成多行>>> fp.write('Hello World') # 报错,读模式下无法写资源>>>>>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开>>> fp.read()>>> fp.tell() # 当前位置>>> fp.seek(10)>>> fp.truncate() # 截断后面的内容>>> fp.writelines(['Hellon', 'Worldn']) # 写入多行>>> fp.write('Hello World')>>> fp.flush() # 手动调用会将更新提交到ODPS
所有支持的打开类型包括:
r,读模式,只能打开不能写w,写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空a,追加模式,只能写入内容到文件末尾r+,读写模式,能任意读写内容w+,类似于r+,但会先清空文件内容a+,类似于r+,但写入时只能写入文件末尾
同时,PyOdps中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此 rb 就是指以二进制读模式打开文件,r+b 是指以二进制读写模式打开。
表资源
创建表资源
>>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')
更新表资源
>>> table_resource = odps.get_resource('test_table_resource')>>> table_resource.update(partition='pt=test2', project_name='my_project2')
Function
MaxCompute 用户可以编写自定义函数用在MaxCompute SQL中。
基本操作
同样的,可以调用 list_functions 来获取项目空间下的所有函数,exist_function 能判断是否存在函数, get_function 能获取函数。
创建函数
>>> resource = odps.get_resource('my_udf.py')>>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])
删除函数
>>> odps.delete_function('test_function')>>> function.drop() # Function对象存在时直接调用drop
DataFrame
PyOdps提供了DataFrame API,它提供了类似pandas的接口,但是能充分利用 MaxCompute 的计算能力。这里我们给出 DataFrame 的一个例子,完整的 DataFrame 文档可参考 这里。
在所有步骤开始前,需要创建 ODPS 对象。
>>> o = ODPS('**your-access-id**', '**your-secret-access-key**',project='**your-project**', endpoint='**your-end-point**'))
我们拿movielens 100K来做例子。现在我们已经有三张表了,分别是pyodps_ml_100k_movies(电影相关的数据),pyodps_ml_100k_users(用户相关的数据),pyodps_ml_100k_ratings(评分有关的数据)。
创建一个DataFrame对象十分容易,只需传入Table对象即可。
>>> from odps.df import DataFrame
>>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))
我们可以通过dtypes属性来查看这个DataFrame有哪些字段,分别是什么类型
>>> users.dtypes
通过head方法,我们能取前N条数据,这让我们能快速预览数据。
>>> users.head(10)
| user_id | age | sex | occupation | zip_code | |
|---|---|---|---|---|---|
| 0 | 1 | 24 | M | technician | 85711 |
| 1 | 2 | 53 | F | other | 94043 |
| 2 | 3 | 23 | M | writer | 32067 |
| 3 | 4 | 24 | M | technician | 43537 |
| 4 | 5 | 33 | F | other | 15213 |
| 5 | 6 | 42 | M | executive | 98101 |
| 6 | 7 | 57 | M | administrator | 91344 |
| 7 | 8 | 36 | M | administrator | 05201 |
| 8 | 9 | 29 | M | student | 01002 |
| 9 | 10 | 53 | M | lawyer | 90703 |
有时候,我们并不需要都看到所有字段,我们可以从中筛选出一部分。
>>> users[['user_id', 'age']].head(5)
| user_id | age | |
|---|---|---|
| 0 | 1 | 24 |
| 1 | 2 | 53 |
| 2 | 3 | 23 |
| 3 | 4 | 24 |
| 4 | 5 | 33 |
有时候我们只是排除个别字段。
>>> users.exclude('zip_code', 'age').head(5)
| user_id | sex | occupation | |
|---|---|---|---|
| 0 | 1 | M | technician |
| 1 | 2 | F | other |
| 2 | 3 | M | writer |
| 3 | 4 | M | technician |
| 4 | 5 | F | other |
又或者,排除掉一些字段的同时,得通过计算得到一些新的列,比如我想将sex为M的置为True,否则为False,并取名叫sex_bool。
>>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
| user_id | age | occupation | sex_bool | |
|---|---|---|---|---|
| 0 | 1 | 24 | technician | True |
| 1 | 2 | 53 | other | False |
| 2 | 3 | 23 | writer | True |
| 3 | 4 | 24 | technician | True |
| 4 | 5 | 33 | other | False |
现在,让我们看看年龄在20到25岁之间的人有多少个
>>> users.age.between(20, 25).count().rename('count')943
接下来,我们看看男女用户分别有多少。
>>> users.groupby(users.sex).count()
| sex | count | |
|---|---|---|
| 0 | F | 273 |
| 1 | M | 670 |
用户按职业划分,从高到底,人数最多的前10职业是哪些呢?
>>> df = users.groupby('occupation').agg(count=users['occupation'].count())>>> df.sort(df['count'], ascending=False)[:10]
| occupation | count | |
|---|---|---|
| 0 | student | 196 |
| 1 | other | 105 |
| 2 | educator | 95 |
| 3 | administrator | 79 |
| 4 | engineer | 67 |
| 5 | programmer | 66 |
| 6 | librarian | 51 |
| 7 | writer | 45 |
| 8 | executive | 32 |
| 9 | scientist | 31 |
DataFrame API提供了value_counts这个方法来快速达到同样的目的。
>>> users.occupation.value_counts()[:10]
| occupation | count | |
|---|---|---|
| 0 | student | 196 |
| 1 | other | 105 |
| 2 | educator | 95 |
| 3 | administrator | 79 |
| 4 | engineer | 67 |
| 5 | programmer | 66 |
| 6 | librarian | 51 |
| 7 | writer | 45 |
| 8 | executive | 32 |
| 9 | scientist | 31 |
让我们用更直观的图来看这份数据。
>>> %matplotlib inline
我们可以用个横向的柱状图来可视化
>>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

我们将年龄分成30组,来看个年龄分布的直方图
>>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

好了,现在我们把这三张表联合起来,只需要使用join就可以了。join完成后我们把它保存成一张新的表。
>>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))>>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))>>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)>>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')>>> lens.dtypes
odps.Schema {movie_id int64title stringrelease_date stringvideo_release_date stringimdb_url stringuser_id int64rating int64unix_timestamp int64age int64sex stringoccupation stringzip_code string}
现在我们把年龄分成从0到80岁,分成8个年龄段,
>>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']>>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')]
我们取分组和年龄唯一的前10条看看。
>>> cut_lens['年龄分组', 'age'].distinct()[:10]
| 年龄分组 | age | |
|---|---|---|
| 0 | 0-9 | 7 |
| 1 | 10-19 | 10 |
| 2 | 10-19 | 11 |
| 3 | 10-19 | 13 |
| 4 | 10-19 | 14 |
| 5 | 10-19 | 15 |
| 6 | 10-19 | 16 |
| 7 | 10-19 | 17 |
| 8 | 10-19 | 18 |
| 9 | 10-19 | 19 |
最后,我们来看看在各个年龄分组下,用户的评分总数和评分均值分别是多少。
>>> cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))
| 年龄分组 | 评分均值 | 评分总数 | |
|---|---|---|---|
| 0 | 0-9 | 3.767442 | 43 |
| 1 | 10-19 | 3.486126 | 8181 |
| 2 | 20-29 | 3.467333 | 39535 |
| 3 | 30-39 | 3.554444 | 25696 |
| 4 | 40-49 | 3.591772 | 15021 |
| 5 | 50-59 | 3.635800 | 8704 |
| 6 | 60-69 | 3.648875 | 2623 |
| 7 | 70-79 | 3.649746 | 197 |
Configuration
配置选项
PyODPS 提供了一系列的配置选项,可通过 odps.options 获得。下面列出了可配的 MaxCompute 选项。
通用配置
| 选项 | 说明 | 默认值 |
|---|---|---|
| access_id | ODPS Access ID | None |
| access_key | ODPS Access Key | None |
| end_point | ODPS Endpoint | None |
| default_project | 默认 Project No | ne |
| log_view_host | LogView 主机名 Non | e |
| log_view_hours | LogView 保持时间(小时) | 24 |
| tunnel_endpoint | Tunnel Endpoint | None |
| lifecycle | 所有表生命周期 | None |
| temp_lifecycle | 临时表生命周期 | 1 |
| biz_id | 用户 ID | None |
| chunk_size | 写入缓冲区大小 | 1496 |
| retry_times | 请求重试次数 | 4 |
| connect_timeout | 连接超时 | 5 |
| read_timeout | 读取超时 | 120 |
DataFrame 配置
| 选项 | 说明 | 默认值 |
|---|---|---|
| verbose | 是否打印日志 | False |
| verbose_log | 日志接收器 | None |
| df.analyze | 是否启用非 ODPS 内置函数 | True |
最后更新:2016-11-23 17:16:08
上一篇:
项目空间的权限管理__安全相关语句汇总_安全指南_大数据计算服务-阿里云
下一篇:
客户端___工具_大数据计算服务-阿里云
如何配置ECS支持负载均衡__后端 ECS 服务器常见问题_常见问题_负载均衡-阿里云
附录:SQL Server 2008 R2/2012 功能差异__快速入门(SQL Server)_云数据库 RDS 版-阿里云
视频云服务哪家强?阿里云上线299创业版套餐受热捧
查询应用加固结果接口__应用加固API_API手册_移动安全-阿里云
释放实例、关闭自动释放__实例_用户指南_云服务器 ECS-阿里云
单个负载均衡如何配置多站点__常见问题_负载均衡-阿里云
亮度和对比度__图片效果_老版图片服务手册_对象存储 OSS-阿里云
运行离线任务__应用管理_用户指南_容器服务-阿里云
云服务器如何通过内网访问RDS?__MYSQL使用_技术运维问题_云数据库 RDS 版-阿里云
数据可靠性高达9个9!阿里云推出云数据库HBase版
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云