阅读158 返回首页    go 群英


Python SDK__SDK_大数据计算服务-阿里云

PyODPS 是 MaxCompute 的Python版本的SDK, 它提供了对 MaxCompute 对象的基本操作;并提供了DataFrame框架,能轻松在 MaxCompute 上进行数据分析。

安装

PyOdps支持Python 2.6以上包括Python 3。系统安装了pip后,只需运行:

  1. pip install pyodps

PyOdps的相关依赖会自动安装。

快速开始

首先,我们需要阿里云的帐号来初始化一个 MaxCompute 的入口:

  1. from odps import ODPS
  2. odps = ODPS('**your-access-id**', '**your-secret-access-key**', '**your-default-project**',
  3. endpoint='**your-end-point**')

这样就已经初始化,就可以对表、资源、函数等进行操作了。

项目空间

项目空间是 MaxCompute 的基本组织单元, 有点类似于Database的概念。

我们通过 get_project 来取到某个项目空间。

  1. project = odps.get_project('my_project') # 取到某个项目
  2. project = odps.get_project() # 取到默认项目

如果不提供参数,则取到默认项目空间。

exist_project 方法能检验某个项目空间是否存在。

表是 MaxCompute 的数据存储单元。

表操作

我们可以用 list_tables 来列出项目空间下的所有表。

  1. for table in odps.list_tables():
  2. # 处理每个表

通过调用 exist_table 来判断表是否存在。通过调用 get_table 来获取表。

  1. >>> t = odps.get_table('dual')
  2. >>> t.schema
  3. odps.Schema {
  4. c_int_a bigint
  5. c_int_b bigint
  6. c_double_a double
  7. c_double_b double
  8. c_string_a string
  9. c_string_b string
  10. c_bool_a boolean
  11. c_bool_b boolean
  12. c_datetime_a datetime
  13. c_datetime_b datetime
  14. }
  15. >>> t.lifecycle
  16. -1
  17. >>> print(t.creation_time)
  18. 2014-05-15 14:58:43
  19. >>> t.is_virtual_view
  20. False
  21. >>> t.size
  22. 1408
  23. >>> t.schema.columns
  24. [<column c_int_a, type bigint>,
  25. <column c_int_b, type bigint>,
  26. <column c_double_a, type double>,
  27. <column c_double_b, type double>,
  28. <column c_string_a, type string>,
  29. <column c_string_b, type string>,
  30. <column c_bool_a, type boolean>,
  31. <column c_bool_b, type boolean>,
  32. <column c_datetime_a, type datetime>,
  33. <column c_datetime_b, type datetime>]

创建表的Schema

有两种方法来初始化。第一种方式通过表的列、以及可选的分区来初始化。

  1. >>> from odps.models import Schema, Column, Partition
  2. >>> columns = [Column(name='num', type='bigint', comment='the column')]
  3. >>> partitions = [Partition(name='pt', type='string', comment='the partition')]
  4. >>> schema = Schema(columns=columns, partitions=partitions)
  5. >>> schema.columns
  6. [<column num, type bigint>, <partition pt, type string>]

第二种方法是使用 Schema.from_lists,这种方法更容易调用,但显然无法直接设置列和分区的注释了。

  1. >>> schema = Schema.from_lists(['num'], ['bigint'], ['pt'], ['string'])
  2. >>> schema.columns
  3. [<column num, type bigint>, <partition pt, type string>]

创建表

现在知道怎么创建表的Schema,创建一个表就很容易了。

  1. >>> table = odps.create_table('my_new_table', schema)
  2. >>> table = odps.create_table('my_new_table', schema, if_not_exists=True) # 只有不存在表时才创建

其他还可以设置lifecycle等参数。

获取表数据

有若干种方法能够获取表数据。首先,如果只是查看每个表的开始的小于1万条数据,则可以使用 head 方法。

  1. >>> t = odps.get_table('dual')
  2. >>> for record in t.head(3):
  3. >>> print(record[0]) # 取第0个位置的值
  4. >>> print(record['c_double_a']) # 通过字段取值
  5. >>> print(record[0: 3]) # 切片操作
  6. >>> print(record[0, 2, 3]) # 取多个位置的值
  7. >>> print(record['c_int_a', 'c_double_a']) # 通过多个字段取值

