阅读350 返回首页    go 人物


5.1 数据的导入__第五章 Data Pipeline_使用手册_分析型数据库-阿里云

分析型数据库支持多种数据入库方式,包括但不限于:

(1)内置将MaxCompute中的海量数据快速批量导入;

(2)支持标准的insert/delete语法,可使用用户程序、Kettle等第三方工具写入实时写入表;

(3)支持阿里云数据集成(CDP),将各类数据源导入批量导入表或实时写入表。阿里云大数据开发平台;

(4)支持阿里云数据传输(DTS),从阿里云RDS实时同步数据变更到分析型数据库。

本章节主要描述(1)和(3)两种数据导入方式,其余导入方式详见本手册第八章。

SQL方式将MaxCompute数据导入

分析型数据库目前内置支持从阿里云外售的MaxCompute(原ODPS)中导入数据。

将MaxCompute数据导入分析型数据库有几种方法:直接使用SQL命令进行导入,或通过MDS for AnalyticDB界面进行导入、通过数据集成(原CDP)配置job进行导入。

通过DMS for AnalyticDB界面导入数据已在《快速入门》进行了介绍;通过数据集成导入数据可以参考下一小章节。下面主要详细介绍通过SQL命令将MaxCompute数据导入AnalyticDB。

步骤一: 准备MaxCompute表或分区。

首先我们需要创建好数据源头的MaxCompute表(可以是分区表或非分区表,目前要求AnalyticDB中的字段名称和MaxCompute表中的字段名称一致),并在表中准备好要导入的数据。

如在MaxCompute中创建一个表:

  1. use projecta;--在MaxCompute的某个project中创建表
  2. CREATE TABLE
  3. odps2ads_test (
  4. user_id bigint ,
  5. amt bigint ,
  6. num bigint ,
  7. cat_id bigint ,
  8. thedate bigint
  9. )
  10. PARTITIONED BY(dt STRING);

往表中导入数据并创建分区,如:dt=’20160808’。

步骤二:账号授权。

首次导入一个新的MaxCompute表时,需要在MaxCompute中将表Describe和Select权限授权给AnalyticDB的导入账号。公共云导入账号为garuda_build@aliyun.com以及garuda_data@aliyun.com(两个都需要授权)。各个专有云的导入账号名参照专有云的相关配置文档,一般为test1000000009@aliyun.com。

授权命令:

  1. USE projecta;--表所属ODPS project
  2. ADD USER ALIYUN$xxxx@aliyun.com;--输入正确的云账号
  3. GRANT Describe,Select ON TABLE table_name TO USER ALIYUN$xxxx@aliyun.com;--输入需要赋权的表和正确的云账号

另外为了保护用户的数据安全,AnalyticDB目前仅允许导入操作者为Proejct Owner的ODPS Project的数据,或者操作者为MaxCompute表的owner(大部分专有云无此限制)。

步骤三:准备AnalyticDB表,注意创建时更新方式属性为“批量更新”,同时把表的Load Data权限授权给导入操作者(一把表创建者都默认有该权限)。

如表:

  1. CREATE TABLE db_name.odps2ads_test (
  2. user_id bigint,
  3. amt int,
  4. num int,
  5. cat_id int,
  6. thedate int,
  7. primary key (user_id)
  8. )
  9. PARTITION BY HASH KEY(user_id)
  10. PARTITION NUM 40
  11. TABLEGROUP group_name
  12. options (updateType='batch');
  13. --注意指定好数据库名和表组名

步骤四:在AnalyticDB中通过SQL命令导入数据。

语法格式:

  1. LOAD DATA
  2. FROM 'sourcepath'
  3. [IN VERSION dataVersion]
  4. [OVERWRITE] INTO TABLE tablename [PARTITION (partition_name,...)]

说明:

  • 如果使用MaxCompute(原ODPS)数据源,则sourcepath为:

    1. odps://<project>/<table>/[<partition-1=v1>/.../<partition-n=vn>]
    • ODPS项目名称 project
    • ODPS表名 table
    • ODPS分区 partition-n=vn ,可以是多级任意类型分区。
  • 数据版本dataVersion,是分析型数据库管理一次导入数据用的批次号,可用于未来可能开放的数据回滚等feature,通常情况下可以不需要填写。

    • 数据版本必须是long型,且取值在表上单调递增。
    • 如果不指定则取当前时间(秒),格式是 yyyyMMddHHmss ,如 20140812080000
  • 覆盖导入选项( OVERWRITE ),导入时如果指定数据日期的表在分析型数据库线上已存在,则返回异常,除非显示指定覆盖。
  • 分析型数据库表名( tablename ),格式 table_schema.table_name ,其中 table_name 是分析型数据库表名, table_schema 是表所属DB名,表名不区分大小写。
  • PARTITION,分析型数据库表分区,分区格式 partition_column=partition_value ,其中 partition_column 是分区列名, partition_value 是分区值

    • 分区值必须是 long 型;
    • 分区值不存在时表示动态分区,例如第一级hash分区;
    • 不区分大小写;
    • 目前最多支持二级分区。
  • 执行返回值: ‘‘、,任务ID,用于后续查询导入状态.任务ID是唯一标识该导入任务,返回的是字符串,最长256字节。

