閱讀153 返回首頁    go 阿裏雲 go 技術社區[雲棲]


HDFS追本溯源:HDFS操作的邏輯流程與源碼解析

本文主要介紹5個典型的HDFS流程,這些流程充分體現了HDFS實體間IPC接口和stream接口之間的配合。

1. Client和NN

      Client到NN有大量的元數據操作,比如修改文件名,在給定目錄下創建一個子目錄,這些操作一般隻涉及Client和NN的交互,通過IPC調用ClientProtocol進行。創建子目錄的邏輯流程如下圖:

      從圖中可見,創建子目錄這種操作並沒有涉及DN。因為元數據會被NN持久化到edits中,因此在持久化結束之後,這個調用就會被成功返回。複習一下:NN維護了HDFS的文件係統目錄樹和文件與數據塊的對應關係,和數據塊與DN的對應關係。因此,創建目錄僅僅是在NN上也就很容易理解了。

     一些更為複雜的操作,如使用

DistributedFileSystem.setReplication()
來增加文件的副本數,再如通過

DistributedFileSystem.delete()
來刪除HDFS上的文件,都需要DN配合執行一些動作。其中DistributedFileSystem源碼在hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DistributedFileSystem.java

/****************************************************************
 * Implementation of the abstract FileSystem for the DFS system.
 * This object is the way end-user code interacts with a Hadoop
 * DistributedFileSystem.
 *
 *****************************************************************/
@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" })
@InterfaceStability.Unstable
public class DistributedFileSystem extends FileSystem {
  private Path workingDir;
  private URI uri;

  DFSClient dfs;
  private boolean verifyChecksum = true;
  
  static{
    HdfsConfiguration.init();
  }

    以客戶端刪除HDFS文件為例,操作在NN上執行完成後,DN存放的文件內容的數據塊也必須刪除。但是,NN在執行delete()方法時,它隻標記需要刪除的數據塊(當然,delete的操作日誌也會被持久化),而不會主動聯係DN去立即刪除這些數據。當保存著這些數據塊的DN在向NN發送心跳時,NN會通過心跳應答攜帶DatanodeCommand命令來通知DN刪除數據。也就是說,被刪除的數據塊在Client接到成功的響應後,會在一段時間後才能真正刪除,NN和DN永遠隻維護簡單的主從關係。NN永遠不會主動發起向DN的調用。NN隻能通過DN心跳應答中攜帶DatanodeCommand的指令對DN進行管理。


2. Client讀文件

     使用Java API讀取文件的源碼如下:

FileSystem hdfs = FileSystem.get(new Configuration());
Path path = new Path("/testfile");// reading
FSDataInputStream dis = hdfs.open(path);
byte[] writeBuf = new byte[1024];
int len = dis.read(writeBuf);
System.out.println(new String(writeBuf, 0, len, "UTF-8"));
dis.close();

hdfs.close();
    下圖顯示了HDFS在讀取文件時,Client,NN和DN發生的事件和這些事件的順序:

步驟1的源碼:

public FSDataInputStream open(Path f, final int bufferSize)
      throws IOException {
    statistics.incrementReadOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataInputStream>() {
      @Override
      public FSDataInputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return new HdfsDataInputStream(
            dfs.open(getPathName(p), bufferSize, verifyChecksum));
      }
      @Override
      public FSDataInputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.open(p, bufferSize);
      }
    }.resolve(this, absF);
  }
可見open返回的是HdfsDataInputStream。dfs為hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DFSClient.java。HdfsDataInputStream繼承自FSDataInputStream。構造是並沒有額外的處理。

public class HdfsDataInputStream extends FSDataInputStream {
  public HdfsDataInputStream(DFSInputStream in) throws IOException {
    super(in);
  }
FSDataInputStream繼承自DFSInputStream。
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum
                 ) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
    this.cachingStrategy =
        dfsClient.getDefaultReadCachingStrategy();
    openInfo();
  }

  /**
   * Grab the open-file info from namenode
   */
  synchronized void openInfo() throws IOException, UnresolvedLinkException {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }

