閱讀432 返回首頁    go 阿裏雲 go 技術社區[雲棲]


基於OGG Datahub插件將Oracle數據同步上雲

一、背景介紹

隨著數據規模的不斷擴大,傳統的RDBMS難以滿足OLAP的需求,本文將介紹如何將Oracle的數據實時同步到阿裏雲的大數據處理平台當中,並利用大數據工具對數據進行分析。

OGG(Oracle GoldenGate)是一個基於日誌的結構化數據備份工具,一般用於Oracle數據庫之間的主從備份以及Oracle數據庫到其他數據庫(DB2, MySQL等)的同步。下麵是Oracle官方提供的一個OGG的整體架構圖,從圖中可以看出OGG的部署分為源端和目標端兩部分組成,主要有Manager,Extract,Pump,Collector,Replicat這麼一些組件。
屏幕快照 2016-11-24 下午3.31.29.png

  • Manager:在源端和目標端都會有且隻有一個Manager進程存在,負責管理其他進程的啟停和監控等;
  • Extract:負責從源端數據庫表或者事務日誌中捕獲數據,有初始加載和增量同步兩種模式可以配置,初始加載模式是直接將源表數據同步到目標端,而增量同步就是分析源端數據庫的日誌,將變動的記錄傳到目標端,本文介紹的是增量同步的模式;
  • Pump:Extract從源端抽取的數據會先寫到本地磁盤的Trail文件,Pump進程會負責將Trail文件的數據投遞到目標端;
  • Collector:目標端負責接收來自源端的數據,生成Trail文件
  • Replicat:負責讀取目標端的Trail文件,轉化為相應的DDL和DML語句作用到目標數據庫,實現數據同步。

本文介紹的Oracle數據同步是通過OGG的Datahub插件實現的,該Datahub插件在架構圖中處於Replicat的位置,會分析Trail文件,將數據的變化記錄寫入Datahub中,可以使用流計算對datahub中的數據進行實時分析,也可以將數據歸檔到MaxCompute中進行離線處理。

二、安裝步驟

0. 環境要求

  • 源端已安裝好Oracle
  • 源端已安裝好OGG(建議版本Oracle GoldenGate V12.1.2.1)
  • 目標端已安裝好OGG Adapters(建議版本Oracle GoldenGate Application Adapters 12.1.2.1)
  • java 7

(下麵將介紹Oracle/OGG相關安裝和配置過程,Oracle的安裝將不做介紹,另外需要注意的是:Oracle/OGG相關參數配置以熟悉Oracle/OGG的運維人員配置為準,本示例隻是提供一個可運行的樣本,Oracle所使用的版本為ORA11g)

1. 源端OGG安裝

下載OGG安裝包解壓後有如下目錄:

drwxr-xr-x install
drwxrwxr-x response
-rwxr-xr-x runInstaller
drwxr-xr-x stage

目前oracle一般采取response安裝的方式,在response/oggcore.rsp中配置安裝依賴,具體如下:

oracle.install.responseFileVersion=/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
# 需要目前與oracle版本對應
INSTALL_OPTION=ORA11g
# goldegate主目錄
SOFTWARE_LOCATION=/home/oracle/u01/ggate
# 初始不啟動manager
START_MANAGER=false
# manger端口
MANAGER_PORT=7839
# 對應oracle的主目錄
DATABASE_LOCATION=/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
# 暫可不配置
INVENTORY_LOCATION=
# 分組(目前暫時將oracle和ogg用同一個賬號ogg_test,實際可以給ogg單獨賬號)
UNIX_GROUP_NAME=oinstall

執行命令:

runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp

本示例中,安裝後OGG的目錄在/home/oracle/u01/ggate,安裝日誌在/home/oracle/u01/ggate/cfgtoollogs/oui目錄下,當silentInstall{時間}.log文件裏出現如下提示,表明安裝成功:

The installation of Oracle GoldenGate Core was successful.

執行/home/oracle/u01/ggate/ggsci命令,並在提示符下鍵入命令:CREATE SUBDIRS,從而生成ogg需要的各種目錄(dir打頭的那些)。
至此,源端OGG安裝完成。

2. 源端Oracle配置

以dba分身進入sqlplus:sqlplus / as sysdba

