BookKeeper設計介紹及其在Hadoop2.0 Namenode HA方案中的使用分析
BookKeeper背景
BK是一個可靠的日誌流記錄係統,用於將係統產生的日誌(也可以是其他數據)記錄在BK集群上,由BK這個第三方Storage保證數據存儲的可靠和一致性。典型場景是係統寫write-ahead log,即先把log寫到BK上,再對log做處理,比如將log寫到內存的數據結構中。BookKeeper同時適用於任何單點寫入並要求保證高性能和數據不丟失(Strong Durabilty Guarantees)的場景。
BK誕生於Hadoop2.0的namenode HA。在Hadoop中,出於故障恢複的考慮,Namenode在對它的記錄做修改前都會先將本條修改的日誌寫到磁盤上。但是這裏有一個潛在問題,當Namenode發生故障時,很可能連本地磁盤也不能訪問,這時之前的記錄的日誌也就沒用了。基於上述考慮,可以將Namenode的日誌信息保存在一個可靠的外部Storage中。最初業界通過NFS這樣的Share Storage來實現日誌同步。之所以選擇NFS,一方麵因為可以很方便地實現數據共享,另外一方麵是因為NFS相對穩定成熟。雖然如此,NFS也有缺點不能滿足HDFS的在線存儲業務:網絡單點及其存儲節點單點。為了滿足共享日誌的高可用性,社區引入了BK。除此之外還有默認的HA方案:QJM。Hadoop2.0 Namenode HA的介紹可以參考我之前的博文:Hadoop2.0 Namenode HA實現方案介紹及匯總。
BookKeeper介紹
BK帶有多個讀寫日誌的server,稱為 bookies。每一個bookie是一個bk的存儲服務,存儲了寫到bk上的write-ahead日誌,及其數據內容。寫入的log流(稱它為流是因為BK記錄的是byte[])稱為 ledgers,一個ledger是一個日誌文件,每個日誌單元叫 ledger entry,也就是bookies是存ledgers的。ledger隻支持append操作,而且同時隻能有一個單線程來寫。ZK充當BK的元數據存儲服務,在zk中會存儲ledger相關的元數據,包括當前可用的bookies,ledger分布的位置等。
BK通過讀寫多個存儲節點達到高可用性,同時為了恢複由於異常造成的多節點數據不一致性,引入了數據一致性算法。BK的可用性還體現在隻要有足夠多的bookies可用,整個服務就可用。實際上,一份entry的寫入需要確保N份日誌冗餘在N個bookie上寫成功,而我們需要>N個bookie提供服務。在啟動BK的時候,需要指定一個ensemble值,即bookie可用的最小節點數量,還需要指定一個quorums值,即日誌寫入bk服務端的冗餘份數。BK的可靠性體現在服務有多個備份,entry的記錄也是冗餘的。BK的可擴展性體現在可以增加bookie服務的定額數目,同時增加server數據可以一定程度提高吞吐量。
Ledger在BK中扮演了很重要的角色,其相關操作及其作用如下:
- CreateLedger:創建一個空的ledger,此時會在zk中存儲相關元數據;
- AddEntry:添加一個記錄到ledger中,如果客戶端失敗或者ledger已經關閉,則不能再追加entry;
- openLedger:開始讀取數據前,必須先打開ledger,如果某ledger處於未關閉,不能讀取相關數據,如果有異常,需先恢複;
- readEntries:讀取ledger中的entry
從編碼角度講,操縱entry讀寫的類為LedgerHandle,LedgerHandle對應一個可以被client讀寫entry的ledger。下麵是創建ledgerHandle並讀寫entry的例子。
ClientConfiguration conf = new ClientConfiguration(); conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"); BookKeeper client = new BookKeeper(conf); LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar"); lh.addEntry("Hello World!".getBytes()); lh.close(); LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar"); long lastEntry = lh2.getLastAddConfirmed(); Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9); while (entries.hasMoreElements()) { byte[] bytes = entries.nextElement().getEntry(); System.out.println(new String(bytes)); }更多BK文檔可以參考官網文檔。
BookKeeper in HDFS
Hdfs有兩個抽象類提供對EditLog的讀出和寫回:EditLogOutputStream(以下簡稱ELOS)和EditLogInputStream(以下簡稱ELIS)。同時還有一個JournalManager接口,負責管理EditLog的可靠存取。它的實現包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。
寫日誌
對於hdfs而言,主節點寫的每一個日誌對象為BK的entry,entry的集合組成一個ledger,每一個日誌段對應一個ledger,相同日誌段追加edits即為向ledger追加entry。Ledger有一個遞增的ledgerId,entry也有遞增的entryId,每個entryId對應一個txId。
ELOS使用write()將FSEditLogOp往外寫,對應的BookKeeperEditLogOutputStream的實現為:
@Override public void write(FSEditLogOp op) throws IOException { writer.writeOp(op); if (bufCurrent.getLength() > transmissionThreshold) { transmit(); } }
BookKeeperEditLogOutputStream內部有一個buffer,每次調用write()寫FSEditLogOp的時候,會由一個Writer將此次FSEditLogOp寫入buffer,當buffer長度達到門檻值時,進行transmit操作:把buffer裏的editLog發送到BK上,代碼如下:
/** * Transmit the current buffer to bookkeeper. * Synchronised at the FSEditLog level. #write() and #setReadyToFlush() * are never called at the same time. */ private void transmit() throws IOException { if (!transmitResult.compareAndSet(BKException.Code.OK, BKException.Code.OK)) { throw new IOException("Trying to write to an errored stream;" + " Error code : (" + transmitResult.get() + ") " + BKException.getMessage(transmitResult.get())); } if (bufCurrent.getLength() > 0) { byte[] entry = Arrays.copyOf(bufCurrent.getData(), bufCurrent.getLength()); lh.asyncAddEntry(entry, this, null); bufCurrent.reset(); outstandingRequests.incrementAndGet(); } }
lh為BK的LedgerHandle,asyncAddEntry方法異步將entry寫往一個open狀態的ledger。這就是一個簡單的把Editlog寫往BK的過程。
BKJM簡單寫的代碼如下:
public void testSimpleWrite() throws Exception { NamespaceInfo nsi = newNSInfo(); BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf, BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi); bkjm.format(nsi); EditLogOutputStream out = bkjm.startLogSegment(1); for (long i = 1 ; i <= 100; i++) { FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance(); op.setTransactionId(i); out.write(op); } out.close(); bkjm.finalizeLogSegment(1, 100); String zkpath = bkjm.finalizedLedgerZNode(1, 100); }
BKJM的startLogSegment(txId)將產生一個新的ledger,對應一個新的日誌段,該日誌段狀態為接收寫入日誌的狀態。創建ledger之前有一些校驗工作
if (txId <= maxTxId.get()) { throw new IOException("We've already seen " + txId + ". A new stream cannot be created with it"); } try { String existingInprogressNode = ci.read(); if (null != existingInprogressNode && zkc.exists(existingInprogressNode, false) != null) { throw new IOException("Inprogress node already exists"); } if (currentLedger != null) { // bookkeeper errored on last stream, clean up ledger currentLedger.close(); } currentLedger = bkc.createLedger(ensembleSize, quorumSize, BookKeeper.DigestType.MAC, digestpw.getBytes()); } catch (BKException bke) { throw new IOException("Error creating ledger", bke); } catch (KeeperException ke) { throw new IOException("Error in zookeeper while creating ledger", ke); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("Interrupted creating ledger", ie); }
Ledger的創建還對應一個新的EditLogLedgerMetadata,該類記錄這個日誌段的元信息,包括zkPath,ledgerId,開始和結束txId等,在讀取ledger裏的日誌內容的時候需要這些元數據信息。
BKJM的finalizeLogSegment()將文件由正在寫入日誌的狀態轉化為不接收寫日誌的狀態。BKJM會create ledger,delete ledger,open ledger,這裏的ledger即LedgerHandler類,它對每個ledger entry進行讀寫操作。
寫日誌總體流程
ZK作為BK的元數據服務器,裏麵存儲了哪些bookie服務是可用的,同時也記錄了目前係統有哪些ledger,及其ledger相關信息,如該ledger數據存儲在哪些機器上,及其該ledger起始,結束entryid等。Bookie節點存儲實際的數據,及其數據的讀寫服務。
寫操作由主節點來完成,當主節點調用setReadyToFlush操作,會調用RPC同時向N(N=quorums)個bookie節點寫,flush異步等待響應。
主節點對bk的操作,其實就是對ledger的操作,在開始向bk服務寫數據前,首先需要打開ledger,打開ledger就會與配置的所有bookie節點建立連接;打開連接後,數據以entry為單位以RR算法選擇向N(N=quorums)個bookie節點寫entry數據,並且異步地等待結果返回,有任何一個bookie寫入失敗,則需要重新選擇一個bookie寫入失敗的副本。
當bookie服務端接收到寫入數據後,首先會寫日誌,然後根據同步或者異步算法將數據同步到磁盤上。寫入數據過程中,首先會寫入log文件,寫入的內容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真實數據內容。然後在相應ledger文件中記錄下entryid,及其該entry所在的日誌文件,偏移量等。讀日誌
讀日誌相比寫日誌過程,相對簡單一些。同樣,讀日誌過程也支持高可用。BKJM通過selectInputStreams方法讀出一個範圍內的ELIS集合,每個ELIS是BookKeeperEditLogInputStream類,new BookKeeperEditLogInputStream需要得到一個EditLogLedgerMetadata,並打開對應的ledger。具體BookKeeperEditLogInputStream類裏的內容就不詳細說明了。
@Override public void selectInputStreams(Collection<EditLogInputStream> streams, long fromTxId, boolean inProgressOk, boolean forReading) throws IOException { List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId, inProgressOk); try { BookKeeperEditLogInputStream elis = null; for (EditLogLedgerMetadata l : currentLedgerList) { long lastTxId = l.getLastTxId(); if (l.isInProgress()) { lastTxId = recoverLastTxId(l, false); } // Check once again, required in case of InProgress and is case of any // gap. if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) { LedgerHandle h; if (l.isInProgress()) { // we don't want to fence the current journal h = bkc.openLedgerNoRecovery(l.getLedgerId(), BookKeeper.DigestType.MAC, digestpw.getBytes()); } else { h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC, digestpw.getBytes()); } elis = new BookKeeperEditLogInputStream(h, l); elis.skipTo(fromTxId); } else { // If mismatches then there might be some gap, so we should not check // further. return; } streams.add(elis); if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) { return; } fromTxId = elis.getLastTxId() + 1; } } catch (BKException e) { throw new IOException("Could not open ledger for " + fromTxId, e); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); throw new IOException("Interrupted opening ledger for " + fromTxId, ie); } }
首先選擇日誌文件,建立輸入流。從節點觸發消化日誌後,首先會查詢ZK,獲取到主節點寫入ZK的edits元數據信息(不包含inprocess狀態的edits元數據),這個元數據包含日誌段的startTxid,lastTxid,ledgerID,同時也會打開相應的ledger,並獲取其元數據,如ledger的quorumSize,ensembleSize,lastEntryId等,同時按照txid先後順序對ledger進行排序,放入輸入流集合。需要強調的是,當打開ledger時,會檢查其entry副本之間的一致性,如果不一致需恢複。
準備好輸入流以後,開始消化日誌,依次操作輸入流集合的ledgers,讀取每個ledger內的entry:
- 通過查詢ledger元數據,同時通過RR算法確定該entry存儲在哪幾個bookies;
- 嚐試從bookies集合的第一個bookie服務讀取entry,如果成功,該entry就讀取成功,如果失敗,轉入第3步;
- 嚐試從bookies集合的第二個bookie服務讀取entry,如果成功,該entry就讀取成功,如果失敗,依次類推,如果嚐試讀取完所有的bookies均失敗,則該entry讀取失敗;
恢複
BKJM還有恢複機製,相關接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie數據恢複檢查通過定時或者人工發起,集群數據修複流程:
- 通過zk查詢到ledger元數據;
- 通過元數據,查詢相關bookie中存儲的ledger的entry是否完整;
- 如果查詢到存儲在某bookie上的entry不完整,則需要進入數據恢複流程;
- 首先從bk服務端讀取到ledger相關的entry,然後將其寫到需要恢複entry的某bookie服務端;
- Ledger數據恢複完成後,需要更新ledger的segment相關元數據。
總結
本文首先介紹了BookKeeper的背景和使用場景,然後簡單介紹了BK的主要部件及使用方法,最後粗略地分析了hadoop2.0 namenode BKJM的HA實現,介紹了EditLog寫入和讀出BK的過程。通過閱讀hadoopBKJM部分的代碼,幫助學習怎樣在自己的係統裏加入BookKeeper,讓BK來保證日誌的可靠和容災恢複等功能。
最後更新:2017-04-03 12:54:36