其次,在table上可以执行 open_reader 操作来打一个reader来读取数据。记住这里需要使用 with表达式

  1. >>> with t.open_reader(partition='pt=test') as reader:
  2. >>> count = reader.count
  3. >>> for record in reader[5:10] # 可以执行多次,直到将count数量的record读完,这里可以改造成并行操作
  4. >>> # 处理一条记录

最后,可以使用Tunnel API来进行读取操作,open_reader 操作其实也是对Tunnel API的封装。

向表写数据

类似于 open_reader,table对象同样能执行 open_writer 来打开writer,并写数据。同样记住使用 with表达式

  1. >>> with t.open_writer(partition='pt=test') as writer:
  2. >>> writer.write(records) # 这里records可以是任意可迭代的records,默认写到block 0
  3. >>>
  4. >>> with t.open_writer(partition='pt=test', blocks=[0, 1]) as writer: # 这里同是打开两个block
  5. >>> writer.write(0, gen_records(block=0))
  6. >>> writer.write(1, gen_records(block=1)) # 这里两个写操作可以多线程并行,各个block间是独立的

同样,向表写数据也是对Tunnel API的封装,详细参考 [数据上传下载通道] 。

删除表

  1. >>> odps.delete_table('my_table_name', if_exists=True) # 只有表存在时删除
  2. >>> t.drop() # Table对象存在的时候可以直接执行drop函数

表分区

基本操作

遍历表全部分区:

  1. >>> for partition in table.partitions:
  2. >>> print(partition.name)
  3. >>> for partition in table.iterate_partitions(spec='pt=test'):
  4. >>> # 遍历二级分区

判断分区是否存在:

  1. >>> table.exist_partition('pt=test,sub=2015')

获取分区:

  1. >>> partition = table.get_partition('pt=test')
  2. >>> print(partition.creation_time)
  3. 2015-11-18 22:22:27
  4. >>> partition.size
  5. 0

创建分区

  1. >>> t.create_partition('pt=test', if_not_exists=True) # 不存在的时候才创建

删除分区

  1. >>> t.delete_partition('pt=test', if_exists=True) # 存在的时候才删除
  2. >>> partition.drop() # Partition对象存在的时候直接drop

SQL

PyOdps支持MaxCompute SQL的查询,并可以读取执行的结果。

执行SQL

  1. >>> odps.execute_sql('select * from dual') # 同步的方式执行,会阻塞直到SQL执行完成
  2. >>> instance = odps.run_sql('select * from dual') # 异步的方式执行
  3. >>> instance.wait_for_success() # 阻塞直到完成

读取SQL执行结果

运行SQL的instance能够直接执行 open_reader 的操作,一种情况是SQL返回了结构化的数据。

  1. >>> with odps.execute_sql('select * from dual').open_reader() as reader:
  2. >>> for record in reader:
  3. >>> # 处理每一个record

另一种情况是SQL可能执行的比如 desc,这时通过 reader.raw 属性取到原始的SQL执行结果。

  1. >>> with odps.execute_sql('desc dual').open_reader() as reader:
  2. >>> print(reader.raw)

Resource

资源在 MaxCompute 上常用在UDF和MapReduce中。

列出所有资源还是可以使用 list_resources,判断资源是否存在使用 exist_resource。 删除资源时,可以调用 delete_resource,或者直接对于Resource对象调用 drop 方法。

在PyOdps中,主要支持两种资源类型,一种是文件,另一种是表。下面分别说明。

文件资源

文件资源包括基础的 file 类型、以及 pyjararchive

创建文件资源

创建文件资源可以通过给定资源名、文件类型、以及一个file-like的对象(或者是字符串对象)来创建,比如

  1. resource = odps.create_resource('test_file_resource', 'file', file_obj=open('/to/path/file')) # 使用file-like的对象
  2. resource = odps.create_resource('test_py_resource', 'py', file_obj='import this') # 使用字符串

读取和修改文件资源

对文件资源调用 open 方法,或者在odps入口调用 open_resource 都能打开一个资源, 打开后的对象会是file-like的对象。 类似于Python内置的 open 方法,文件资源也支持打开的模式。我们看例子:

  1. >>> with resource.open('r') as fp: # 以读模式打开
  2. >>> content = fp.read() # 读取全部的内容
  3. >>> fp.seek(0) # 回到资源开头
  4. >>> lines = fp.readlines() # 读成多行
  5. >>> fp.write('Hello World') # 报错,读模式下无法写资源
  6. >>>
  7. >>> with odps.open_resource('test_file_resource', mode='r+') as fp: # 读写模式打开
  8. >>> fp.read()
  9. >>> fp.tell() # 当前位置
  10. >>> fp.seek(10)
  11. >>> fp.truncate() # 截断后面的内容
  12. >>> fp.writelines(['Hellon', 'Worldn']) # 写入多行
  13. >>> fp.write('Hello World')
  14. >>> fp.flush() # 手动调用会将更新提交到ODPS