# 創建獨立的表空間
create tablespace ATMV datafile '/home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf' size 100m autoextend on next 50m maxsize unlimited;

# 創建ogg_test用戶,密碼也為ogg_test
create user ogg_test identified by ogg_test default tablespace ATMV;

# 給ogg_test賦予充分的權限
grant connect,resource,dba to ogg_test;

# 檢查附加日誌情況
Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;

# 增加數據庫附加日誌及回退
alter database add supplemental log data;
alter database add supplemental log data (primary key, unique,foreign key) columns;
# rollback
alter database drop supplemental log data (primary key, unique,foreign key) columns;
alter database drop supplemental log data;

# 全字段模式,注意:在該模式下的delete操作也隻有主鍵值
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# 開啟數據庫強製日誌模式
alter database force logging;
# 執行marker_setup.sql 腳本
@marker_setup.sql
# 執行@ddl_setup.sql
@ddl_setup.sql
# 執行role_setup.sql
@role_setup.sql
# 給ogg用戶賦權
grant GGS_GGSUSER_ROLE to ogg_test;
# 執行@ddl_enable.sql,開啟DDL trigger
@ddl_enable.sql
# 執行優化腳本
@ddl_pin ogg_test
# 安裝sequence support
@sequence.sql
#
alter table sys.seq$ add supplemental log data (primary key) columns;

3. OGG源端mgr配置

以下是通過ggsci對ogg進行配置

配置mgr
edit params mgr

PORT 7839
DYNAMICPORTLIST  7840-7849
USERID ogg_test, PASSWORD ogg_test
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

啟動mgr(運行日誌在ggate/dirrpt中)

start mgr

查看mgr狀態

info mgr

查看mgr配置

view params mgr

4. OGG源端extract配置

以下是通過ggsci對ogg進行配置

配置extract(名字可任取,extract是組名)
edit params extract

EXTRACT extract
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS   ALLOWUNUSEDCOLUMN
USERID ogg_test, PASSWORD ogg_test
REPORTCOUNT EVERY 1 MINUTES, RATE
NUMFILES 5000
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100
DISCARDROLLOVER AT 2:00
WARNLONGTRANS 2h, CHECKINTERVAL 3m
EXTTRAIL ./dirdat/st, MEGABYTES 200
DYNAMICRESOLUTION
TRANLOGOPTIONS CONVERTUCS2CLOBS
TRANLOGOPTIONS RAWDEVICEOFFSET 0
DDL &
INCLUDE MAPPED OBJTYPE 'table' &
INCLUDE MAPPED OBJTYPE 'index' &
INCLUDE MAPPED OBJTYPE 'SEQUENCE' &
EXCLUDE OPTYPE COMMENT
DDLOPTIONS  NOCROSSRENAME  REPORT
TABLE     OGG_TEST.*;
SEQUENCE  OGG_TEST.*;

GETUPDATEBEFORES

增加extract進程(ext後的名字要跟上麵extract對應,本例中extract是組名)
add ext extract,tranlog, begin now

刪除某廢棄進程DP_TEST
delete ext DP_TEST

添加抽取進程,每個隊列文件大小為200m
add exttrail ./dirdat/st,ext extract, megabytes 200

啟動抽取進程(運行日誌在ggate/dirrpt中)
start extract extract
至此,extract配置完成,數據庫的一條變更可以在ggate/dirdat目錄下的文件中看到

5. 生成def文件

源端ggsci起來後執行如下命令,生成defgen文件,並且拷貝到目標端dirdef下
edit params defgen

DEFSFILE ./dirdef/ogg_test.def
USERID ogg_test, PASSWORD ogg_test
table OGG_TEST.*;

在shell中執行如下命令,生成ogg_test.def
./defgen paramfile ./dirprm/defgen.prm

6. 目標端OGG安裝和配置

解壓adapter包
將源端中dirdef/ogg_test.def文件拷貝到adapter的dirdef下

執行ggsci起來後執行如下命令,創建必須目錄
create subdirs

編輯mgr配置
edit params mgr

PORT 7839
DYNAMICPORTLIST  7840-7849
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7

啟動mgr
start mgr

7. 源端ogg pump配置

啟動ggsci後執行如下操作:

