使用数据集成同步phoenix数据
本文主要介绍在云HBase上如果使用数据集成将一个phoenix表的数据迁移到另一个集群。一般来说,由于phoenix表的数据是存储在HBase上的,所以对于phoenix数据迁移,除了自己写代码同步之外,通常用于同步HBase数据的工具也都可以用来同步phoenix数据。限于篇幅,本文不介绍使用数据集成同步HBase数据的详细步骤,如有需要可参考这里,本文主要介绍和phoenix有关的特殊配置。
从一个简单例子开始
我们以云HBase帮助文档中Phoenix 入门里面的例子作为源数据。表结构是这样的:
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
和同步HBase数据类似,我们首先在目标集群上建表,然后通过数据集成把对应的HBase表数据导入到目标集群。为了同步HBase表,我们需要得到phoenix表所对应的HBase表的信息,主要是表名、列族名、列名。
表名:phoenix默认情况下会把phoenix表名变为全部大写作为对应的HBase表名。如果不希望变成大写,可以在create table语句中给表名加上引号,这种情况下HBase表名和phoenix表名相同。
在我们的例子中,对应的HBase表名为US_POPULATION。
列族名:主键的列族是表的默认列族(第一个非主键列的列族)。非主键列如果没有指定指定列族,就会使用默认列族0。
列名:对于非主键列,和表名一样,默认也是将phoenix列名变为大写。主键对应的列会被合并为一个名为_0的列,值为空。
在我们的例子里面,没有指定列族,因此非主键列的列族和列名为0:POPULATION,主键列为0:_0。
以下是完整的配置:
{
"configuration": {
"reader": {
"plugin": "hbase11x",
"parameter": {
"mode": "normal",
"scanCacheSize": "256",
"scanBatchSize": "100",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "0:POPULATION",
"type": "long"
},
{
"name": "0:_0",
"type": "long"
}
],
"encoding": "UTF-8",
"table": "US_POPULATION",
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-2ze5mewh34cfy7960-001.hbase.rds.aliyuncs.com:2181,hb-2ze5mewh34cfy7960-002.hbase.rds.aliyuncs.com:2181,hb-2ze5mewh34cfy7960-004.hbase.rds.aliyuncs.com:2181",
"hbase.cluster.distributed": "true"
}
}
},
"writer": {
"plugin": "hbase11x",
"parameter": {
"mode": "normal",
"walFlag": "false",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
}
],
"nullMode": "empty",
"column": [
{
"name": "0:POPULATION",
"index": 1,
"type": "long"
},
{
"name": "0:_0",
"index": 2,
"type": "long"
}
],
"encoding": "UTF-8",
"table": "US_POPULATION",
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-2zel37texpqo9umcw-001.hbase.rds.aliyuncs.com:2181,hb-2zel37texpqo9umcw-002.hbase.rds.aliyuncs.com:2181,hb-2zel37texpqo9umcw-004.hbase.rds.aliyuncs.com:2181",
"hbase.cluster.distributed": "true"
}
}
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": "1",
"mbps": "1"
}
}
},
"type": "job",
"version": "1.0"
}
数据集成读取源表数据可能会对源表的读写性能造成一定影响,我们可以通过调节speed参数在导入速度和对源表的影响之间平衡。
增量数据
以上介绍了存量数据迁移的方法。增量数据的同步有两种方法:
(1)配置replicaiton。需要联系云HBase客服帮助修改hbase.replication配置项并重启集群。配置生效之后可以通过hbase shell配置要同步的表。replication配置的过程可参考HBase官方文档。
(2)修改应用代码双写。为避免增量数据和存量数据同时写入时发生冲突,我们可以把存量数据的版本号设置的比较低(例如设置为开始导入时刻的时间戳),这样可以保证应用写入的数据总是有更高的版本号,因此大多数情况下新数据总是能覆盖就数据。
删除操作可能会有新数据被旧数据覆盖的情况,例如删除的时候数据还没有导入进去。如果应用中有删除操作,需要评估这种情况造成的影响。必要的话需要使用更复杂的增量数据同步方法,例如应用中先记录日志,等存量数据同步完成后再从日志回放同步增量数据。
在数据集成的配置中,可以使用versionColumn来配置写入数据的版本号。
{
...
"writer": {
...
"versionColumn": {
"index": -1,
"value": 1502072520638
},
...
},
...
}
二级索引
我们给这个表建立一个二级索引
CREATE INDEX my_index ON US_POPULATION (city) include (state,population);
由于数据集成是调用hbase API写入数据的,导入的数据不会自动加上索引。有两个办法:
(1)我们可以在目标集群上建表时先不创建索引,待主表数据导入完成后再在目标集群上创建索引。由于云HBase目前不支持异步建索引,在执行建索引语句前需要先把客户端的超时改大。
<property>
<name>phoenix.query.timeoutMs</name>
<value>36000000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>36000000</value>
</property>
<property>
<name>hbase.client.operation.timeout</name>
<value>36000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>36000000</value>
</property>
另外,创建索引过程中,目标库的读写性能会下降,如果增量数据是线上业务同步写入目标库的话,需要评估对线上业务的影响。
(2)也可以像同步主表一样同步索引表。当前由于数据集成限制了列名的格式只能有一个冒号,所以对于covered index暂时不能使用这种方法。和主表类似,我们需要知道索引表的表名、列族名和列名。
表名:
对于global index,如果create index语句中有引号,则index名称就是索引表名称;如果没有引号,index名称变成大写字母就是索引表名称。
如果表名有schema,索引表名也会带上schema。例如把建表语句和索引表语句变成
CREATE TABLE IF NOT EXISTS my_schema.us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE INDEX my_index ON my_schema.us_population (city) include (state,population);
那么索引表的表名就变成了MY_SCHEMA.MY_INDEX。
如果创建的是local index,则索引表的表名为_LOCAL_IDX_<主表表名>。不管表名是否有schema,都把表名当成一个整体来决定索引表表名。例如
CREATE TABLE IF NOT EXISTS my_schema.us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE LOCAL INDEX my_index ON my_schema.us_population (city) include (state,population);
对应的索引表表名是_LOCAL_IDX_MY_SCHEMA.US_POPULATION
列族名和列名:
和主表一样,主键列被合并成一列,列名为_0,列族为索引表的默认列族。非主键列的列名为<列族>:<主表列名>,也就是说在索引表中完整的列族+列名是<列族>:<列族>:<主表列名>。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
cf.population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE INDEX my_index ON us_population (city) include (state,cf.population);
这里索引表中包含两列,一列为CF:_0,一列为CF:CF:POPULATION。
最后更新:2017-08-13 22:23:50