所有支持的打开类型包括:

  • r,读模式,只能打开不能写
  • w,写模式,只能写入而不能读文件,注意用写模式打开,文件内容会被先清空
  • a,追加模式,只能写入内容到文件末尾
  • r+,读写模式,能任意读写内容
  • w+,类似于 r+,但会先清空文件内容
  • a+,类似于 r+,但写入时只能写入文件末尾

同时,PyOdps中,文件资源支持以二进制模式打开,打开如说一些压缩文件等等就需要以这种模式, 因此 rb 就是指以二进制读模式打开文件,r+b 是指以二进制读写模式打开。

表资源

创建表资源

  1. >>> odps.create_resource('test_table_resource', 'table', table_name='my_table', partition='pt=test')

更新表资源

  1. >>> table_resource = odps.get_resource('test_table_resource')
  2. >>> table_resource.update(partition='pt=test2', project_name='my_project2')

Function

MaxCompute 用户可以编写自定义函数用在MaxCompute SQL中。

基本操作

同样的,可以调用 list_functions 来获取项目空间下的所有函数,exist_function 能判断是否存在函数, get_function 能获取函数。

创建函数

  1. >>> resource = odps.get_resource('my_udf.py')
  2. >>> function = odps.create_function('test_function', class_type='my_udf.Test', resources=[resource, ])

删除函数

  1. >>> odps.delete_function('test_function')
  2. >>> function.drop() # Function对象存在时直接调用drop

DataFrame

PyOdps提供了DataFrame API,它提供了类似pandas的接口,但是能充分利用 MaxCompute 的计算能力。这里我们给出 DataFrame 的一个例子,完整的 DataFrame 文档可参考 这里

在所有步骤开始前,需要创建 ODPS 对象。

  1. >>> o = ODPS('**your-access-id**', '**your-secret-access-key**',
  2. project='**your-project**', endpoint='**your-end-point**'))

我们拿movielens 100K来做例子。现在我们已经有三张表了,分别是pyodps_ml_100k_movies(电影相关的数据),pyodps_ml_100k_users(用户相关的数据),pyodps_ml_100k_ratings(评分有关的数据)。

创建一个DataFrame对象十分容易,只需传入Table对象即可。

  1. >>> from odps.df import DataFrame
  1. >>> users = DataFrame(o.get_table('pyodps_ml_100k_users'))

我们可以通过dtypes属性来查看这个DataFrame有哪些字段,分别是什么类型

  1. >>> users.dtypes

通过head方法,我们能取前N条数据,这让我们能快速预览数据。

  1. >>> users.head(10)
  user_id age sex occupation zip_code
0 1 24 M technician 85711
1 2 53 F other 94043
2 3 23 M writer 32067
3 4 24 M technician 43537
4 5 33 F other 15213
5 6 42 M executive 98101
6 7 57 M administrator 91344
7 8 36 M administrator 05201
8 9 29 M student 01002
9 10 53 M lawyer 90703

有时候,我们并不需要都看到所有字段,我们可以从中筛选出一部分。

  1. >>> users[['user_id', 'age']].head(5)
  user_id age
0 1 24
1 2 53
2 3 23
3 4 24
4 5 33

有时候我们只是排除个别字段。

  1. >>> users.exclude('zip_code', 'age').head(5)
  user_id sex occupation
0 1 M technician
1 2 F other
2 3 M writer
3 4 M technician
4 5 F other

又或者,排除掉一些字段的同时,得通过计算得到一些新的列,比如我想将sex为M的置为True,否则为False,并取名叫sex_bool。

  1. >>> users.select(users.exclude('zip_code', 'sex'), sex_bool=users.sex == 'M').head(5)
  user_id age occupation sex_bool
0 1 24 technician True
1 2 53 other False
2 3 23 writer True
3 4 24 technician True
4 5 33 other False

现在,让我们看看年龄在20到25岁之间的人有多少个

  1. >>> users.age.between(20, 25).count().rename('count')
  2. 943