編輯pump配置
edit params pump

EXTRACT pump
RMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESS
PASSTHRU
NUMFILES 5000
RMTTRAIL ./dirdat/st
DYNAMICRESOLUTION
TABLE      OGG_TEST.*;
SEQUENCE   OGG_TEST.*;

添加投遞進程,從某一個隊列開始投
add ext pump,exttrailsource ./dirdat/st

備注:投遞進程,每個隊文件大小為200m
add rmttrail ./dirdat/st,ext pump,megabytes 200

啟動pump
start pump
啟動後,結合上麵adapter的配置,可以在目標端的dirdat目錄下看到過來的trailfile

8. Datahub插件安裝和配置

依賴環境:jdk1.7。
配置好JAVA_HOME, LD_LIBRARY_PATH,可以將環境變量配置到~/.bash_profile中,例如

export JAVA_HOME=/xxx/xxx/jrexx
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

修改環境變量後,請重啟adapter的mgr進程
下載datahub-ogg-plugin.tar.gz並解壓:

修改conf路徑下的javaue.properties文件,將{YOUR_HOME}替換為解壓後的路徑

gg.handlerlist=ggdatahub

gg.handler.ggdatahub.type=com.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.handler.ggdatahub.configureFileName={YOUR_HOME}/datahub-ogg-plugin/conf/configure.xml

goldengate.userexit.nochkpt=false
goldengate.userexit.timestamp=utc