fetchLocatedBlocksAndGetLastBlockLength通過調用getLocatedBlocks實現了示意圖中的步驟二:

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          if (last.getBlockSize() == 0) {
            // if the length is zero, then no data has been written to
            // datanode. So no need to wait for the locations.
            return 0;
          }
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len; 
      }
    }

    currentNode = null;
    return lastBlockBeingWrittenLength;
  }
你可能會說步驟二調用的是getBlockLocations。看以下的代碼:

@VisibleForTesting
  public LocatedBlocks getLocatedBlocks(String src, long start, long length)
      throws IOException {
    return callGetBlockLocations(namenode, src, start, length);
  }

  /**
   * @see ClientProtocol#getBlockLocations(String, long, long)
   */
  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
      String src, long start, long length) 
      throws IOException {
    try {
      return namenode.getBlockLocations(src, start, length);
    } catch(RemoteException re) {
      throw re.unwrapRemoteException(AccessControlException.class,
                                     FileNotFoundException.class,
                                     UnresolvedPathException.class);
    }
  }
  

      然後就可以開始讀文件的數據了。通過NameNode.getBlockLocations的遠程調用接口獲得了文件開始部分的數據塊的保存位置。對於文件中的每個塊,NN返回保存著該副本的DN的地址。注意,這些DN根據它們與Client的距離進行了簡單的排序(利用了網絡的拓撲信息)。

      Client調用HdfsDataInputStream的read方法讀取文件數據時,DFSInputStream對象會通過和DN間的讀數據stream接口,和最近的DN建立連接。Client反複調用read方法,數據會通過DN和Client的連接上的數據包返回Client。當到達塊的末端時,DFSInputStream會關閉和DN的連接。並通過getBlockLocations()遠程方法獲得保存著下一個數據塊的DN信息,嚴格來說,在對象沒有緩存該數據塊的位置時,才會使用這個遠程方法。這就是上圖中的步驟五。然後重複上述過程。

      另外,由於NameNode.getBlockLocations()不會一次返回文件的所有的數據塊信息,DFSInputStream可能需要多次調用該遠程方法,檢索下一組數據塊的位置信息。對於使用者來說,它讀取的是一個連續的數據流,上麵所講的聯係不同的DN,多次定位數據塊的過程,都是透明的。當使用者完成數據讀取任務後,通過FSDataInputStream.close()關係數據流。即圖中的步驟六。

     如果DN發生了錯誤,如節點停機或者網絡出現故障,那麼Client會嚐試連接下一個Block的位置。同時它會記住出現故障的那個DN,不會再進行徒勞的嚐試。在數據的應答中,不單包含了數據,還包含了數據的校驗和,Client會檢查數據的一致性,如果發現了校驗錯誤,它會將這個信息報告給NN;同時,嚐試從別的DN讀取另外一個副本的內容。由Client在讀取數據時進行數據完整性檢查,可以降低DN的負載,均衡各個節點的計算能力。

     這樣的設計其實可以給我們一個很好的設計大型分布式係統的例子。通過一些有效的設計,將計算和網絡等分散到各個節點上,這樣可以最大程度的保證scalability。

3. Client寫文件

  即使不考慮出現錯誤的情況,寫文件也是HDFS最複雜的流程。本節通過創建一個新文件並向文件寫入數據,結束後關閉這個文件為例,分析文件寫入時各個節點之間的配合。


      Client調用DistributedFileSystem.create()創建文件(上圖中的步驟一),這時DistributedFileSystem創建了DFSOutputStream,並由RPC,讓NN執行同名的方法,在文件係統的命名空間創建一個新文件。NN創建新文件時,需要執行檢查,包括NN是否處理正常工作狀態,被創建的文件不存在,Client是否有在父目錄中創建文件的權限等。通過檢查後,NN會構建一個新文件,記錄創建操作到編輯日誌edits中。RPC結束後,DistributedFileSystem將該DFSOutputStream對象包裝到FSDataOutputStream實例中,返回Client。


 @Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<FSDataOutputStream>() {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        return new HdfsDataOutputStream(dfs.create(getPathName(p), permission,
            cflags, replication, blockSize, progress, bufferSize, checksumOpt),
            statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF);
  }

