【源碼】canal和otter的高可靠性分析
一般來說,我們對於數據庫最主要的要求就是:數據不丟。不管是主從複製,還是使用類似otter+canal這樣的數據庫同步方案,我們最基本的需求是,在數據不丟失的前提下,盡可能的保證係統的高可用,也就是在某個節點掛掉,或者數據庫發生主從切換等情況下,我們的數據同步係統依然能夠發揮它的作用--數據同步。本文討論的場景是數據庫發生主從切換,本文將從源碼的角度,來看看otter和canal是如何保證高可用和高可靠的。
一、EventParser
通過閱讀文檔和源碼,我們可以知道,對於一個canal server,基礎的框架包括以下幾個部分:MetaManager、EventParser、EventSink和EventStore。其中EventParser的作用就是發送dump命令,從mysql數據庫獲取binlog文件。發送dump命令,可以指定時間戳或者position,從指定的時間或者位置開始dump。我們來看看過程:
首先是CanalServer啟動。otter默認使用的是內置版的canal server,所以我們主要看CanalServerWithEmbedded這個類。來看下他的啟動過程:
public void start(final String destination) {
final CanalInstance canalInstance = canalInstances.get(destination);
if (!canalInstance.isStart()) {
try {
MDC.put("destination", destination);
canalInstance.start();//啟動實例
logger.info("start CanalInstances[{}] successfully", destination);
} finally {
MDC.remove("destination");
}
}
}
我們看下實例啟動那一行,跟到AbstractCanalInstance類中
public void start() {
super.start();
if (!metaManager.isStart()) {
metaManager.start();//源數據管理啟動
}
if (!alarmHandler.isStart()) {
alarmHandler.start();//報警處理器啟動
}
if (!eventStore.isStart()) {
eventStore.start();//數據存儲器啟動
}
if (!eventSink.isStart()) {
eventSink.start();//數據過濾器啟動
}
if (!eventParser.isStart()) {//數據解析器啟動
beforeStartEventParser(eventParser);
eventParser.start();
afterStartEventParser(eventParser);
}
logger.info("start successful....");
}
我們主要看下eventParser.start()方法裏麵的內容。我們主要關注的是EventParser使如何在主從切換的條件下,進行dump節點的確定的。我們跟蹤到AbstractEventParser類中的start()方法,重點看下
// 4. 獲取最後的位置信息
EntryPosition position = findStartPosition(erosaConnection);
這塊有兩個實現,但是canal目前使用的是MysqlEventParser,也就是基於Mysql的Binlog文件來進行數據同步。我們看下代碼:
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
EntryPosition startPosition = findStartPositionInternal(connection);
if (needTransactionPosition.get()) {
logger.warn("prepare to find last position : {}", startPosition.toString());
Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
logger.warn("find new start Transaction Position , old : {} , new : {}",
startPosition.getPosition(),
preTransactionStartPosition);
startPosition.setPosition(preTransactionStartPosition);
}
needTransactionPosition.compareAndSet(true, false);
}
return startPosition;
}
對於第一行findStartPositionInternal(connection),我們重點關注的情況是數據庫連接地址發生變化,也就是進行了主從切換的情況。
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
&& logPosition.getPostion().getServerId() != null
&& !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
if (case2) {
long timestamp = logPosition.getPostion().getTimestamp();
long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
logger.warn("prepare to find start position by last position {}:{}:{}", new Object[]{"", "",
logPosition.getPostion().getTimestamp()});
EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
// 重新置為一下
dumpErrorCount = 0;
return findPosition;
}
我們分析下case2這個條件,其實就是表示的就是配置了主從切換,而且發生了serverId變化的情況,在這種情況下,首先需要獲取到事件發生的時間戳,然後將這個事件發生的時間減去60s,也就是向前推一分鍾之後,在新的binlog文件中根據新的時間戳來找到當時對應的事件。
這塊根據時間戳來尋找事件的過程比較簡單,首先根據binglog-index文件找到所有的binlog文件名,然後遍曆binlog文件的頭,找到binlog文件的寫入時間,與新的時間戳進行對比,定位到binlog文件。定位到文件後,直接根據時間戳來進行遍曆,找到新的時間戳之前發生的那個事務起始位置。
/**
* 根據給定的時間戳,在指定的binlog中找到最接近於該時間戳(必須是小於時間戳)的一個事務起始位置。
* 針對最後一個binlog會給定endPosition,避免無盡的查詢
*/
private EntryPosition findAsPerTimestampInSpecificLogFile(MysqlConnection mysqlConnection,
final Long startTimestamp,
final EntryPosition endPosition,
final String searchBinlogFile) {
final LogPosition logPosition = new LogPosition();
try {
mysqlConnection.reconnect();
// 開始遍曆文件
mysqlConnection.seek(searchBinlogFile, 4L, new SinkFunction<LogEvent>() {
private LogPosition lastPosition;
public boolean sink(LogEvent event) {
EntryPosition entryPosition = null;
try {
CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);
if (entry == null) {
return true;
}
String logfilename = entry.getHeader().getLogfileName();
Long logfileoffset = entry.getHeader().getLogfileOffset();
Long logposTimestamp = entry.getHeader().getExecuteTime();
if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())
|| CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
logger.debug("compare exit condition:{},{},{}, startTimestamp={}...", new Object[]{
logfilename, logfileoffset, logposTimestamp, startTimestamp});
// 事務頭和尾尋找第一條記錄時間戳,如果最小的一條記錄都不滿足條件,可直接退出
if (logposTimestamp >= startTimestamp) {
return false;
}
}
if (StringUtils.equals(endPosition.getJournalName(), logfilename)
&& endPosition.getPosition() <= (logfileoffset + event.getEventLen())) {
return false;
}
// 記錄一下上一個事務結束的位置,即下一個事務的position
// position = current +
// data.length,代表該事務的下一條offest,避免多餘的事務重複
if (CanalEntry.EntryType.TRANSACTIONEND.equals(entry.getEntryType())) {
entryPosition = new EntryPosition(logfilename,
logfileoffset + event.getEventLen(),
logposTimestamp);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
} else if (CanalEntry.EntryType.TRANSACTIONBEGIN.equals(entry.getEntryType())) {
// 當前事務開始位點
entryPosition = new EntryPosition(logfilename, logfileoffset, logposTimestamp);
logger.debug("set {} to be pending start position before finding another proper one...",
entryPosition);
logPosition.setPostion(entryPosition);
}
lastPosition = buildLastPosition(entry);
} catch (Throwable e) {
processSinkError(e, lastPosition, searchBinlogFile, 4L);
}
return running;
}
});
} catch (IOException e) {
logger.error("ERROR ## findAsPerTimestampInSpecificLogFile has an error", e);
}
if (logPosition.getPostion() != null) {
return logPosition.getPostion();
} else {
return null;
}
}
這塊的邏輯如下:
- 發送dump命令,起始位置為4L,也就是跳過了binlog的第一個標誌事件。
- canal收到binlog,開始進行對binlog文件進行解析。
- 主要我們看的是事務開始和事務提交的事件,判斷事務開始或結束的時間,是否小於我們要找的時間戳,如果大於等於,直接遍曆下一個事件。
- 傳入了一個endPosition,防止無限掃描。
- 雖說是從頭開始掃描的,但是要想跳出遍曆,需要滿足一定的條件。在跳出遍曆之前,最後一次設置的logPosition才是我們要招的logPosition。
- 如果是一個事務提交的事件,我們要找的position就是這個事件的position+event.length。如果是事務開始,position就是當前事件的position。其他的事件都忽略。
至此,我們已經找到了我們想要的binlog文件名和對應的事務開始position,我們繼續下麵的步驟即可。
二、EventStore
這塊內容的主要思想如下:
- 維護一個類似於Disruptor的RingBuffer,同時維護三個序列,put/get/ack。
- EventSink之後的數據,調用put接口,將數據放入環形隊列中。
- Canal client獲取數據,調用get方法。
- 異步調用ack方法,清除ack之前的數據。
- 值得注意的是,這塊get和ack采用了流式API的模式,get和ack異步進行,可以先get,然後異步調用ack。
- ack是有序的,不允許跳躍式的提交。
三、Binlog的Row模式
至此,我們基本上知道了canal是如何在發生數據庫主從切換時保證高可用和高可靠的,我們可能還有疑惑:為什麼要回退60s,來解析binlog,這樣不會導致數據重複嗎?還有一些自增的update語句(不具備冪等性),不會產生數據錯誤嗎?要想回答這些問題,就需要我們了解Binlog的Row模式了。
Mysql Binlog的Row模式記錄的,是數據庫中每一行的數據變化,而不僅僅是sql語句。比如我們對數據庫中的多行,使用一條sql語句進行了修改。在這種情況下,如果Binlog模式為Statement,隻會記錄一條sql語句。而Row模式下,會對每一行的數據變化進行記錄,以及變化前後每個字段的值。這也就是為什麼Row模式的binlog文件如此之大的原因。
對於一些不具備冪等性的sql語句,采用Row語句進行Binlog解析時,也是可以通過重複執行,來保證我們數據的最終一致性的。這也就解釋了,為什麼要回退60s來進行Binlog位點定位、解析的問題。考慮到Mysql主從的數據複製的延遲性(60s,一般來說的延遲沒有這麼久),我們可以在主節點掛掉的情況下,回退60s到從節點上繼續進行binlog的解析。
當然,也需要考慮一些極端的情況,也就是主從複製確實超過了60s的延遲,在這種情況下,就需要otter登場了。基本思路是:反查數據庫同步 (以數據庫最新版本同步,解決交替性,比如設置一致性反查數據庫延遲閥值為60秒,即當同步過程中發現數據延遲超過了60秒,就會基於PK反查一次數據庫,拿到當前最新值進行同步,減少交替性的問題)。
最後更新:2017-10-17 16:03:30