使用數據集成同步phoenix數據
本文主要介紹在雲HBase上如果使用數據集成將一個phoenix表的數據遷移到另一個集群。一般來說,由於phoenix表的數據是存儲在HBase上的,所以對於phoenix數據遷移,除了自己寫代碼同步之外,通常用於同步HBase數據的工具也都可以用來同步phoenix數據。限於篇幅,本文不介紹使用數據集成同步HBase數據的詳細步驟,如有需要可參考這裏,本文主要介紹和phoenix有關的特殊配置。
從一個簡單例子開始
我們以雲HBase幫助文檔中Phoenix 入門裏麵的例子作為源數據。表結構是這樣的:
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
和同步HBase數據類似,我們首先在目標集群上建表,然後通過數據集成把對應的HBase表數據導入到目標集群。為了同步HBase表,我們需要得到phoenix表所對應的HBase表的信息,主要是表名、列族名、列名。
表名:phoenix默認情況下會把phoenix表名變為全部大寫作為對應的HBase表名。如果不希望變成大寫,可以在create table語句中給表名加上引號,這種情況下HBase表名和phoenix表名相同。
在我們的例子中,對應的HBase表名為US_POPULATION。
列族名:主鍵的列族是表的默認列族(第一個非主鍵列的列族)。非主鍵列如果沒有指定指定列族,就會使用默認列族0。
列名:對於非主鍵列,和表名一樣,默認也是將phoenix列名變為大寫。主鍵對應的列會被合並為一個名為_0的列,值為空。
在我們的例子裏麵,沒有指定列族,因此非主鍵列的列族和列名為0:POPULATION,主鍵列為0:_0。
以下是完整的配置:
{
"configuration": {
"reader": {
"plugin": "hbase11x",
"parameter": {
"mode": "normal",
"scanCacheSize": "256",
"scanBatchSize": "100",
"column": [
{
"name": "rowkey",
"type": "string"
},
{
"name": "0:POPULATION",
"type": "long"
},
{
"name": "0:_0",
"type": "long"
}
],
"encoding": "UTF-8",
"table": "US_POPULATION",
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-2ze5mewh34cfy7960-001.hbase.rds.aliyuncs.com:2181,hb-2ze5mewh34cfy7960-002.hbase.rds.aliyuncs.com:2181,hb-2ze5mewh34cfy7960-004.hbase.rds.aliyuncs.com:2181",
"hbase.cluster.distributed": "true"
}
}
},
"writer": {
"plugin": "hbase11x",
"parameter": {
"mode": "normal",
"walFlag": "false",
"rowkeyColumn": [
{
"index": 0,
"type": "string"
}
],
"nullMode": "empty",
"column": [
{
"name": "0:POPULATION",
"index": 1,
"type": "long"
},
{
"name": "0:_0",
"index": 2,
"type": "long"
}
],
"encoding": "UTF-8",
"table": "US_POPULATION",
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-2zel37texpqo9umcw-001.hbase.rds.aliyuncs.com:2181,hb-2zel37texpqo9umcw-002.hbase.rds.aliyuncs.com:2181,hb-2zel37texpqo9umcw-004.hbase.rds.aliyuncs.com:2181",
"hbase.cluster.distributed": "true"
}
}
},
"setting": {
"errorLimit": {
"record": "0"
},
"speed": {
"concurrent": "1",
"mbps": "1"
}
}
},
"type": "job",
"version": "1.0"
}
數據集成讀取源表數據可能會對源表的讀寫性能造成一定影響,我們可以通過調節speed參數在導入速度和對源表的影響之間平衡。
增量數據
以上介紹了存量數據遷移的方法。增量數據的同步有兩種方法:
(1)配置replicaiton。需要聯係雲HBase客服幫助修改hbase.replication配置項並重啟集群。配置生效之後可以通過hbase shell配置要同步的表。replication配置的過程可參考HBase官方文檔。
(2)修改應用代碼雙寫。為避免增量數據和存量數據同時寫入時發生衝突,我們可以把存量數據的版本號設置的比較低(例如設置為開始導入時刻的時間戳),這樣可以保證應用寫入的數據總是有更高的版本號,因此大多數情況下新數據總是能覆蓋就數據。
刪除操作可能會有新數據被舊數據覆蓋的情況,例如刪除的時候數據還沒有導入進去。如果應用中有刪除操作,需要評估這種情況造成的影響。必要的話需要使用更複雜的增量數據同步方法,例如應用中先記錄日誌,等存量數據同步完成後再從日誌回放同步增量數據。
在數據集成的配置中,可以使用versionColumn來配置寫入數據的版本號。
{
...
"writer": {
...
"versionColumn": {
"index": -1,
"value": 1502072520638
},
...
},
...
}
二級索引
我們給這個表建立一個二級索引
CREATE INDEX my_index ON US_POPULATION (city) include (state,population);
由於數據集成是調用hbase API寫入數據的,導入的數據不會自動加上索引。有兩個辦法:
(1)我們可以在目標集群上建表時先不創建索引,待主表數據導入完成後再在目標集群上創建索引。由於雲HBase目前不支持異步建索引,在執行建索引語句前需要先把客戶端的超時改大。
<property>
<name>phoenix.query.timeoutMs</name>
<value>36000000</value>
</property>
<property>
<name>hbase.rpc.timeout</name>
<value>36000000</value>
</property>
<property>
<name>hbase.client.operation.timeout</name>
<value>36000000</value>
</property>
<property>
<name>hbase.client.scanner.timeout.period</name>
<value>36000000</value>
</property>
另外,創建索引過程中,目標庫的讀寫性能會下降,如果增量數據是線上業務同步寫入目標庫的話,需要評估對線上業務的影響。
(2)也可以像同步主表一樣同步索引表。當前由於數據集成限製了列名的格式隻能有一個冒號,所以對於covered index暫時不能使用這種方法。和主表類似,我們需要知道索引表的表名、列族名和列名。
表名:
對於global index,如果create index語句中有引號,則index名稱就是索引表名稱;如果沒有引號,index名稱變成大寫字母就是索引表名稱。
如果表名有schema,索引表名也會帶上schema。例如把建表語句和索引表語句變成
CREATE TABLE IF NOT EXISTS my_schema.us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE INDEX my_index ON my_schema.us_population (city) include (state,population);
那麼索引表的表名就變成了MY_SCHEMA.MY_INDEX。
如果創建的是local index,則索引表的表名為_LOCAL_IDX_<主表表名>。不管表名是否有schema,都把表名當成一個整體來決定索引表表名。例如
CREATE TABLE IF NOT EXISTS my_schema.us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE LOCAL INDEX my_index ON my_schema.us_population (city) include (state,population);
對應的索引表表名是_LOCAL_IDX_MY_SCHEMA.US_POPULATION
列族名和列名:
和主表一樣,主鍵列被合並成一列,列名為_0,列族為索引表的默認列族。非主鍵列的列名為<列族>:<主表列名>,也就是說在索引表中完整的列族+列名是<列族>:<列族>:<主表列名>。
CREATE TABLE IF NOT EXISTS us_population (
state CHAR(2) NOT NULL,
city VARCHAR NOT NULL,
cf.population BIGINT
CONSTRAINT my_pk PRIMARY KEY (state, city));
CREATE INDEX my_index ON us_population (city) include (state,cf.population);
這裏索引表中包含兩列,一列為CF:_0,一列為CF:CF:POPULATION。
最後更新:2017-08-13 22:23:50