關鍵的調用點有DFSClient.create:

 /**
   * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long,
   * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is
   * a hint to where the namenode should place the file blocks.
   * The favored nodes hint is not persisted in HDFS. Hence it may be honored
   * at the creation time only. HDFS could move the blocks during balancing or
   * replication, to move the blocks from favored nodes. A value of null means
   * no favored nodes for this create
   */
  public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             boolean createParent,
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt,
                             InetSocketAddress[] favoredNodes) throws IOException {
    checkOpen();
    if (permission == null) {
      permission = FsPermission.getFileDefault();
    }
    FsPermission masked = permission.applyUMask(dfsClientConf.uMask);
    if(LOG.isDebugEnabled()) {
      LOG.debug(src + ": masked=" + masked);
    }
    String[] favoredNodeStrs = null;
    if (favoredNodes != null) {
      favoredNodeStrs = new String[favoredNodes.length];
      for (int i = 0; i < favoredNodes.length; i++) {
        favoredNodeStrs[i] = 
            favoredNodes[i].getHostName() + ":" 
                         + favoredNodes[i].getPort();
      }
    }
    final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs);
    beginFileLease(src, result);
    return result;
  }

在步驟三Client寫入數據時,由於create()調用創建了一個空文件,所以,DFSOutputStream實例首先需要想NN申請數據塊,addBlock()方法成功執行後,返回一個LocatedBlock對象。該對象包含了新數據塊的數據塊標識和版本好,同時,它的成員變量locs提供了數據流管道的信息,通過上述信息,DFSOutputStream就可以和DN連接,通過些數據接口建立數據流管道。Client寫入FSDataOutputStream流中的數據,被分成一個一個的文件包,放入DFSOutputStream對象的內部隊列。該隊列中的文件包最後打包成數據包,發往數據流管道,流經管道上的各個DN,並持久化,確認包逆流而上,從數據流管道依次發往Client,當Client收到應答時,它將對應的包從內部隊列刪除。

public class LocatedBlock {

  private ExtendedBlock b;
  private long offset;  // offset of the first byte of the block in the file
  private DatanodeInfo[] locs;
  /** Storage ID for each replica */
  private String[] storageIDs;
  // Storage type for each replica, if reported.
  private StorageType[] storageTypes;
  // corrupt flag is true if all of the replicas of a block are corrupt.
  // else false. If block has few corrupt replicas, they are filtered and 
  // their locations are not part of this object
  private boolean corrupt;
  private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>();
  /**
   * List of cached datanode locations
   */
  private DatanodeInfo[] cachedLocs;

  // Used when there are no locations
  private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];

    DFSOutputStream在寫完一個數據塊後,數據流管道上的節點,會通過和NN的DatanodeProtocol遠程接口的blockReceived()方法,向NN提交數據塊。如果數據隊列還有等到輸出的數據,DFSOutputStream會再次調用addBlock(),為文件添加新的數據塊。

     Client完成數據的寫入後,會調用close()方法關閉流,關閉流意味著Client不會再向流中寫入數據。所以,當DFSOutputStream數據隊列的文件包都收到應答後,就可以使用ClientProtocol.complete()方法通知NN關閉文件,完成一次正常的文件寫入。

     如果在文件寫入期間DN發生故障,則會執行下麵的操作(注意,這些操作對於寫入數據的Client是透明的):

  1. 數據流管道會被關閉,已經發送到管道但是還沒有收到確認的文件包,會被重新添加到DFSOutputStream的輸出隊列,這樣就保證了無路數據流管道的哪個DN發生故障,都不會丟失數據。當前正常工作的DN的數據塊會被賦予一個新的版本號,並通知NN。這樣,失敗的DN在從故障恢複過來以後,上麵隻有部分數據的Block會因為版本號和NN保存的版本號不匹配而被刪除。
  2. 在數據流管道中刪除錯誤的DN並建立新的管道,繼續寫數據到正常工作的DN。
  3. 文件關閉後,NN會發現該Block的副本數沒有達到要求,會選擇一個新的DN並複製Block,創建新的副本。DN的故障隻會影響一個Block的寫操作,後續Block的寫入不會受到影響。