示例:

  1. LOAD DATA
  2. FROM 'odps://<project>/odps2ads_test/dt=20160808'
  3. INTO TABLE db_name.odps2ads_test
  4. --注意<project> 填写MaxCompute表所属的项目名称
  5. `

步骤五:查看成功导入的数据。

通过数据集成(CDP)将RDS等其他数据源的数据导入

当表的数据源是RDS、OSS等其它的云系统,我们可以通过阿里云的数据集成(原CDP)产品进行数据同步。

批量更新表导入

专有云上批量导入

专有云上可以通过大数据开发套件的数据同步任务进行操作,数据同步任务即通过封装数据集成实现数据导入。具体步骤请看“大数据开发套件”的用户指南相关数据源配置和数据同步任务配置章节。

前提条件:

  • 分析型数据库目标表更新方式是“批量更新”。
  • 在分析型数据库中给MaxCompute的base_meta这个project的owner账号至少授予表的Load Data权限,base_meta的owner账号信息可以在CMDB中查到。

注意“大数据开发套件”中数据同步任务目标为ads数据源的“导入模式”配置项需要选择“批量导入”。

公共云上批量导入

公共云上可在 https://www.aliyun.com/product/cdp/ 上开通数据集成(可能需要申请公测),

前提条件:

  • 分析型数据库目标表更新方式是“批量更新”。
  • 在分析型数据库中给cloud-data-pipeline@aliyun-inner.com这个账号至少授予表的Load Data权限。

数据集成配置示例:

  1. {
  2. "type": "job",
  3. "traceId": "rds to ads job test",
  4. "version": "1.0",
  5. "configuration": {
  6. "setting": {
  7. },
  8. "reader": {
  9. "plugin": "mysql",
  10. "parameter": {
  11. "instanceName": "你的RDS的实例名",
  12. "database": "RDS数据库名",
  13. "table": "RDS表名",
  14. "splitPk": "任意一个列的名字",
  15. "username": "RDS用户名",
  16. "password": "RDS密码",
  17. "column": ["*"],
  18. }
  19. },
  20. "writer": {
  21. "plugin": "ads",
  22. "parameter": {
  23. "url": "在分析型数据库的控制台中选择数据库时提供的连接信息",
  24. "schema": "分析型数据库数据库名",
  25. "table": "分析型数据库表名",
  26. "username": "你的access key id",
  27. "password": "你的access key secret",
  28. "partition": "",
  29. "lifeCycle": 2,
  30. "overWrite": true
  31. }
  32. }
  33. }
  34. }

运行该job即可。

结束批量导入任务

用户可以自行结束批量导入任务,运行如下命令:

  1. kill loaddata 'jobId'

其中jobId为该任务的任务ID(load data命令返回值)。

实时更新表导入

专有云上实时导入

专有云上可以通过大数据开发套件的数据同步任务进行操作,数据同步任务即通过封装数据集成实现数据导入。具体步骤请看“大数据开发套件”的用户指南相关数据源配置和数据同步任务配置章节。

前提条件:

  • 分析型数据库目标表更新方式是“实时更新”。
  • 在分析型数据库中给MaxCompute的base_meta这个project的owner账号至少授予表的Load Data权限,base_meta的owner账号信息可以在CMDB中查到。

注意“大数据开发套件”中数据同步任务目标为ads数据源的“导入模式”配置项需要选择“实时导入”。

公共云上实时导入

前提条件:

  • 分析型数据库目标表更新方式是“实时更新”。
  • 在分析型数据库中给cloud-data-pipeline@aliyun-inner.com这个账号至少授予表的Load Data权限。

数据集成配置示例:

reader配置与前面批量导入的配置一样,writer配置需要变为(以原表列和目标表列相同为例,若不同需要在column选项中配置Mapping):

  1. "writer": {
  2. "name": "adswriter",
  3. "parameter": {
  4. "writeMode": "insert",
  5. "url": "在分析型数据库的控制台中选择数据库时提供的连接信息",
  6. "schema": "分析型数据库数据库名",
  7. "table": "分析型数据库表名",
  8. "column": ["*"],
  9. "username": "你的access key id",
  10. "password": "你的access key secret",
  11. "partition": "id,ds=2015"
  12. }

其他数据导入方式

用户亦可利用阿里云数据传输(DTS)进行RDS到分析型数据库的实时数据同步(请参照手册第八章的相关内容)。

分析型数据库亦兼容通过kettle等第三方工具,或用户自行编写的程序将数据导入/写入实时写入表。

另外,分析型数据库进行实时插入和删除时,不支持事务,并且仅遵循最终一致性的设计,所以分析型数据库并不能作为OLTP系统使用。

最后更新:2016-12-06 11:56:37

  上一篇:go 4.4 多计算引擎和Hint__第四章 DML_使用手册_分析型数据库-阿里云
  下一篇:go 5.2 数据导入状态查询__第五章 Data Pipeline_使用手册_分析型数据库-阿里云