基於OGG Datahub插件將Oracle數據同步上雲
一、背景介紹
隨著數據規模的不斷擴大,傳統的RDBMS難以滿足OLAP的需求,本文將介紹如何將Oracle的數據實時同步到阿裏雲的大數據處理平台當中,並利用大數據工具對數據進行分析。
OGG(Oracle GoldenGate)是一個基於日誌的結構化數據備份工具,一般用於Oracle數據庫之間的主從備份以及Oracle數據庫到其他數據庫(DB2, MySQL等)的同步。下麵是Oracle官方提供的一個OGG的整體架構圖,從圖中可以看出OGG的部署分為源端和目標端兩部分組成,主要有Manager,Extract,Pump,Collector,Replicat這麼一些組件。
- Manager:在源端和目標端都會有且隻有一個Manager進程存在,負責管理其他進程的啟停和監控等;
- Extract:負責從源端數據庫表或者事務日誌中捕獲數據,有初始加載和增量同步兩種模式可以配置,初始加載模式是直接將源表數據同步到目標端,而增量同步就是分析源端數據庫的日誌,將變動的記錄傳到目標端,本文介紹的是增量同步的模式;
- Pump:Extract從源端抽取的數據會先寫到本地磁盤的Trail文件,Pump進程會負責將Trail文件的數據投遞到目標端;
- Collector:目標端負責接收來自源端的數據,生成Trail文件
- Replicat:負責讀取目標端的Trail文件,轉化為相應的DDL和DML語句作用到目標數據庫,實現數據同步。
本文介紹的Oracle數據同步是通過OGG的Datahub插件實現的,該Datahub插件在架構圖中處於Replicat的位置,會分析Trail文件,將數據的變化記錄寫入Datahub中,可以使用流計算對datahub中的數據進行實時分析,也可以將數據歸檔到MaxCompute中進行離線處理。
二、安裝步驟
0. 環境要求
- 源端已安裝好Oracle
- 源端已安裝好OGG(建議版本Oracle GoldenGate V12.1.2.1)
- 目標端已安裝好OGG Adapters(建議版本Oracle GoldenGate Application Adapters 12.1.2.1)
- java 7
(下麵將介紹Oracle/OGG相關安裝和配置過程,Oracle的安裝將不做介紹,另外需要注意的是:Oracle/OGG相關參數配置以熟悉Oracle/OGG的運維人員配置為準,本示例隻是提供一個可運行的樣本,Oracle所使用的版本為ORA11g)
1. 源端OGG安裝
下載OGG安裝包解壓後有如下目錄:
drwxr-xr-x install
drwxrwxr-x response
-rwxr-xr-x runInstaller
drwxr-xr-x stage
目前oracle一般采取response安裝的方式,在response/oggcore.rsp中配置安裝依賴,具體如下:
oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
# 需要目前與oracle版本對應
INSTALL_OPTION=ORA11g
# goldegate主目錄
SOFTWARE_LOCATION=/home/oracle/u01/ggate
# 初始不啟動manager
START_MANAGER=false
# manger端口
MANAGER_PORT=7839
# 對應oracle的主目錄
DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
# 暫可不配置
INVENTORY_LOCATION=
# 分組(目前暫時將oracle和ogg用同一個賬號ogg_test,實際可以給ogg單獨賬號)
UNIX_GROUP_NAME=oinstall
執行命令:
runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp
本示例中,安裝後OGG的目錄在/home/oracle/u01/ggate,安裝日誌在/home/oracle/u01/ggate/cfgtoollogs/oui目錄下,當silentInstall{時間}.log文件裏出現如下提示,表明安裝成功:
The installation of Oracle GoldenGate Core was successful.
執行/home/oracle/u01/ggate/ggsci命令,並在提示符下鍵入命令:CREATE SUBDIRS,從而生成ogg需要的各種目錄(dir打頭的那些)。
至此,源端OGG安裝完成。
2. 源端Oracle配置
以dba分身進入sqlplus:sqlplus / as sysdba
# 創建獨立的表空間
create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;
# 創建ogg_test用戶,密碼也為ogg_test
create user ogg_test identified by ogg_test default tablespace ATMV;
# 給ogg_test賦予充分的權限
grant connect,resource,dba to ogg_test;
# 檢查附加日誌情況
Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;
# 增加數據庫附加日誌及回退
alter database add supplemental log data;
alter database add supplemental log data (primary key, unique,foreign key) columns;
# rollback
alter database drop supplemental log data (primary key, unique,foreign key) columns;
alter database drop supplemental log data;
# 全字段模式,注意:在該模式下的delete操作也隻有主鍵值
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# 開啟數據庫強製日誌模式
alter database force logging;
# 執行marker_setup.sql 腳本
@marker_setup.sql
# 執行@ddl_setup.sql
@ddl_setup.sql
# 執行role_setup.sql
@role_setup.sql
# 給ogg用戶賦權
grant GGS_GGSUSER_ROLE to ogg_test;
# 執行@ddl_enable.sql,開啟DDL trigger
@ddl_enable.sql
# 執行優化腳本
@ddl_pin ogg_test
# 安裝sequence support
@sequence.sql
#
alter table sys.seq$ add supplemental log data (primary key) columns;
3. OGG源端mgr配置
以下是通過ggsci對ogg進行配置
配置mgr
edit params mgr
PORT 7839
DYNAMICPORTLIST 7840-7849
USERID ogg_test, PASSWORD ogg_test
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
啟動mgr(運行日誌在ggate/dirrpt中)
start mgr
查看mgr狀態
info mgr
查看mgr配置
view params mgr
4. OGG源端extract配置
以下是通過ggsci對ogg進行配置
配置extract(名字可任取,extract是組名)
edit params extract
EXTRACT extract
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN
USERID ogg_test, PASSWORD ogg_test
REPORTCOUNT EVERY 1 MINUTES, RATE
NUMFILES 5000
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100
DISCARDROLLOVER AT 2:00
WARNLONGTRANS 2h, CHECKINTERVAL 3m
EXTTRAIL ./dirdat/st, MEGABYTES 200
DYNAMICRESOLUTION
TRANLOGOPTIONS CONVERTUCS2CLOBS
TRANLOGOPTIONS RAWDEVICEOFFSET 0
DDL &
INCLUDE MAPPED OBJTYPE 'table' &
INCLUDE MAPPED OBJTYPE 'index' &
INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
EXCLUDE OPTYPE COMMENT
DDLOPTIONS NOCROSSRENAME REPORT
TABLE OGG_TEST.*;
SEQUENCE OGG_TEST.*;
GETUPDATEBEFORES
增加extract進程(ext後的名字要跟上麵extract
對應,本例中extract是組名)
add ext extract,tranlog, begin now
刪除某廢棄進程DP_TEST
delete ext DP_TEST
添加抽取進程,每個隊列文件大小為200m
add exttrail ./dirdat/st,ext extract, megabytes 200
啟動抽取進程(運行日誌在ggate/dirrpt中)
start extract extract
至此,extract配置完成,數據庫的一條變更可以在ggate/dirdat目錄下的文件中看到
5. 生成def文件
源端ggsci起來後執行如下命令,生成defgen文件,並且拷貝到目標端dirdef下
edit params defgen
DEFSFILE ./dirdef/ogg_test.def
USERID ogg_test, PASSWORD ogg_test
table OGG_TEST.*;
在shell中執行如下命令,生成ogg_test.def
./defgen paramfile ./dirprm/defgen.prm
6. 目標端OGG安裝和配置
解壓adapter包
將源端中dirdef/ogg_test.def文件拷貝到adapter的dirdef下
執行ggsci起來後執行如下命令,創建必須目錄
create subdirs
編輯mgr配置
edit params mgr
PORT 7839
DYNAMICPORTLIST 7840-7849
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
啟動mgr
start mgr
7. 源端ogg pump配置
啟動ggsci後執行如下操作:
編輯pump配置
edit params pump
EXTRACT pump
RMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESS
PASSTHRU
NUMFILES 5000
RMTTRAIL ./dirdat/st
DYNAMICRESOLUTION
TABLE OGG_TEST.*;
SEQUENCE OGG_TEST.*;
添加投遞進程,從某一個隊列開始投
add ext pump,exttrailsource ./dirdat/st
備注:投遞進程,每個隊文件大小為200m
add rmttrail ./dirdat/st,ext pump,megabytes 200
啟動pump
start pump
啟動後,結合上麵adapter的配置,可以在目標端的dirdat目錄下看到過來的trailfile
8. Datahub插件安裝和配置
依賴環境:jdk1.7。
配置好JAVA_HOME, LD_LIBRARY_PATH,可以將環境變量配置到~/.bash_profile中,例如
export JAVA_HOME=/xxx/xxx/jrexx
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server
修改環境變量後,請重啟adapter的mgr進程
下載datahub-ogg-plugin.tar.gz並解壓:
修改conf路徑下的javaue.properties文件,將{YOUR_HOME}替換為解壓後的路徑
gg.handlerlist=ggdatahub
gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xml
goldengate.userexit.nochkpt=false
goldengate.userexit.timestamp=utc
gg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/*
gg.log.level=debug
jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar
修改conf路徑下的log4j.properties文件,將{YOUR_HOME}替換為解壓後的路徑
修改conf路徑下的configure.xml文件,修改方式見文件中的注釋
<?xml version="1.0" encoding="UTF-8"?>
<configue>
<defaultOracleConfigure>
<!-- oracle sid, 必選-->
<sid>100</sid>
<!-- oracle schema, 可以被mapping中的oracleSchema覆蓋, 兩者必須有一個非空-->
<schema>ogg_test</schema>
</defaultOracleConfigure>
<defalutDatahubConfigure>
<!-- datahub endpoint, 必填-->
<endPoint>YOUR_DATAHUB_ENDPOINT</endPoint>
<!-- datahub project, 可以被mapping中的datahubProject, 兩者必須有一個非空-->
<project>YOUR_DATAHUB_PROJECT</project>
<!-- datahub accessId, 可以被mapping中的datahubAccessId覆蓋, 兩者必須有一個非空-->
<accessId>YOUR_DATAHUB_ACCESS_ID</accessId>
<!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆蓋, 兩者必須有一個非空-->
<accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey>
<!-- 數據變更類型同步到datahub對應的字段,可以被columnMapping中的ctypeColumn覆蓋 -->
<ctypeColumn>optype</ctypeColumn>
<!-- 數據變更時間同步到datahub對應的字段,可以被columnMapping中的ctimeColumn覆蓋 -->
<ctimeColumn>readtime</ctimeColumn>
<!-- 數據變更序號同步到datahub對應的字段, 按數據變更先後遞增, 不保證連續, 可以被columnMapping中的cidColumn覆蓋 -->
<cidColumn>record_id</cidColumn>
<!-- 額外增加的常量列,每條record該列值為指定值,格式為c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆蓋-->
<constColumnMap></constColumnMap>
</defalutDatahubConfigure>
<!-- 默認最嚴格,不落文件 直接退出 無限重試-->
<!-- 運行每批上次的最多紀錄數, 可選, 默認1000-->
<batchSize>1000</batchSize>
<!-- 默認時間字段轉換格式, 可選, 默認yyyy-MM-dd HH:mm:ss-->
<defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat>
<!-- 髒數據是否繼續, 可選, 默認false-->
<dirtyDataContinue>true</dirtyDataContinue>
<!-- 髒數據文件, 可選, 默認datahub_ogg_plugin.dirty-->
<dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile>
<!-- 髒數據文件最大size, 單位M, 可選, 默認500-->
<dirtyDataFileMaxSize>200</dirtyDataFileMaxSize>
<!-- 重試次數, -1:無限重試 0:不重試 n:重試次數, 可選, 默認-1-->
<retryTimes>0</retryTimes>
<!-- 重試間隔, 單位毫秒, 可選, 默認3000-->
<retryInterval>4000</retryInterval>
<!-- 點位文件, 可選, 默認datahub_ogg_plugin.chk-->
<checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName>
<mappings>
<mapping>
<!-- oracle schema, 見上描述-->
<oracleSchema></oracleSchema>
<!-- oracle table, 必選-->
<oracleTable>t_person</oracleTable>
<!-- datahub project, 見上描述-->
<datahubProject></datahubProject>
<!-- datahub AccessId, 見上描述-->
<datahubAccessId></datahubAccessId>
<!-- datahub AccessKey, 見上描述-->
<datahubAccessKey></datahubAccessKey>
<!-- datahub topic, 必選-->
<datahubTopic>t_person</datahubTopic>
<ctypeColumn></ctypeColumn>
<ctimeColumn></ctimeColumn>
<cidColumn></cidColumn>
<constColumnMap></constColumnMap>
<columnMapping>
<!--
src:oracle字段名稱, 必須;
dest:datahub field, 必須;
destOld:變更前數據落到datahub的field, 可選;
isShardColumn: 是否作為shard的hashkey, 可選, 默認為false, 可以被shardId覆蓋
isDateFormat: timestamp字段是否采用DateFormat格式轉換, 默認true. 如果是false, 源端數據必須是long
dateFormat: timestamp字段的轉換格式, 不填就用默認值
-->
<column src="id" dest="id" isShardColumn="true" isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/>
<column src="name" dest="name" isShardColumn="true"/>
<column src="age" dest="age"/>
<column src="address" dest="address"/>
<column src="comments" dest="comments"/>
<column src="sex" dest="sex"/>
<column src="temp" dest="temp" destOld="temp1"/>
</columnMapping>
<!--指定shard id, 優先生效, 可選-->
<shardId>1</shardId>
</mapping>
</mappings>
</configue>
在ggsci下啟動datahub writer
edit params dhwriter
extract dhwriter
getEnv (JAVA_HOME)
getEnv (LD_LIBRARY_PATH)
getEnv (PATH)
CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"
sourcedefs ./dirdef/ogg_test.def
table OGG_TEST.*;
添加dhwriter
add extract dhwriter, exttrailsource ./dirdat/st
啟動dhwriter
start dhwriter
三、使用場景
這裏會用一個簡單的示例來說明數據的使用方法,例如我們在Oracle數據庫有一張商品訂單表orders(oid int, pid int, num int),該表有三列,分別為訂單ID, 商品ID和商品數量。
將這個表通過OGG Datahub進行增量數據同步之前,我們需要先將源表已有的數據通過DataX同步到MaxCompute中。增量同步的關鍵步驟如下:
(1)在Datahub上創建相應的Topic,Topic的schema為(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub的插件按照上述的安裝流程部署配置,其中列的Mapping配置如下:
<ctypeColumn>optype</ctypeColumn>
<ctimeColumn>readtime</ctimeColumn>
<columnMapping>
<column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/>
<column src="pid" dest="pid_after" destOld="pid_before"/>
<column src="num" dest="num_after" destOld="num_before"/>
</columnMapping>
其中optype和readtime字段是記錄數據庫的數據變更類型和時間,optype有"I", "D", "U"三種取值,分別對應為“增”,“刪”,“改”三種數據變更操作。
(3)OGG Datahub插件部署好成功運行後,插件會源源不斷的將源表的數據變更記錄輸送至datahub中,例如我們在源訂單表中新增一條記錄(1,2,1),datahub裏收到的記錄如下:
+--------+------------+------------+------------+------------+------------+------------+------------+------------+
| record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
+-------+------------+------------+------------+------------+------------+------------+------------+------------+
| 14810373343020000 | I | 2016-12-06 15:15:28.000141 | NULL | 1 | NULL | 2 | NULL | 1 |
修改這條數據,比如把num改為20,datahub則會收到的一條變更數據記錄,如下:
+-------+------------+------------+------------+------------+------------+------------+------------+------------+
| record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
+--------+------------+------------+------------+------------+------------+------------+------------+------------+
| 14810373343080000 | U | 2016-12-06 15:15:58.000253 | 1 | 1 | 2 | 2 | 1 | 20 |
實時計算
在前一天的離線計算的基礎數據上,我們可以寫一個StreamCompute流計算的分析程序,很容易地對數據進行實時匯總,例如實時統計當前總的訂單數,每種商品的銷售量等。處理思路就是對於每一條到來的變更數據,可以拿到變化的數值,實時更新統計變量即可。
離線處理
為了便於後續的離線分析,我們也可以將Datahub裏的數據歸檔到MaxCompute中,在MaxCompute中創建相應Schema的表:
create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);
在Datahub上創建MaxCompute的數據歸檔,上述流入Datahub裏的數據將自動同步到MaxCompute當中。建議將同步到MaxCompute中的數據按照時間段進行劃分,比如每一天的增量數據都對應一個獨立分區。這樣當天的數據同步完成後,我們可以處理對應的分區,拿到當天所有的數據變更,而與和前一天的全量數據進行合並後,即可得到當天的全量數據。為了簡單起見,先不考慮分區表的情況,以2016-12-06這天的增量數據為例,假設前一天的全量數據在表orders_base裏麵,datahub同步過來的增量數據在orders_log表中,將orders_base與orders_log做合並操作,可以得到2016-12-06這天的最終全量數據寫入表orders_result中。這個過程可以在MaxCompute上用如下這樣一條SQL完成。
INSERT OVERWRITE TABLE orders_result
SELECT t.oid, t.pid, t.num
FROM
(
SELECT oid, pid, num, '0' x_record_id, 1 AS x_optype
FROM
orders_base
UNION ALL
SELECT decode(optype,'D',oid_before,oid_after) AS oid
, decode(optype,'D', pid_before,pid_after) AS pid
, num_after AS num
, record_id x_record_id
, decode(optype, 'D', 0, 1) AS x_optype
FROM
orders_log
) t
JOIN
(
SELECT
oid
, pid
, max(record_id) x_max_modified
FROM
(
SELECT
oid
, pid
, '0' record_id
FROM
orders_base UNION ALL SELECT
decode(optype,'D',oid_before,oid_after) AS oid
, decode(optype,'D', pid_before,pid_after) AS pid
, record_id
FROM
orders_log ) g
GROUP BY oid , pid
) s
ON
t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;
四、常見問題
Q:目標端報錯 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A:目標端機器hostname在/etc/hosts裏麵重新設置localhost對應的ip
Q:找不到jvm相關的so包
A:將jvm的so路徑添加到LD_LIBRARY_PATH後,重啟mgr
例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server
Q:有了DDL語句,比如增加一列,源端ogg沒有問題,但是adapter端的ffwriter和jmswriter進程退出,且報錯: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由於表結構改變,需要重做def文件,將重做的def文件放入dirdef後重啟即可
最後更新:2017-06-05 11:32:30