閱讀910 返回首頁    go 阿裏雲 go 技術社區[雲棲]


BookKeeper設計介紹及其在Hadoop2.0 Namenode HA方案中的使用分析

BookKeeper背景

BK是一個可靠的日誌流記錄係統,用於將係統產生的日誌(也可以是其他數據)記錄在BK集群上,由BK這個第三方Storage保證數據存儲的可靠和一致性。典型場景是係統寫write-ahead log,即先把log寫到BK上,再對log做處理,比如將log寫到內存的數據結構中。BookKeeper同時適用於任何單點寫入並要求保證高性能和數據不丟失(Strong Durabilty Guarantees)的場景。

BK誕生於Hadoop2.0的namenode HA。在Hadoop中,出於故障恢複的考慮,Namenode在對它的記錄做修改前都會先將本條修改的日誌寫到磁盤上。但是這裏有一個潛在問題,當Namenode發生故障時,很可能連本地磁盤也不能訪問,這時之前的記錄的日誌也就沒用了。基於上述考慮,可以將Namenode的日誌信息保存在一個可靠的外部Storage中。最初業界通過NFS這樣的Share Storage來實現日誌同步。之所以選擇NFS,一方麵因為可以很方便地實現數據共享,另外一方麵是因為NFS相對穩定成熟。雖然如此,NFS也有缺點不能滿足HDFS的在線存儲業務:網絡單點及其存儲節點單點。為了滿足共享日誌的高可用性,社區引入了BK。除此之外還有默認的HA方案:QJM。Hadoop2.0 Namenode HA的介紹可以參考我之前的博文:Hadoop2.0 Namenode HA實現方案介紹及匯總


BookKeeper介紹

BK帶有多個讀寫日誌的server,稱為 bookies。每一個bookie是一個bk的存儲服務,存儲了寫到bk上的write-ahead日誌,及其數據內容。寫入的log流(稱它為流是因為BK記錄的是byte[])稱為 ledgers,一個ledger是一個日誌文件,每個日誌單元叫 ledger  entry,也就是bookies是存ledgers的。ledger隻支持append操作,而且同時隻能有一個單線程來寫。ZK充當BK的元數據存儲服務,在zk中會存儲ledger相關的元數據,包括當前可用的bookies,ledger分布的位置等。

BK通過讀寫多個存儲節點達到高可用性,同時為了恢複由於異常造成的多節點數據不一致性,引入了數據一致性算法。BK的可用性還體現在隻要有足夠多的bookies可用,整個服務就可用。實際上,一份entry的寫入需要確保N份日誌冗餘在N個bookie上寫成功,而我們需要>N個bookie提供服務。在啟動BK的時候,需要指定一個ensemble值,即bookie可用的最小節點數量,還需要指定一個quorums值,即日誌寫入bk服務端的冗餘份數。BK的可靠性體現在服務有多個備份,entry的記錄也是冗餘的。BK的可擴展性體現在可以增加bookie服務的定額數目,同時增加server數據可以一定程度提高吞吐量。

Ledger在BK中扮演了很重要的角色,其相關操作及其作用如下:

  • CreateLedger:創建一個空的ledger,此時會在zk中存儲相關元數據;
  • AddEntry:添加一個記錄到ledger中,如果客戶端失敗或者ledger已經關閉,則不能再追加entry;
  • openLedger:開始讀取數據前,必須先打開ledger,如果某ledger處於未關閉,不能讀取相關數據,如果有異常,需先恢複;
  • readEntries:讀取ledger中的entry

從編碼角度講,操縱entry讀寫的類為LedgerHandle,LedgerHandle對應一個可以被client讀寫entry的ledger。下麵是創建ledgerHandle並讀寫entry的例子。

ClientConfiguration conf = new ClientConfiguration();
conf.setZkServers("zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181"); 

BookKeeper client = new BookKeeper(conf);

LedgerHandle lh = client.createLedger(3, 2, DigestType.CRC32, "foobar");

lh.addEntry("Hello World!".getBytes());
lh.close();

LedgerHandle lh2 = client.openLedger(1, DigestType.CRC32, "foobar");
long lastEntry = lh2.getLastAddConfirmed();
Enumeration<LedgerEntry> entries = lh2.readEntries(0, 9);
while (entries.hasMoreElements()) {
	byte[] bytes = entries.nextElement().getEntry();
	System.out.println(new String(bytes));
}
更多BK文檔可以參考官網文檔

BookKeeper in HDFS

Hdfs有兩個抽象類提供對EditLog的讀出和寫回:EditLogOutputStream(以下簡稱ELOS)和EditLogInputStream(以下簡稱ELIS)。同時還有一個JournalManager接口,負責管理EditLog的可靠存取。它的實現包括QJM(QuorumJournalManager)和BKJM(BookKeeperJournalManager)。


寫日誌