gg.classpath={YOUR_HOME}/datahub-ogg-plugin/lib/*
gg.log.level=debug

jvm.bootoptions=-Xmx512m -Dlog4j.configuration=file:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.path=ggjava/ggjava.jar

修改conf路徑下的log4j.properties文件,將{YOUR_HOME}替換為解壓後的路徑

修改conf路徑下的configure.xml文件,修改方式見文件中的注釋

<?xml version="1.0" encoding="UTF-8"?>
<configue>

    <defaultOracleConfigure>
        <!-- oracle sid, 必選-->
        <sid>100</sid>
        <!-- oracle schema, 可以被mapping中的oracleSchema覆蓋, 兩者必須有一個非空-->
        <schema>ogg_test</schema>
    </defaultOracleConfigure>

    <defalutDatahubConfigure>
        <!-- datahub endpoint, 必填-->
        <endPoint>YOUR_DATAHUB_ENDPOINT</endPoint>
        <!-- datahub project, 可以被mapping中的datahubProject, 兩者必須有一個非空-->
        <project>YOUR_DATAHUB_PROJECT</project>
        <!-- datahub accessId, 可以被mapping中的datahubAccessId覆蓋, 兩者必須有一個非空-->
        <accessId>YOUR_DATAHUB_ACCESS_ID</accessId>
        <!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆蓋, 兩者必須有一個非空-->
        <accessKey>YOUR_DATAHUB_ACCESS_KEY</accessKey>
        <!-- 數據變更類型同步到datahub對應的字段,可以被columnMapping中的ctypeColumn覆蓋 -->
        <ctypeColumn>optype</ctypeColumn>
        <!-- 數據變更時間同步到datahub對應的字段,可以被columnMapping中的ctimeColumn覆蓋 -->
        <ctimeColumn>readtime</ctimeColumn>
        <!-- 數據變更序號同步到datahub對應的字段, 按數據變更先後遞增, 不保證連續, 可以被columnMapping中的cidColumn覆蓋 -->
        <cidColumn>record_id</cidColumn>
<!-- 額外增加的常量列,每條record該列值為指定值,格式為c1=xxx,c2=xxx,可以被columnMapping中的constColumnMap覆蓋-->
         <constColumnMap></constColumnMap>
    </defalutDatahubConfigure>

    <!-- 默認最嚴格,不落文件 直接退出 無限重試-->

    <!-- 運行每批上次的最多紀錄數, 可選, 默認1000-->
    <batchSize>1000</batchSize>

    <!-- 默認時間字段轉換格式, 可選, 默認yyyy-MM-dd HH:mm:ss-->
    <defaultDateFormat>yyyy-MM-dd HH:mm:ss</defaultDateFormat>

    <!-- 髒數據是否繼續, 可選, 默認false-->
    <dirtyDataContinue>true</dirtyDataContinue>

    <!-- 髒數據文件, 可選, 默認datahub_ogg_plugin.dirty-->
    <dirtyDataFile>datahub_ogg_plugin.dirty</dirtyDataFile>

    <!-- 髒數據文件最大size, 單位M, 可選, 默認500-->
    <dirtyDataFileMaxSize>200</dirtyDataFileMaxSize>

    <!-- 重試次數, -1:無限重試 0:不重試 n:重試次數, 可選, 默認-1-->
    <retryTimes>0</retryTimes>

    <!-- 重試間隔, 單位毫秒, 可選, 默認3000-->
    <retryInterval>4000</retryInterval>

    <!-- 點位文件, 可選, 默認datahub_ogg_plugin.chk-->
    <checkPointFileName>datahub_ogg_plugin.chk</checkPointFileName>

    <mappings>
        <mapping>
            <!-- oracle schema, 見上描述-->
            <oracleSchema></oracleSchema>
            <!-- oracle table, 必選-->
            <oracleTable>t_person</oracleTable>
            <!-- datahub project, 見上描述-->
            <datahubProject></datahubProject>
            <!-- datahub AccessId, 見上描述-->
            <datahubAccessId></datahubAccessId>
            <!-- datahub AccessKey, 見上描述-->
            <datahubAccessKey></datahubAccessKey>
            <!-- datahub topic, 必選-->
            <datahubTopic>t_person</datahubTopic>
            <ctypeColumn></ctypeColumn>
            <ctimeColumn></ctimeColumn>
            <cidColumn></cidColumn>
            <constColumnMap></constColumnMap>
            <columnMapping>
                <!--
                src:oracle字段名稱, 必須;
                dest:datahub field, 必須;
                destOld:變更前數據落到datahub的field, 可選;
                isShardColumn: 是否作為shard的hashkey, 可選, 默認為false, 可以被shardId覆蓋
                isDateFormat: timestamp字段是否采用DateFormat格式轉換, 默認true. 如果是false, 源端數據必須是long
                dateFormat: timestamp字段的轉換格式, 不填就用默認值
                -->
                <column src="id" dest="id" isShardColumn="true"  isDateFormat="false" dateFormat="yyyy-MM-dd HH:mm:ss"/>
                <column src="name" dest="name" isShardColumn="true"/>
                <column src="age" dest="age"/>
                <column src="address" dest="address"/>
                <column src="comments" dest="comments"/>
                <column src="sex" dest="sex"/>
                <column src="temp" dest="temp" destOld="temp1"/>
            </columnMapping>

            <!--指定shard id, 優先生效, 可選-->
            <shardId>1</shardId>
        </mapping>
    </mappings>
</configue>

在ggsci下啟動datahub writer

edit params dhwriter

extract dhwriter
getEnv (JAVA_HOME)
getEnv (LD_LIBRARY_PATH)
getEnv (PATH)
CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS "{YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties"
sourcedefs ./dirdef/ogg_test.def
table OGG_TEST.*;

添加dhwriter
add extract dhwriter, exttrailsource ./dirdat/st

啟動dhwriter
start dhwriter

三、使用場景

這裏會用一個簡單的示例來說明數據的使用方法,例如我們在Oracle數據庫有一張商品訂單表orders(oid int, pid int, num int),該表有三列,分別為訂單ID, 商品ID和商品數量。
將這個表通過OGG Datahub進行增量數據同步之前,我們需要先將源表已有的數據通過DataX同步到MaxCompute中。增量同步的關鍵步驟如下:
(1)在Datahub上創建相應的Topic,Topic的schema為(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after);
(2)OGG Datahub的插件按照上述的安裝流程部署配置,其中列的Mapping配置如下:

            <ctypeColumn>optype</ctypeColumn>
            <ctimeColumn>readtime</ctimeColumn>
            <columnMapping>
                <column src="oid" dest="oid_after" destOld="oid_before" isShardColumn="true"/>
                <column src="pid" dest="pid_after" destOld="pid_before"/>
                <column src="num" dest="num_after" destOld="num_before"/>
            </columnMapping>

其中optype和readtime字段是記錄數據庫的數據變更類型和時間,optype有"I", "D", "U"三種取值,分別對應為“增”,“刪”,“改”三種數據變更操作。
(3)OGG Datahub插件部署好成功運行後,插件會源源不斷的將源表的數據變更記錄輸送至datahub中,例如我們在源訂單表中新增一條記錄(1,2,1),datahub裏收到的記錄如下:

+--------+------------+------------+------------+------------+------------+------------+------------+------------+
| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
+-------+------------+------------+------------+------------+------------+------------+------------+------------+
| 14810373343020000 |     I          | 2016-12-06 15:15:28.000141 | NULL       | 1          | NULL       | 2          | NULL       | 1   |       

修改這條數據,比如把num改為20,datahub則會收到的一條變更數據記錄,如下:

+-------+------------+------------+------------+------------+------------+------------+------------+------------+
| record_id | optype     | readtime   | oid_before | oid_after  | pid_before | pid_after  | num_before | num_after  |
+--------+------------+------------+------------+------------+------------+------------+------------+------------+
| 14810373343080000 |     U          | 2016-12-06 15:15:58.000253 | 1          | 1          | 2          | 2          | 1          | 20         |

實時計算

在前一天的離線計算的基礎數據上,我們可以寫一個StreamCompute流計算的分析程序,很容易地對數據進行實時匯總,例如實時統計當前總的訂單數,每種商品的銷售量等。處理思路就是對於每一條到來的變更數據,可以拿到變化的數值,實時更新統計變量即可。

離線處理

為了便於後續的離線分析,我們也可以將Datahub裏的數據歸檔到MaxCompute中,在MaxCompute中創建相應Schema的表:

create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);

在Datahub上創建MaxCompute的數據歸檔,上述流入Datahub裏的數據將自動同步到MaxCompute當中。建議將同步到MaxCompute中的數據按照時間段進行劃分,比如每一天的增量數據都對應一個獨立分區。這樣當天的數據同步完成後,我們可以處理對應的分區,拿到當天所有的數據變更,而與和前一天的全量數據進行合並後,即可得到當天的全量數據。為了簡單起見,先不考慮分區表的情況,以2016-12-06這天的增量數據為例,假設前一天的全量數據在表orders_base裏麵,datahub同步過來的增量數據在orders_log表中,將orders_base與orders_log做合並操作,可以得到2016-12-06這天的最終全量數據寫入表orders_result中。這個過程可以在MaxCompute上用如下這樣一條SQL完成。

INSERT OVERWRITE TABLE orders_result
SELECT t.oid, t.pid, t.num
FROM
(
     SELECT oid, pid, num, '0' x_record_id, 1 AS x_optype
     FROM
     orders_base 
     UNION ALL
     SELECT decode(optype,'D',oid_before,oid_after) AS oid
              , decode(optype,'D', pid_before,pid_after) AS pid
              , num_after AS num
              , record_id x_record_id
              , decode(optype, 'D', 0, 1) AS x_optype
     FROM
     orders_log
 ) t
JOIN
 (
     SELECT
     oid
     , pid
     , max(record_id) x_max_modified
     FROM
     (
     SELECT
     oid
     , pid
     , '0' record_id
     FROM
     orders_base UNION ALL SELECT
                      decode(optype,'D',oid_before,oid_after) AS oid
                      , decode(optype,'D', pid_before,pid_after) AS pid
                      , record_id
                      FROM
                      orders_log ) g
     GROUP BY oid , pid
 ) s
ON
t.oid = s.oid AND t.pid = s.pid AND t.x_record_id = s.x_max_modified AND t.x_optype <> 0;

四、常見問題

Q:目標端報錯 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.
A:目標端機器hostname在/etc/hosts裏麵重新設置localhost對應的ip

Q:找不到jvm相關的so包
A:將jvm的so路徑添加到LD_LIBRARY_PATH後,重啟mgr

例如:export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server

Q:有了DDL語句,比如增加一列,源端ogg沒有問題,但是adapter端的ffwriter和jmswriter進程退出,且報錯: 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns = 5.
A:由於表結構改變,需要重做def文件,將重做的def文件放入dirdef後重啟即可

最後更新:2017-06-05 11:32:30

  上一篇:go  UI自動化框架調研-番外篇
  下一篇:go  阿裏雲快照容量基本原理