如何基於日誌,同步實現數據的一致性和實時抽取?
本文根據DBAplus社群第85期線上分享整理而成
講師介紹

王東
宜信技術研發中心架構師
-
目前就職於宜信技術研發中心,任架構師,負責流式計算和大數據業務產品解決方案。
-
曾任職於Naver china(韓國最大搜索引擎公司)中國研發中心資深工程師,多年從事CUBRID分布式數據庫集群開發和CUBRID數據庫引擎開發
https://www.cubrid.org/blog/news/cubrid-cluster-introduction/
主題簡介:
-
DWS 的背景介紹
-
dbus+wormhole 總體架構和技術實現方案
-
DWS的實際運用案例
前言
大家好,我是王東,來自宜信技術研發中心,這是我來社群的第一次分享,如果有什麼不足,請大家多多指正、包涵。
本次分享的主題是《基於日誌的DWS平台實現和應用》, 主要是分享一下目前我們在宜信做的一些事情。這個主題裏麵包含到2個團隊很多兄弟姐妹的努力的結果(我們團隊和山巍團隊的成果)。這次就由我代為執筆,盡我努力給大家介紹一下。
其實整個實現從原理上來說是比較簡單的,當然也涉及到不少技術。我會嚐試用盡量簡單的方式來表達,讓大家了解這個事情的原理和意義。在過程中,大家有問題可以隨時提出,我會盡力去解答。
DWS是一個簡稱,是由3個子項目組成,我稍後做解釋。
一、背景
事情是從公司前段時間的需求說起,大家知道宜信是一個互聯網金融企業,我們的很多數據與標準互聯網企業不同,大致來說就是:
玩數據的人都知道數據是非常有價值的,然後這些數據是保存在各個係統的數據庫中,如何讓需要數據的使用方得到一致性、實時的數據呢?
過去的通用做法有幾種是:
-
DBA開放各個係統的備庫,在業務低峰期(比如夜間),使用方各自抽取所需數據。由於抽取時間不同,各個數據使用方數據不一致,數據發生衝突,而且重複抽取,相信不少DBA很頭疼這個事情。
-
公司統一的大數據平台,通過Sqoop 在業務低峰期到各個係統統一抽取數據, 並保存到Hive表中, 然後為其他數據使用方提供數據服務。這種做法解決了一致性問題,但時效性差,基本是T+1的時效。
-
基於trigger的方式獲取增量變更,主要問題是業務方侵入性大,而且trigger也帶來性能損失。
這些方案都不算完美。我們在了解和考慮了不同實現方式後,最後借鑒了 linkedin的思想,認為要想同時解決數據一致性和實時性,比較合理的方法應該是來自於log。
(此圖來自:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/)
把增量的Log作為一切係統的基礎。後續的數據使用方,通過訂閱kafka來消費log。
比如:
-
大數據的使用方可以將數據保存到Hive表或者Parquet文件給Hive或Spark查詢;
-
提供搜索服務的使用方可以保存到Elasticsearch或HBase 中;
-
提供緩存服務的使用方可以將日誌緩存到Redis或alluxio中;
-
數據同步的使用方可以將數據保存到自己的數據庫中;
-
由於kafka的日誌是可以重複消費的,並且緩存一段時間,各個使用方可以通過消費kafka的日誌來達到既能保持與數據庫的一致性,也能保證實時性;
為什麼使用log和kafka作為基礎,而不使用Sqoop進行抽取呢? 因為:
為什麼不使用dual write(雙寫)呢?,請參考https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/
我這裏就不多做解釋了。
二、總體架構
於是我們提出了構建一個基於log的公司級的平台的想法。
下麵解釋一下DWS平台, DWS平台是有3個子項目組成:
-
Dbus(數據總線):負責實時將數據從源端實時抽出,並轉換為約定的自帶schema的json格式數據(UMS 數據),放入kafka中;
-
Wormhole(數據交換平台):負責從kafka讀出數據 將數據寫入到目標中;
-
Swifts(實時計算平台):負責從kafka中讀出數據,實時計算,並將數據寫回kafka中。
圖中:
-
Log extractor和dbus共同完成數據抽取和數據轉換,抽取包括全量和增量抽取。
-
Wormhole可以將所有日誌數據保存到HDFS中; 還可以將數據落地到所有支持jdbc的數據庫,落地到HBash,Elasticsearch,Cassandra等;
-
Swifts支持以配置和SQL的方式實現對進行流式計算,包括支持流式join,look up,filter,window aggregation等功能;
-
Dbus web是dbus的配置管理端,rider除了配置管理以外,還包括對Wormhole和Swifts運行時管理,數據質量校驗等。
由於時間關係,我今天主要介紹DWS中的Dbus和Wormhole,在需要的時候附帶介紹一下Swifts。
三、dbus解決方案
日誌解析
如前麵所說,Dbus主要解決的是將日誌從源端實時的抽出。 這裏我們以MySQL為例子,簡單說明如何實現。
我們知道,雖然MySQL InnoDB有自己的log,MySQL主備同步是通過binlog來實現的。如下圖:
圖片來自:https://github.com/alibaba/canal
而binlog有三種模式:
-
Row 模式:日誌中會記錄成每一行數據被修改的形式,然後在slave端再對相同的數據進行修改。
-
Statement 模式: 每一條會修改數據的sql都會記錄到 master的bin-log中。slave在複製的時候SQL進程會解析成和原來master端執行過的相同的SQL來再次執行。
-
Mixed模式: MySQL會根據執行的每一條具體的sql語句來區分對待記錄的日誌形式,也就是在Statement和Row之間選擇一種。
他們各自的優缺點如下:
此處來自:https://www.jquerycn.cn/a_13625
由於statement 模式的缺點,在與我們的DBA溝通過程中了解到,實際生產過程中都使用row 模式進行複製。這使得讀取全量日誌成為可能。
通常我們的MySQL布局是采用 2個master主庫(vip)+ 1個slave從庫 + 1個backup容災庫 的解決方案,由於容災庫通常是用於異地容災,實時性不高也不便於部署。
為了最小化對源端產生影響,顯然我們讀取binlog日誌應該從slave從庫讀取。
讀取binlog的方案比較多,github上不少,參考https://github.com/search?utf8=%E2%9C%93&q=binlog。最終我們選用了阿裏的canal做位日誌抽取方。
Canal最早被用於阿裏中美機房同步, canal原理相對比較簡單:
-
Canal模擬MySQL Slave的交互協議,偽裝自己為MySQL Slave,向MySQL Slave發送dump協議
-
MySQL master收到dump請求,開始推送binary log給Slave(也就是canal)
-
Canal解析binary log對象(原始為byte流)
圖片來自:https://github.com/alibaba/canal
解決方案
Dbus 的MySQL版主要解決方案如下:
對於增量的log,通過訂閱Canal Server的方式,我們得到了MySQL的增量日誌:
-
按照Canal的輸出,日誌是protobuf格式,開發增量Storm程序,將數據實時轉換為我們定義的UMS格式(json格式,稍後我會介紹),並保存到kafka中;
-
增量Storm程序還負責捕獲schema變化,以控製版本號;
-
增量Storm的配置信息保存在Zookeeper中,以滿足高可用需求。
-
Kafka既作為輸出結果也作為處理過程中的緩衝器和消息解構區。
在考慮使用Storm作為解決方案的時候,我們主要是認為Storm有以下優點:
-
技術相對成熟,比較穩定,與kafka搭配也算標準組合;
-
實時性比較高,能夠滿足實時性需求;
-
滿足高可用需求;
-
通過配置Storm並發度,可以活動性能擴展的能力;
全量抽取
對於流水表,有增量部分就夠了,但是許多表需要知道最初(已存在)的信息。這時候我們需要initial load(第一次加載)。
對於initial load(第一次加載),同樣開發了全量抽取Storm程序通過jdbc連接的方式,從源端數據庫的備庫進行拉取。initial load是拉全部數據,所以我們推薦在業務低峰期進行。好在隻做一次,不需要每天都做。
全量抽取,我們借鑒了Sqoop的思想。將全量抽取Storm分為了2 個部分:
-
數據分片
-
實際抽取
數據分片需要考慮分片列,按照配置和自動選擇列將數據按照範圍來分片,並將分片信息保存到kafka中。
下麵是具體的分片策略:
全量抽取的Storm程序是讀取kafka的分片信息,采用多個並發度並行連接數據庫備庫進行拉取。因為抽取的時間可能很長。抽取過程中將實時狀態寫到Zookeeper中,便於心跳程序監控。
統一消息格式
無論是增量還是全量,最終輸出到kafka中的消息都是我們約定的一個統一消息格式,稱為UMS(unified message schema)格式。
如下圖所示:
消息中schema部分,定義了namespace 是由 類型+數據源名+schema名+表名+版本號+分庫號+分表號 能夠描述整個公司的所有表,通過一個namespace就能唯一定位。
-
_ums_op_ 表明數據的類型是I(insert),U(update),D(刪除);
-
_ums_ts_ 發生增刪改的事件的時間戳,顯然新的數據發生的時間戳更新;
-
_ums_id_ 消息的唯一id,保證消息是唯一的,但這裏我們保證了消息的先後順序(稍後解釋);
payload是指具體的數據,一個json包裏麵可以包含1條至多條數據,提高數據的有效載荷。
UMS中支持的數據類型,參考了Hive類型並進行簡化,基本上包含了所有數據類型。
全量和增量的一致性
在整個數據傳輸中,為了盡量的保證日誌消息的順序性,kafka我們使用的是1個partition的方式。在一般情況下,基本上是順序的和唯一的。
但是我們知道寫kafka會失敗,有可能重寫,Storm也用重做機製,因此,我們並不嚴格保證exactly once和完全的順序性,但保證的是at least once。
因此_ums_id_變得尤為重要。
對於全量抽取,_ums_id_是唯一的,從zk中每個並發度分別取不同的id片區,保證了唯一性和性能,填寫負數,不會與增量數據衝突,也保證他們是早於增量消息的。
對於增量抽取,我們使用的是MySQL的日誌文件號 + 日誌偏移量作為唯一id。Id作為64位的long整數,高7位用於日誌文件號,低12位作為日誌偏移量。
例如:000103000012345678。 103 是日誌文件號,12345678 是日誌偏移量。
這樣,從日誌層麵保證了物理唯一性(即便重做也這個id號也不變),同時也保證了順序性(還能定位日誌)。通過比較_ums_id_ 消費日誌就能通過比較_ums_id_知道哪條消息更新。
其實_ums_ts_與_ums_id_意圖是類似的,隻不過有時候_ums_ts_可能會重複,即在1毫秒中發生了多個操作,這樣就得靠比較_ums_id_了。
心跳監控和預警
整個係統涉及到數據庫的主備同步,Canal Server,多個並發度Storm進程等各個環節。
因此對流程的監控和預警就尤為重要。
通過心跳模塊,例如每分鍾(可配置)對每個被抽取的表插入一條心態數據並保存發送時間,這個心跳表也被抽取,跟隨著整個流程下來,與被同步表在實際上走相同的邏輯(因為多個並發的的Storm可能有不同的分支),當收到心跳包的時候,即便沒有任何增刪改的數據,也能證明整條鏈路是通的。
Storm程序和心跳程序將數據發送公共的統計topic,再由統計程序保存到influxdb中,使用grafana進行展示,就可以看到如下效果:
圖中是某業務係統的實時監控信息。上麵是實時流量情況,下麵是實時延時情況。可以看到,實時性還是很不錯的,基本上1~2秒數據就已經到末端kafka中。
Granfana提供的是一種實時監控能力。
如果出現延時,則是通過dbus的心跳模塊發送郵件報警或短信報警。
實時脫敏
考慮到數據安全性,對於有脫敏需求的場景,Dbus的全量storm和增量storm程序也完成了實時脫敏的功能。脫敏方式有3種:
總結一下:簡單的說,Dbus就是將各種源的數據,實時的導出,並以UMS的方式提供訂閱, 支持實時脫敏,實際監控和報警。
四、Wormhole解決方案
說完Dbus,該說一下Wormhole,為什麼兩個項目不是一個,而要通過kafka來對接呢?
其中很大一個原因就是解耦,kafka具有天然的解耦能力,程序直接可以通過kafka做異步的消息傳遞。Dbus和Wornhole內部也使用了kafka做消息傳遞和解耦。
另外一個原因就是,UMS是自描述的,通過訂閱kafka,任何有能力的使用方來直接消費UMS來使用。
雖然UMS的結果可以直接訂閱,但還需要開發的工作。Wormhole解決的是:提供一鍵式的配置,將kafka中的數據落地到各種係統中,讓沒有開發能力的數據使用方通過wormhole來實現使用數據。
如圖所示,Wormhole 可以將kafka中的UMS 落地到各種係統,目前用的最多的HDFS,JDBC的數據庫和HBase。
在技術棧上, wormhole選擇使用spark streaming來進行。
在Wormhole中,一條flow是指從一個namaspace從源端到目標端。一個spark streaming服務於多條flow。
選用Spark的理由是很充分的:
-
Spark天然的支持各種異構存儲係統;
-
雖然Spark Stream比Storm延時稍差,但Spark有著更好的吞吐量和更好的計算性能;
-
Spark在支持並行計算方麵有更強的靈活性;
-
Spark提供了一個技術棧內解決Sparking Job,Spark Streaming,Spark SQL的統一功能,便於後期開發;
這裏補充說一下Swifts的作用:
-
Swifts的本質是讀取kafka中的UMS數據,進行實時計算,將結果寫入到kafka的另外一個topic。
-
實時計算可以是很多種方式:比如過濾filter,projection(投影),lookup, 流式join window aggregation,可以完成各種具有業務價值的流式實時計算。
Wormhole和Swifts對比如下:
落HDFS
通過Wormhole Wpark Streaming程序消費kafka的UMS,首先UMS log可以被保存到HDFS上。
kafka一般隻保存若幹天的信息,不會保存全部信息,而HDFS中可以保存所有的曆史增刪改的信息。這就使得很多事情變為可能:
-
通過重放HDFS中的日誌,我們能夠還原任意時間的曆史快照。
-
可以做拉鏈表,還原每一條記錄的曆史信息,便於分析;
-
當程序出現錯誤是,可以通過回灌(backfill),重新消費消息,重新形成新的快照。
可以說HDFS中的日誌是很多的事情基礎。
介於Spark原生對parquet支持的很好,Spark SQL能夠對Parquet提供很好的查詢。UMS落地到HDFS上是保存到Parquet文件中的。Parquet的內容是所有log的增刪改信息以及_ums_id_,_ums_ts_都存下來。
Wormhole spark streaming根據namespace 將數據分布存儲到不同的目錄中,即不同的表和版本放在不同目錄中。
由於每次寫的Parquet都是小文件,大家知道HDFS對於小文件性能並不好,因此另外還有一個job,每天定時將這些的Parquet文件進行合並成大文件。
每個Parquet文件目錄都帶有文件數據的起始時間和結束時間。這樣在回灌數據時,可以根據選取的時間範圍來決定需要讀取哪些Parquet文件,不必讀取全部數據。
插入或更新數據的冪等性
常常我們遇到的需求是,將數據經過加工落地到數據庫或HBase中。那麼這裏涉及到的一個問題就是,什麼樣的數據可以被更新到數據?
這裏最重要的一個原則就是數據的冪等性。
無論是遇到增刪改任何的數據,我們麵臨的問題都是:
-
該更新哪一行;
-
更新的策略是什麼。
對於第一個問題,其實就需要定位數據要找一個唯一的鍵,常見的有:
-
使用業務庫的主鍵;
-
由業務方指定幾個列做聯合唯一索引;
對於第二個問題,就涉及到_ums_id_了,因為我們已經保證了_ums_id_大的值更新,因此在找到對應數據行後,根據這個原則來進行替換更新。
之所以要軟刪除和加入_is_active_列,是為了這樣一種情況:
如果已經插入的_ums_id_比較大,是刪除的數據(表明這個數據已經刪除了), 如果不是軟刪除,此時插入一個_ums_id_小的數據(舊數據),就會真的插入進去。
這就導致舊數據被插入了。不冪等了。所以被刪除的數據依然保留(軟刪除)是有價值的,它能被用於保證數據的冪等性。
HBase的保存
插入數據到Hbase中,相當要簡單一些。不同的是HBase可以保留多個版本的數據(當然也可以隻保留一個版本)默認是保留3個版本;
因此插入數據到HBase,需要解決的問題是:
-
選擇合適的rowkey:Rowkey的設計是可以選的,用戶可以選擇源表的主鍵,也可以選擇若幹列做聯合主鍵。
-
選擇合適的version:使用_ums_id_+ 較大的偏移量(比如100億) 作為row的version。
Version的選擇很有意思,利用_ums_id_的唯一性和自增性,與version自身的比較關係一致:即version較大等價於_ums_id_較大,對應的版本較新。
從提高性能的角度,我們可以將整個Spark Streaming的Dataset集合直接插入到HBase,不需要比較。讓HBase基於version自動替我們判斷哪些數據可以保留,哪些數據不需要保留。
Jdbc的插入數據:
插入數據到數據庫中,保證冪等的原理雖然簡單,要想提高性能在實現上就變得複雜很多,總不能一條一條的比較然後在插入或更新。
我們知道Spark的RDD/dataset都是以集合的方式來操作以提高性能,同樣的我們需要以集合操作的方式實現冪等性。
具體思路是:
-
首先根據集合中的主鍵到目標數據庫中查詢,得到一個已有數據集合;
-
與dataset中的集合比較,分出兩類:
A:不存在的數據,即這部分數據insert就可以;
B:存在的數據,比較_ums_id_, 最終隻將哪些_ums_id_更新較大row到目標數據庫,小的直接拋棄。
使用Spark的同學都知道,RDD/dataset都是可以partition的,可以使用多個worker並進行操作以提高效率。
在考慮並發情況下,插入和更新都可能出現失敗,那麼還有考慮失敗後的策略。
比如:因為別的worker已經插入,那麼因為唯一性約束插入失敗,那麼需要改為更新,還要比較_ums_id_看是否能夠更新。
對於無法插入其他情況(比如目標係統有問題),Wormhole還有重試機製。說起來細節特別多。這裏就不多介紹了。
有些還在開發中。
插入到其他存儲中的就不多介紹了,總的原則是:根據各自存儲自身特性,設計基於集合的,並發的插入數據實現。這些都是Wormhole為了性能而做的努力,使用Wormhole的用戶不必關心 。
五、運用案例
實時營銷
說了那麼多,DWS有什麼實際運用呢?下麵我來介紹某係統使用DWS實現了的實時營銷。
如上圖所示:
係統A的數據都保存到自己的數據庫中,我們知道,宜信提供很多金融服務,其中包括借款,而借款過程中很重要的就是信用審核。
借款人需要提供證明具有信用價值的信息,比如央行征信報告,是具有最強信用數據的數據。 而銀行流水,網購流水也是具有較強的信用屬性的數據。
借款人通過Web或手機APP在係統A中填寫信用信息時,可能會某些原因無法繼續,雖然可能這個借款人是一個優質潛在客戶,但以前由於無法或很久才能知道這個信息,所以實際上這樣的客戶是流失了。
應用了DWS以後,借款人已經填寫的信息已經記錄到數據庫中,並通過DWS實時的進行抽取、計算和落地到目標庫中。根據對客戶的打分,評價出優質客戶。然後立刻將這個客戶的信息輸出到客服係統中。
客服人員在很短的時間(幾分鍾以內)就通過打電話的方式聯係上這個借款人(潛客),進行客戶關懷,將這個潛客轉換為真正的客戶。我們知道借款是有時效性的,如果時間太久就沒有價值了。
如果沒有實時抽取/計算/落庫的能力,那麼這一切都無法實現。
實時報表係統
另外一個實時報表的應用如下:
我們數據使用方的數據來自多個係統,以前是通過T+1的方式獲得報表信息,然後指導第二天的運營,這樣時效性很差。
通過DWS,將數據從多個係統中實時抽取,計算和落地,並提供報表展示,使得運營可以及時作出部署和調整,快速應對。
六、總結
說了那麼多,大致總結一下:
-
DWS技術上基於主流實時流式大數據技術框架,高可用大吞吐強水平擴容,低延遲高容錯最終一致。
-
DWS能力上支持異構多源多目標係統,支持多數據格式(結構化半結構化非結構化數據)和實時技術能力。
-
DWS將三個子項目合並作為一個平台推出,使得我們具備了實時的能力, 驅動各種實時場景應用。
適合場景包括:實時同步/實時計算/實時監控/實時報表/實時分析/實時洞察/實時管理/實時運營/實時決策
感謝大家的聆聽,此次分享到此為止。
Q&A
Q1:Oracle log reader有開源方案嗎?
A1:對於Oracle業界也有許多商業解決方案,例如:Oracle GoldenGate(原來的goldengate), Oracle Xstream, IBM InfoSphere Change Data Capture(原來的DataMirror),Dell SharePlex (原來的Quest),國內的DSG superSync等,開源的方案好用的很少。
Q2:這個項目投入了多少人力物力?感覺有點複雜。
Q2:DWS是三個子項目組成,平均每個項目5~7人。是有點複雜,其實也是試圖使用大數據技術來解決我們公司目前遇到的困難。
因為是搞大數據相關技術,所有團隊裏麵的兄弟姐妹都還是比較happy的:)
其實這裏麵,Dbus和Wormhole相對固定模式化,容易輕鬆複用。Swifts實時計算是與每個業務相關比較大的,自定義比較強,相對比較麻煩一些。
Q3:宜信的這個DWS係統會開源麼?
A3:我們也考慮過向社區貢獻,就像宜信的其他開源項目一樣,目前項目剛剛成形,還有待進一步磨煉,我相信未來的某個時候,我們會給它開源出來。
Q4:架構師怎麼理解,是不是係統工程師?
A4:不是係統工程師,在我們宜信有多位架構師,應該算是以技術驅動業務的技術管理人員。包含產品設計,技術管理等。
Q5:複製方案是否是OGG?
A5:OGG與上麵提到的其他商業解決方案都是可選方案。
原文發布時間為:2016-12-19
本文來自雲棲社區合作夥伴DBAplus
最後更新:2017-05-11 15:01:04