對於hdfs而言,主節點寫的每一個日誌對象為BK的entry,entry的集合組成一個ledger,每一個日誌段對應一個ledger,相同日誌段追加edits即為向ledger追加entry。Ledger有一個遞增的ledgerId,entry也有遞增的entryId,每個entryId對應一個txId。

ELOS使用write()將FSEditLogOp往外寫,對應的BookKeeperEditLogOutputStream的實現為:

@Override
  public void write(FSEditLogOp op) throws IOException {
    writer.writeOp(op);

    if (bufCurrent.getLength() > transmissionThreshold) {
      transmit();
    }
  }

BookKeeperEditLogOutputStream內部有一個buffer,每次調用write()寫FSEditLogOp的時候,會由一個Writer將此次FSEditLogOp寫入buffer,當buffer長度達到門檻值時,進行transmit操作:把buffer裏的editLog發送到BK上,代碼如下:

/**
   * Transmit the current buffer to bookkeeper.
   * Synchronised at the FSEditLog level. #write() and #setReadyToFlush()
   * are never called at the same time.
   */
  private void transmit() throws IOException {
    if (!transmitResult.compareAndSet(BKException.Code.OK,
                                     BKException.Code.OK)) {
      throw new IOException("Trying to write to an errored stream;"
          + " Error code : (" + transmitResult.get()
          + ") " + BKException.getMessage(transmitResult.get()));
    }
    if (bufCurrent.getLength() > 0) {
      byte[] entry = Arrays.copyOf(bufCurrent.getData(),
                                   bufCurrent.getLength());
      lh.asyncAddEntry(entry, this, null);
      bufCurrent.reset();
      outstandingRequests.incrementAndGet();
    }
  }

lh為BK的LedgerHandle,asyncAddEntry方法異步將entry寫往一個open狀態的ledger。這就是一個簡單的把Editlog寫往BK的過程。

BKJM簡單寫的代碼如下:

public void testSimpleWrite() throws Exception {
    NamespaceInfo nsi = newNSInfo();
    BookKeeperJournalManager bkjm = new BookKeeperJournalManager(conf,
        BKJMUtil.createJournalURI("/hdfsjournal-simplewrite"), nsi);
    bkjm.format(nsi);

    EditLogOutputStream out = bkjm.startLogSegment(1);
    for (long i = 1 ; i <= 100; i++) {
      FSEditLogOp op = FSEditLogTestUtil.getNoOpInstance();
      op.setTransactionId(i);
      out.write(op);
    }
    out.close();
    bkjm.finalizeLogSegment(1, 100);
 
    String zkpath = bkjm.finalizedLedgerZNode(1, 100);
  }

BKJM的startLogSegment(txId)將產生一個新的ledger,對應一個新的日誌段,該日誌段狀態為接收寫入日誌的狀態。創建ledger之前有一些校驗工作

if (txId <= maxTxId.get()) {
      throw new IOException("We've already seen " + txId
          + ". A new stream cannot be created with it");
    }

    try {
      String existingInprogressNode = ci.read();
      if (null != existingInprogressNode
          && zkc.exists(existingInprogressNode, false) != null) {
        throw new IOException("Inprogress node already exists");
      }
      if (currentLedger != null) {
        // bookkeeper errored on last stream, clean up ledger
        currentLedger.close();
      }
      currentLedger = bkc.createLedger(ensembleSize, quorumSize,
                                       BookKeeper.DigestType.MAC,
                                       digestpw.getBytes());
    } catch (BKException bke) {
      throw new IOException("Error creating ledger", bke);
    } catch (KeeperException ke) {
      throw new IOException("Error in zookeeper while creating ledger", ke);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted creating ledger", ie);
    }

Ledger的創建還對應一個新的EditLogLedgerMetadata,該類記錄這個日誌段的元信息,包括zkPath,ledgerId,開始和結束txId等,在讀取ledger裏的日誌內容的時候需要這些元數據信息。

BKJM的finalizeLogSegment()將文件由正在寫入日誌的狀態轉化為不接收寫日誌的狀態。BKJM會create ledger,delete ledger,open ledger,這裏的ledger即LedgerHandler類,它對每個ledger entry進行讀寫操作。

寫日誌總體流程

ZK作為BK的元數據服務器,裏麵存儲了哪些bookie服務是可用的,同時也記錄了目前係統有哪些ledger,及其ledger相關信息,如該ledger數據存儲在哪些機器上,及其該ledger起始,結束entryid等。Bookie節點存儲實際的數據,及其數據的讀寫服務。

寫操作由主節點來完成,當主節點調用setReadyToFlush操作,會調用RPC同時向N(N=quorums)個bookie節點寫,flush異步等待響應。

主節點對bk的操作,其實就是對ledger的操作,在開始向bk服務寫數據前,首先需要打開ledger,打開ledger就會與配置的所有bookie節點建立連接;打開連接後,數據以entry為單位以RR算法選擇向N(N=quorums)個bookie節點寫entry數據,並且異步地等待結果返回,有任何一個bookie寫入失敗,則需要重新選擇一個bookie寫入失敗的副本。

