创建RDS到MaxCompute数据实时同步作业__实时同步_用户指南_数据传输-阿里云
本小节介绍如何使用数据传输服务快速创建RDS实例到MaxCompute实例间的实时同步作业,实现在线(RDS)到离线系统(MaxCompute)的数据实时同步,进一步为数据实时分析奠定基础。
支持功能
数据源
- 支持同一个阿里云账号下RDS MySQL实例到MaxCompute实例的数据实时同步。
- 支持不同阿里云账号下的RDS MySQL实例到MaxCompute实例的数据实时同步。
- 支持的RDS实例包括,经典网络和VPC网络两种网络模式。
同步对象
- 只支持表的同步,不支持其他非表对象的同步。
同步原理
如上图所示,整个同步过程分为两步:
(1) 全量初始化, 这个步骤将RDS MySQL中已经存在的全量数据初始化到MaxCompute中。对于同步的每个表,全量初始化的数据都会独立存储在MaxCompute中的全量基线表中,这个表的默认格式为:源表名_base。例如表 t1,那么全量基线表在MaxCompute中存储的表名为:t1_dts_base。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在MaxCompute存储的名称。
(2) 增量数据同步,这个步骤将RDS MySQL产生的增量数据数据实时同步到MaxCompute中。并存储在增量日志表中,每个同步表对应一个增量日志表。增量日志表在MaxCompute中存储的表名的默认格式为:源表名_log。这个存储表名前缀可以根据需要变更,您可以在配置任务时,修改表在MaxCompute存储的名称。
增量日志表除了存储更新数据,它还会存储一些元信息,增量日志表的表结构定义如下:
record_id | operation_flag | utc_timestamp | before_flag | after_flag | col1 | …. | colN |
---|---|---|---|---|---|---|---|
1 | I | 1476258462 | N | Y | 1 | ….. | JustInsert |
2 | U | 1476258463 | Y | N | 1 | ….. | JustInsert |
2 | U | 1476258463 | N | Y | 1 | ….. | JustUpdate |
3 | D | 1476258464 | Y | N | 1 | ….. | JustUpdate |
其中:
record_id: 这条增量日志的唯一标识,唯一递增。如果变更类型为update,那么增量更新会被拆分成2条,一条Insert,一条Delete。那么这两条记录的record_id相同。
operation_flag: 标示这条增量日志的操作类型。取值包括:
I : insert 操作
D : delete 操作
U : update 操作
dts_utc_timestamp: 这条增量日志的操作时间戳,为这个更新操作记录binlog的时间戳。这个时间戳为UTC时间。
before_flag: 表示这条增量日志后面带的各个column值是否更新前的值。取值包括:Y 和 N。当后面的column为更新前的值时,before_flag=Y, 当后面的column值为更新后的值时,before_flag=N。
after_flag:表示这条增量日志后面带的各个column值是否更新后的值。取值包括:Y 和 N。 当后面的column为更新前的值时,after_flag=N,当后面的column值为更新后的值时,after_flag=Y。
对于不同的操作类型,增量日志中的before_flag和after_flag定义如下:
1) 操作类型为:insert
record_id | operation_flag | utc_timestamp | before_flag | after_flag | col1 | …. | colN |
---|---|---|---|---|---|---|---|
1 | I | 1476258462 | N | Y | 1 | ….. | JustInsert |
当操作类型为insert时,后面的所有column值为新插入的记录值,即为更新后的值。所以before_flag=N, after_flag=Y。
2) 操作类型为:update
record_id | operation_flag | utc_timestamp | before_flag | after_flag | col1 | …. | colN |
---|---|---|---|---|---|---|---|
2 | U | 1476258463 | Y | N | 1 | ….. | JustInsert |
2 | U | 1476258463 | N | Y | 1 | ….. | JustUpdate |
当操作类型为update时,会将update操作拆为2条增量日志。这两条增量日志的record_id ,operation_flag 及dts_utc_timestamp相同。
第一条日志记录了更新前的值,所以before_flag=Y, after_flag=N。
第二条日志记录了更新后的值,所以before_flag=N, after_flag=Y。
3) 操作类型为:delete
record_id | operation_flag | dts_utc_timestamp | before_flag | after_flag | col1 | …. | colN |
---|---|---|---|---|---|---|---|
3 | D | 1476258464 | Y | N | 1 | ….. | JustUpdate |
当操作类型为delete时,后面的所有column值为被删除的记录值,即为更新前的值。所以before_flag=Y, after_flag=N。
(3) RDS->MaxCompute数据同步,对于每个同步表,都会在MaxCompute中生成一个全量基线表+一个增量日志表,所以如果需要在MaxCompute中获取某个时刻某张表的全量数据,就需要merge这张表的全量基线表和增量日志表。具体实现方法后面会详细讲解。
下面详细介绍RDS到MaxCompute数据实时同步作业的配置流程。
同步作业配置流程
下面详细介绍RDS到MaxCompute数据实时同步作业的配置流程。
1.购买同步链路
进入数据传输服务控制台,进入数据同步页面,点击控制台右上角“创建同步作业” 开始作业配置。
在链路配置之前需要购买一个同步链路。同步链路目前支持包年包月及按量付费两种付费模式,可以根据需要选择不同的付费模式。
在购买页面需要配置的参数包括:
- 源实例
同步作业的源实例类型,目前只支持RDS For MySQL。 - 源地域
源地域为同步实例的源RDS实例所在地域。 - 目标实例
目标实例为同步作业的目标实例类型,目前支持 RDS For MySQL, MaxCompute(原ODPS),DataHub。配置RDS->MaxCompute同步链路时,目标实例选择:MaxCompute 即可。 - 目标地域
由于MaxCompute目前只在上海地区售卖,所以目标地域选择上海。 - 实例规格
实例规格影响了链路的同步性能,可以根据业务性能选择合适的规则。 - 网络类型
RDS->MaxCompute支持通过公网、私网 同步数据。如果源RDS没有公网连接地址,那么网络类型只能选择 私网。 - 数量
数量为一次性购买的同步链路的数量,如果购买的是按量付费实例,一次最多购买99条链路。
当购买完同步实例,返回数据传输控制台,点击新购链路右侧的“配置同步作业” 开始链路配置。
2.同步链路连接信息配置。
在这一步主要配置:
同步作业名称
同步作业名称没有唯一性要求,主要为了更方便识别具体的作业,建议选择一个有业务意义的作业名称,方便后续的链路查找及管理。实例ID配置
在这个步骤中需要配置源RDS实例的实例ID,及目标MaxCompute实例的project。配置的MaxCompute project 必须属于登录DTS的阿里云账号的资源。
当这些内容配置完成后,可以点击授权白名单并进入下一步。
3.授权RDS实例白名单
这个步骤,主要是将给DTS服务账号授权MaxCompute写权限,让DTS能够将数据同步复制到MaxCompute中。
授权权限包括对project的:
CreateTable
CreateInstance
CreateResource
CreateJob
List
为了保证同步作业的稳定性,在同步过程中,请勿将写权限回收。当白名单授权后,点击下一步,进入同步账号创建。
当授权完成后,即进入同步对象选择。
4.选择同步对象
当MaxCompute账号授权完成后,即进入同步表及同步初始化的相关配置。
在这个步骤中,需要配置 同步初始化 和 同步表。其中:
(1)同步初始化
同步初始化选项包括: 结构初始化 和 全量数据初始化。
结构初始化是指对于待同步的表,在MaxCompute中创建对应的表,完成表结构定义。
全量数据初始化是指对于待同步的表,将历史数据初始化到MaxCompute中。
配置任务时,建议同时选择 结构初始化+全量数据初始化。
(2) 同步表选择
同步表只能选择某些表,不能直接选择整个库。对于同步的表,可以修改表在MaxCompute对应的全量基线表及增量日志表的表名前缀。如需修改,可以点击右边已选择对象后面的编辑按钮,进入修改界面。
当配置完同步对象后,进入同步初始化配置。
5.预检查
当上面所有选项配置完成后,即进入启动之前的预检查。具体检查项内容详见本文最后的 预检查内容 一节。
当同步作业配置完成后,数据传输服务会进行限制预检查,当预检查通过后,可以点击 确定 按钮,启动同步作业。
当同步作业启动之后,即进入同步作业列表。此时刚启动的作业处于同步初始化状态。初始化的时间长度依赖于源实例中同步对象的数据量大小。当初始化完成后同步链路即进入同步中的状态,此时源跟目标实例的同步链路才真正建立完成。
当同步任务进入 同步中 时,可以在MaxCompute中可以查询出jiangliu_test对应的全量基线表和增量日志表:
至此,完成RDS->MaxCompute数据实时同步作业的配置。
全量数据合并方案
本小节介绍,如何根据同步到MaxCompute中的全量基线表和增量日志数据得到某个时刻表的全量数据。
DTS提供通过MaxCompute SQL实现全量数据合并的能力。
通过MaxCompute SQL merge 全量基线数据 和 增量日志表 得到时刻t的全量数据。MaxCompute SQL的写法如下:
insert overwrite table result_storage_table
select col1,
col2,
colN
from(
select row_number() over(partition by t.primary_key_column
order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, col1,col2,colN
from(
select incr.record_id, incr.operation_flag, incr.after_flag, incr.col1, incr.col2,incr.colN
from table_log incr
where utc_timestamp< timestmap
union all
select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.col1, base.col2,base.colN
from table_base base) t) gt
where record_num=1
and after_flag='Y'
上面代码中的几个变量意义如下:
1) result_storage_table 表示全量merge结果集的存储表的表名
2) col1, col2,colN 表示同步表中列的列名
3) primary_key_column 表示同步表中的主键列的列名
4) table_log 表示增量日志表
5) table_base 表示全量基线表
6) timestmap 表示需要merge哪个时刻的全量数据
例如对于上面配置任务中的jiangliu_test表,jiangliu_test_20161010_base 为jiangliu_test对应的全量基线表,jiangliu_test_20161010_log 为jiangliu_test对应的增量日志表。
jiangliu_test的表结构定义为:
那么查询时间戳1476263486 时刻,jiangliu_test表的全量数据的MaxCompute SQL如下:
insert overwrite table jiangliu_test_1476263486
select id,
name
from(
select row_number() over(partition by t.id
order by record_id desc, after_flag desc) as row_number, record_id, operation_flag, after_flag, id, name
from(
select incr.record_id, incr.operation_flag, incr.after_flag, incr.id, incr.name
from jiangliu_test_20161010_log incr
where utc_timestamp< 1476263486
union all
select 0 as record_id, 'I' as operation_flag, 'Y' as after_flag, base.id, base.name
from jiangliu_test_20161010_base base) t) gt
where gt.row_number= 1
and gt.after_flag= 'Y' ;
您也可以通过大数据开发套件,在后续的计算分析操作之前,添加全量数据Merge节点,当全量数据merge完成后,可自动调度起后续的计算分析节点。同时可以配置调度周期,进行周期性的数据离线分析。
至此完成RDS->MaxCompute数据同步任务配置及全量数据合并。
最后更新:2016-12-07 10:50:45
上一篇:
创建RDS实例间数据实时同步作业__实时同步_用户指南_数据传输-阿里云
下一篇:
RDS到DataHub数据实时同步__实时同步_用户指南_数据传输-阿里云
服务商后台功能发布通知__管理后台_服务商_云市场-阿里云
User__数据类型_RAM API文档_访问控制-阿里云
删除域名分组__域名分组接口_API文档_云解析-阿里云
修改密码__管理实例_用户指南_云数据库 Memcache 版-阿里云
查询作业详情__作业_API参考_E-MapReduce-阿里云
阿里云携手德施曼等ICA联盟企业联合发布《中国智能锁应用与发展白皮书》!
查询流量数据__资源监控接口_API 手册_CDN-阿里云
搜索指定订阅信息__订阅管理相关接口_Open API_消息队列 MQ-阿里云
JavaScript 发消息示例__JavaScript 接入示例_MQTT 接入(物联)_消息队列 MQ-阿里云
搜索相关性配置__应用高级配置_产品使用手册_开放搜索-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云