接下来,我们看看男女用户分别有多少。

  1. >>> users.groupby(users.sex).count()
  sex count
0 F 273
1 M 670

用户按职业划分,从高到底,人数最多的前10职业是哪些呢?

  1. >>> df = users.groupby('occupation').agg(count=users['occupation'].count())
  2. >>> df.sort(df['count'], ascending=False)[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

DataFrame API提供了value_counts这个方法来快速达到同样的目的。

  1. >>> users.occupation.value_counts()[:10]
  occupation count
0 student 196
1 other 105
2 educator 95
3 administrator 79
4 engineer 67
5 programmer 66
6 librarian 51
7 writer 45
8 executive 32
9 scientist 31

让我们用更直观的图来看这份数据。

  1. >>> %matplotlib inline

我们可以用个横向的柱状图来可视化

  1. >>> users['occupation'].value_counts().plot(kind='barh', x='occupation', ylabel='prefession')

image

我们将年龄分成30组,来看个年龄分布的直方图

  1. >>> users.age.hist(bins=30, title="Distribution of users' ages", xlabel='age', ylabel='count of users')

image

好了,现在我们把这三张表联合起来,只需要使用join就可以了。join完成后我们把它保存成一张新的表。

  1. >>> movies = DataFrame(o.get_table('pyodps_ml_100k_movies'))
  2. >>> ratings = DataFrame(o.get_table('pyodps_ml_100k_ratings'))
  3. >>> o.delete_table('pyodps_ml_100k_lens', if_exists=True)
  4. >>> lens = movies.join(ratings).join(users).persist('pyodps_ml_100k_lens')
  5. >>> lens.dtypes
  1. odps.Schema {
  2. movie_id int64
  3. title string
  4. release_date string
  5. video_release_date string
  6. imdb_url string
  7. user_id int64
  8. rating int64
  9. unix_timestamp int64
  10. age int64
  11. sex string
  12. occupation string
  13. zip_code string
  14. }

现在我们把年龄分成从0到80岁,分成8个年龄段,

  1. >>> labels = ['0-9', '10-19', '20-29', '30-39', '40-49', '50-59', '60-69', '70-79']
  2. >>> cut_lens = lens[lens, lens.age.cut(range(0, 81, 10), right=False, labels=labels).rename('年龄分组')]

我们取分组和年龄唯一的前10条看看。

  1. >>> cut_lens['年龄分组', 'age'].distinct()[:10]
  年龄分组 age
0 0-9 7
1 10-19 10
2 10-19 11
3 10-19 13
4 10-19 14
5 10-19 15
6 10-19 16
7 10-19 17
8 10-19 18
9 10-19 19

最后,我们来看看在各个年龄分组下,用户的评分总数和评分均值分别是多少。

  1. >>> cut_lens.groupby('年龄分组').agg(cut_lens.rating.count().rename('评分总数'), cut_lens.rating.mean().rename('评分均值'))
  年龄分组 评分均值 评分总数
0 0-9 3.767442 43
1 10-19 3.486126 8181
2 20-29 3.467333 39535
3 30-39 3.554444 25696
4 40-49 3.591772 15021
5 50-59 3.635800 8704
6 60-69 3.648875 2623
7 70-79 3.649746 197

Configuration

配置选项

PyODPS 提供了一系列的配置选项,可通过 odps.options 获得。下面列出了可配的 MaxCompute 选项。

通用配置

选项 说明 默认值
access_id ODPS Access ID None
access_key ODPS Access Key None
end_point ODPS Endpoint None
default_project 默认 Project No ne
log_view_host LogView 主机名 Non e
log_view_hours LogView 保持时间(小时) 24
tunnel_endpoint Tunnel Endpoint None
lifecycle 所有表生命周期 None
temp_lifecycle 临时表生命周期 1
biz_id 用户 ID None
chunk_size 写入缓冲区大小 1496
retry_times 请求重试次数 4
connect_timeout 连接超时 5
read_timeout 读取超时 120

DataFrame 配置

选项 说明 默认值
verbose 是否打印日志 False
verbose_log 日志接收器 None
df.analyze 是否启用非 ODPS 内置函数 True

最后更新:2016-11-23 17:16:08

  上一篇:go 项目空间的权限管理__安全相关语句汇总_安全指南_大数据计算服务-阿里云
  下一篇:go 客户端___工具_大数据计算服务-阿里云