當bookie服務端接收到寫入數據後,首先會寫日誌,然後根據同步或者異步算法將數據同步到磁盤上。寫入數據過程中,首先會寫入log文件,寫入的內容包含ledgerid,entryid,EntrySize,LastConfirmed,及其真實數據內容。然後在相應ledger文件中記錄下entryid,及其該entry所在的日誌文件,偏移量等。

讀日誌

讀日誌相比寫日誌過程,相對簡單一些。同樣,讀日誌過程也支持高可用。BKJM通過selectInputStreams方法讀出一個範圍內的ELIS集合,每個ELIS是BookKeeperEditLogInputStream類,new BookKeeperEditLogInputStream需要得到一個EditLogLedgerMetadata,並打開對應的ledger。具體BookKeeperEditLogInputStream類裏的內容就不詳細說明了。

@Override
  public void selectInputStreams(Collection<EditLogInputStream> streams,
      long fromTxId, boolean inProgressOk, boolean forReading)
      throws IOException {
    List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
        inProgressOk);
    try {
      BookKeeperEditLogInputStream elis = null;
      for (EditLogLedgerMetadata l : currentLedgerList) {
        long lastTxId = l.getLastTxId();
        if (l.isInProgress()) {
          lastTxId = recoverLastTxId(l, false);
        }
        // Check once again, required in case of InProgress and is case of any
        // gap.
        if (fromTxId >= l.getFirstTxId() && fromTxId <= lastTxId) {
          LedgerHandle h;
          if (l.isInProgress()) { // we don't want to fence the current journal
            h = bkc.openLedgerNoRecovery(l.getLedgerId(),
                BookKeeper.DigestType.MAC, digestpw.getBytes());
          } else {
            h = bkc.openLedger(l.getLedgerId(), BookKeeper.DigestType.MAC,
                digestpw.getBytes());
          }
          elis = new BookKeeperEditLogInputStream(h, l);
          elis.skipTo(fromTxId);
        } else {
          // If mismatches then there might be some gap, so we should not check
          // further.
          return;
        }
        streams.add(elis);
        if (elis.getLastTxId() == HdfsConstants.INVALID_TXID) {
          return;
        }
        fromTxId = elis.getLastTxId() + 1;
      }
    } catch (BKException e) {
      throw new IOException("Could not open ledger for " + fromTxId, e);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted opening ledger for " + fromTxId, ie);
    }
  }

首先選擇日誌文件,建立輸入流。從節點觸發消化日誌後,首先會查詢ZK,獲取到主節點寫入ZK的edits元數據信息(不包含inprocess狀態的edits元數據),這個元數據包含日誌段的startTxid,lastTxid,ledgerID,同時也會打開相應的ledger,並獲取其元數據,如ledger的quorumSize,ensembleSize,lastEntryId等,同時按照txid先後順序對ledger進行排序,放入輸入流集合。需要強調的是,當打開ledger時,會檢查其entry副本之間的一致性,如果不一致需恢複。

準備好輸入流以後,開始消化日誌,依次操作輸入流集合的ledgers,讀取每個ledger內的entry:

  1. 通過查詢ledger元數據,同時通過RR算法確定該entry存儲在哪幾個bookies;
  2. 嚐試從bookies集合的第一個bookie服務讀取entry,如果成功,該entry就讀取成功,如果失敗,轉入第3步;
  3. 嚐試從bookies集合的第二個bookie服務讀取entry,如果成功,該entry就讀取成功,如果失敗,依次類推,如果嚐試讀取完所有的bookies均失敗,則該entry讀取失敗;

恢複

BKJM還有恢複機製,相關接口有recoverUnfinalizedSegments(),recoverLastTxId()。Bookie數據恢複檢查通過定時或者人工發起,集群數據修複流程:

  1. 通過zk查詢到ledger元數據;
  2. 通過元數據,查詢相關bookie中存儲的ledger的entry是否完整;
  3. 如果查詢到存儲在某bookie上的entry不完整,則需要進入數據恢複流程;
  4. 首先從bk服務端讀取到ledger相關的entry,然後將其寫到需要恢複entry的某bookie服務端;
  5. Ledger數據恢複完成後,需要更新ledger的segment相關元數據。


總結

本文首先介紹了BookKeeper的背景和使用場景,然後簡單介紹了BK的主要部件及使用方法,最後粗略地分析了hadoop2.0 namenode BKJM的HA實現,介紹了EditLog寫入和讀出BK的過程。通過閱讀hadoopBKJM部分的代碼,幫助學習怎樣在自己的係統裏加入BookKeeper,讓BK來保證日誌的可靠和容災恢複等功能。


(全文完)

最後更新:2017-04-03 12:54:36

  上一篇:go JSP中報錯only a type can be imported: XXX resolves to package
  下一篇:go Sql Server 存儲過程實例講解