4. DataNode的啟動與心跳機製

      本節討論DN的啟動及其與NN之間的交互。包括DN從啟動到進入正常工作狀態的注冊,Block上報,以及正常工作過程中的心跳等與NN相關的遠程調用。這部分雖然隻涉及DatanodeProtocol的接口,但是有助於進一步理解DN與NN的關係。

      正常啟動的DN或者為升級而啟動的DN,都會向NN發送遠程調用versionRequest(),進行必要的版本檢查。這裏的版本檢查,隻涉及構建版本號,保證它們間的HDFS版本是一致的。


     在版本檢查結束後,DN會接著通過遠程調用register(),向NN注冊。DatanodeProtocol.register()的主要工作也是檢查,確認該DN是NN所管理集群的成員。也就是說,用戶不能把某一個集群中的某個node直接注冊到另外一個集群中去,保證了整個係統的數據一致性。

     注冊成功後,DN會將它所管理的所有Block的信息,通過blockRequest()方法上報到NN(步驟三),以幫助NN建立HDFS文件數據塊到DN的映射關係。在此後,DN才正式的提供服務。

 由於NN和DN是簡單的主從關係,DN需要每隔一段時間發送心跳到NN(步驟四和步驟五)。如果NN長時間收不到DN的心跳,它會認為DN已經失效。如果NN需要一些DN需要配合的動作,則會通過sendHeartbeat()的方法返回。該返回值是一個DatanodeCommand數組,它是NN的指令。

     應該說,DN和NN的交互邏輯非常簡單。大部分是通過DN到NN的心跳來完成的。但是考慮到一定規模的HDFS集群,一個NN會管理上千個DN,這樣的設計也就非常自然了。


5. SNN節點的元數據合並

      當Client對HDFS的文件目錄進行修改時,NN都會在edits中留下記錄,以保證在係統出現問題時,通過日誌可以進行恢複。

    fsimage是某一個時刻的檢查點(checkpoint)。由於fsimage很大,因此不會在每次的元數據修改都寫入到它裏邊,而隻是存在到edits中。在係統啟動時,會首先狀態最近時刻的fsimage,然後在通過edits,恢複係統的最新狀態。

    當時如果edits太大,那麼節點啟動時將用很長的時間來執行日誌的每一個操作,使得係統恢複最近的狀態。在啟動恢複的這段時間,服務是不可用的。為了避免edits多大,增加集群的可用時間,HDFS引入了第二名字節點,即SNN(Secondary NameNode)。SNN不是NN的standby,它隻是輔助NN完成合並(merge)fsimage和edits。過程涉及NamenodeProtocol和NN與SNN之間的流式接口。

    該過程由SNN發起,首先通過遠程方法NamenodeProtocol.getEditLogSize()獲得NN上edits的大小。如果日誌很小,SNN就會在指定的時間後重新檢查。否則,繼續通過遠程接口rollEditLog(),啟動一次檢查點的過程。這時,NN需要創建一個新的編輯日誌edits.new,後續對元數據的改動,都會記錄到這個新日誌中。而原有的fsimage和edits,會由SNN通過HTTP下載到本地(步驟三和步驟四),在內存中進行merge。合並的結果就是fsimage.ckpt。然後SNN通過HTTP接口通知NN fsimage已經準備好。NN會通過HTTP get獲取merge好的fsimage。在NN下載完成後,SNN會通過NamenodeProtocol.rollFsImage(),完成這次檢查點。NN在處理這個遠程方法時,會用fsimage.ckpt 覆蓋原來的fsimage,並且將新的edits.new改名為edit。


尊重原創,未經允許不得轉載:https://www.anzhan.me/index.php/archives/233

參考資料:

1. Hadoop技術內幕-深入解析HADOOP COMMON和HDFS架構設計與實現原理

最後更新:2017-04-03 12:56:11

  上一篇:go SQL語句Left join 中On和Where的用法區別
  下一篇:go 深入淺出DDoS攻擊防禦——攻擊篇