HDFS追本溯源:HDFS操作的邏輯流程與源碼解析
本文主要介紹5個典型的HDFS流程,這些流程充分體現了HDFS實體間IPC接口和stream接口之間的配合。
1. Client和NN
Client到NN有大量的元數據操作,比如修改文件名,在給定目錄下創建一個子目錄,這些操作一般隻涉及Client和NN的交互,通過IPC調用ClientProtocol進行。創建子目錄的邏輯流程如下圖:
從圖中可見,創建子目錄這種操作並沒有涉及DN。因為元數據會被NN持久化到edits中,因此在持久化結束之後,這個調用就會被成功返回。複習一下:NN維護了HDFS的文件係統目錄樹和文件與數據塊的對應關係,和數據塊與DN的對應關係。因此,創建目錄僅僅是在NN上也就很容易理解了。
一些更為複雜的操作,如使用
DistributedFileSystem.setReplication()來增加文件的副本數,再如通過
DistributedFileSystem.delete()來刪除HDFS上的文件,都需要DN配合執行一些動作。其中DistributedFileSystem源碼在hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DistributedFileSystem.java
/**************************************************************** * Implementation of the abstract FileSystem for the DFS system. * This object is the way end-user code interacts with a Hadoop * DistributedFileSystem. * *****************************************************************/ @InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase" }) @InterfaceStability.Unstable public class DistributedFileSystem extends FileSystem { private Path workingDir; private URI uri; DFSClient dfs; private boolean verifyChecksum = true; static{ HdfsConfiguration.init(); }
以客戶端刪除HDFS文件為例,操作在NN上執行完成後,DN存放的文件內容的數據塊也必須刪除。但是,NN在執行delete()方法時,它隻標記需要刪除的數據塊(當然,delete的操作日誌也會被持久化),而不會主動聯係DN去立即刪除這些數據。當保存著這些數據塊的DN在向NN發送心跳時,NN會通過心跳應答攜帶DatanodeCommand命令來通知DN刪除數據。也就是說,被刪除的數據塊在Client接到成功的響應後,會在一段時間後才能真正刪除,NN和DN永遠隻維護簡單的主從關係。NN永遠不會主動發起向DN的調用。NN隻能通過DN心跳應答中攜帶DatanodeCommand的指令對DN進行管理。
2. Client讀文件
使用Java API讀取文件的源碼如下:
FileSystem hdfs = FileSystem.get(new Configuration()); Path path = new Path("/testfile");// reading FSDataInputStream dis = hdfs.open(path); byte[] writeBuf = new byte[1024]; int len = dis.read(writeBuf); System.out.println(new String(writeBuf, 0, len, "UTF-8")); dis.close(); hdfs.close();下圖顯示了HDFS在讀取文件時,Client,NN和DN發生的事件和這些事件的順序:
步驟1的源碼:
public FSDataInputStream open(Path f, final int bufferSize) throws IOException { statistics.incrementReadOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataInputStream>() { @Override public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException { return new HdfsDataInputStream( dfs.open(getPathName(p), bufferSize, verifyChecksum)); } @Override public FSDataInputStream next(final FileSystem fs, final Path p) throws IOException { return fs.open(p, bufferSize); } }.resolve(this, absF); }可見open返回的是HdfsDataInputStream。dfs為hadoop-hdfs-project\hadoop-hdfs\src\main\java\org\apache\hadoop\hdfs\DFSClient.java。HdfsDataInputStream繼承自FSDataInputStream。構造是並沒有額外的處理。
public class HdfsDataInputStream extends FSDataInputStream { public HdfsDataInputStream(DFSInputStream in) throws IOException { super(in); }FSDataInputStream繼承自DFSInputStream。
DFSInputStream(DFSClient dfsClient, String src, int buffersize, boolean verifyChecksum ) throws IOException, UnresolvedLinkException { this.dfsClient = dfsClient; this.verifyChecksum = verifyChecksum; this.buffersize = buffersize; this.src = src; this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy(); openInfo(); } /** * Grab the open-file info from namenode */ synchronized void openInfo() throws IOException, UnresolvedLinkException { lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength; while (retriesForLastBlockLength > 0) { // Getting last block length as -1 is a special case. When cluster // restarts, DNs may not report immediately. At this time partial block // locations will not be available with NN for getting the length. Lets // retry for 3 times to get the length. if (lastBlockBeingWrittenLength == -1) { DFSClient.LOG.warn("Last block locations not available. " + "Datanodes might not have reported blocks completely." + " Will retry for " + retriesForLastBlockLength + " times"); waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength); lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength(); } else { break; } retriesForLastBlockLength--; } if (retriesForLastBlockLength == 0) { throw new IOException("Could not obtain the last block locations."); } }
fetchLocatedBlocksAndGetLastBlockLength通過調用getLocatedBlocks實現了示意圖中的步驟二:
private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException { final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0); if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("newInfo = " + newInfo); } if (newInfo == null) { throw new IOException("Cannot open filename " + src); } if (locatedBlocks != null) { Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); while (oldIter.hasNext() && newIter.hasNext()) { if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) { throw new IOException("Blocklist for " + src + " has changed!"); } } } locatedBlocks = newInfo; long lastBlockBeingWrittenLength = 0; if (!locatedBlocks.isLastBlockComplete()) { final LocatedBlock last = locatedBlocks.getLastLocatedBlock(); if (last != null) { if (last.getLocations().length == 0) { if (last.getBlockSize() == 0) { // if the length is zero, then no data has been written to // datanode. So no need to wait for the locations. return 0; } return -1; } final long len = readBlockLength(last); last.getBlock().setNumBytes(len); lastBlockBeingWrittenLength = len; } } currentNode = null; return lastBlockBeingWrittenLength; }你可能會說步驟二調用的是getBlockLocations。看以下的代碼:
@VisibleForTesting public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException { return callGetBlockLocations(namenode, src, start, length); } /** * @see ClientProtocol#getBlockLocations(String, long, long) */ static LocatedBlocks callGetBlockLocations(ClientProtocol namenode, String src, long start, long length) throws IOException { try { return namenode.getBlockLocations(src, start, length); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, UnresolvedPathException.class); } }
然後就可以開始讀文件的數據了。通過NameNode.getBlockLocations的遠程調用接口獲得了文件開始部分的數據塊的保存位置。對於文件中的每個塊,NN返回保存著該副本的DN的地址。注意,這些DN根據它們與Client的距離進行了簡單的排序(利用了網絡的拓撲信息)。
Client調用HdfsDataInputStream的read方法讀取文件數據時,DFSInputStream對象會通過和DN間的讀數據stream接口,和最近的DN建立連接。Client反複調用read方法,數據會通過DN和Client的連接上的數據包返回Client。當到達塊的末端時,DFSInputStream會關閉和DN的連接。並通過getBlockLocations()遠程方法獲得保存著下一個數據塊的DN信息,嚴格來說,在對象沒有緩存該數據塊的位置時,才會使用這個遠程方法。這就是上圖中的步驟五。然後重複上述過程。
另外,由於NameNode.getBlockLocations()不會一次返回文件的所有的數據塊信息,DFSInputStream可能需要多次調用該遠程方法,檢索下一組數據塊的位置信息。對於使用者來說,它讀取的是一個連續的數據流,上麵所講的聯係不同的DN,多次定位數據塊的過程,都是透明的。當使用者完成數據讀取任務後,通過FSDataInputStream.close()關係數據流。即圖中的步驟六。
如果DN發生了錯誤,如節點停機或者網絡出現故障,那麼Client會嚐試連接下一個Block的位置。同時它會記住出現故障的那個DN,不會再進行徒勞的嚐試。在數據的應答中,不單包含了數據,還包含了數據的校驗和,Client會檢查數據的一致性,如果發現了校驗錯誤,它會將這個信息報告給NN;同時,嚐試從別的DN讀取另外一個副本的內容。由Client在讀取數據時進行數據完整性檢查,可以降低DN的負載,均衡各個節點的計算能力。
這樣的設計其實可以給我們一個很好的設計大型分布式係統的例子。通過一些有效的設計,將計算和網絡等分散到各個節點上,這樣可以最大程度的保證scalability。
3. Client寫文件
即使不考慮出現錯誤的情況,寫文件也是HDFS最複雜的流程。本節通過創建一個新文件並向文件寫入數據,結束後關閉這個文件為例,分析文件寫入時各個節點之間的配合。
Client調用DistributedFileSystem.create()創建文件(上圖中的步驟一),這時DistributedFileSystem創建了DFSOutputStream,並由RPC,讓NN執行同名的方法,在文件係統的命名空間創建一個新文件。NN創建新文件時,需要執行檢查,包括NN是否處理正常工作狀態,被創建的文件不存在,Client是否有在父目錄中創建文件的權限等。通過檢查後,NN會構建一個新文件,記錄創建操作到編輯日誌edits中。RPC結束後,DistributedFileSystem將該DFSOutputStream對象包裝到FSDataOutputStream實例中,返回Client。
@Override public FSDataOutputStream create(final Path f, final FsPermission permission, final EnumSet<CreateFlag> cflags, final int bufferSize, final short replication, final long blockSize, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { @Override public FSDataOutputStream doCall(final Path p) throws IOException, UnresolvedLinkException { return new HdfsDataOutputStream(dfs.create(getPathName(p), permission, cflags, replication, blockSize, progress, bufferSize, checksumOpt), statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) throws IOException { return fs.create(p, permission, cflags, bufferSize, replication, blockSize, progress, checksumOpt); } }.resolve(this, absF); }
關鍵的調用點有DFSClient.create:
/** * Same as {@link #create(String, FsPermission, EnumSet, boolean, short, long, * Progressable, int, ChecksumOpt)} with the addition of favoredNodes that is * a hint to where the namenode should place the file blocks. * The favored nodes hint is not persisted in HDFS. Hence it may be honored * at the creation time only. HDFS could move the blocks during balancing or * replication, to move the blocks from favored nodes. A value of null means * no favored nodes for this create */ public DFSOutputStream create(String src, FsPermission permission, EnumSet<CreateFlag> flag, boolean createParent, short replication, long blockSize, Progressable progress, int buffersize, ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes) throws IOException { checkOpen(); if (permission == null) { permission = FsPermission.getFileDefault(); } FsPermission masked = permission.applyUMask(dfsClientConf.uMask); if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + masked); } String[] favoredNodeStrs = null; if (favoredNodes != null) { favoredNodeStrs = new String[favoredNodes.length]; for (int i = 0; i < favoredNodes.length; i++) { favoredNodeStrs[i] = favoredNodes[i].getHostName() + ":" + favoredNodes[i].getPort(); } } final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, src, masked, flag, createParent, replication, blockSize, progress, buffersize, dfsClientConf.createChecksum(checksumOpt), favoredNodeStrs); beginFileLease(src, result); return result; }
在步驟三Client寫入數據時,由於create()調用創建了一個空文件,所以,DFSOutputStream實例首先需要想NN申請數據塊,addBlock()方法成功執行後,返回一個LocatedBlock對象。該對象包含了新數據塊的數據塊標識和版本好,同時,它的成員變量locs提供了數據流管道的信息,通過上述信息,DFSOutputStream就可以和DN連接,通過些數據接口建立數據流管道。Client寫入FSDataOutputStream流中的數據,被分成一個一個的文件包,放入DFSOutputStream對象的內部隊列。該隊列中的文件包最後打包成數據包,發往數據流管道,流經管道上的各個DN,並持久化,確認包逆流而上,從數據流管道依次發往Client,當Client收到應答時,它將對應的包從內部隊列刪除。
public class LocatedBlock { private ExtendedBlock b; private long offset; // offset of the first byte of the block in the file private DatanodeInfo[] locs; /** Storage ID for each replica */ private String[] storageIDs; // Storage type for each replica, if reported. private StorageType[] storageTypes; // corrupt flag is true if all of the replicas of a block are corrupt. // else false. If block has few corrupt replicas, they are filtered and // their locations are not part of this object private boolean corrupt; private Token<BlockTokenIdentifier> blockToken = new Token<BlockTokenIdentifier>(); /** * List of cached datanode locations */ private DatanodeInfo[] cachedLocs; // Used when there are no locations private static final DatanodeInfo[] EMPTY_LOCS = new DatanodeInfo[0];
DFSOutputStream在寫完一個數據塊後,數據流管道上的節點,會通過和NN的DatanodeProtocol遠程接口的blockReceived()方法,向NN提交數據塊。如果數據隊列還有等到輸出的數據,DFSOutputStream會再次調用addBlock(),為文件添加新的數據塊。
Client完成數據的寫入後,會調用close()方法關閉流,關閉流意味著Client不會再向流中寫入數據。所以,當DFSOutputStream數據隊列的文件包都收到應答後,就可以使用ClientProtocol.complete()方法通知NN關閉文件,完成一次正常的文件寫入。
如果在文件寫入期間DN發生故障,則會執行下麵的操作(注意,這些操作對於寫入數據的Client是透明的):
- 數據流管道會被關閉,已經發送到管道但是還沒有收到確認的文件包,會被重新添加到DFSOutputStream的輸出隊列,這樣就保證了無路數據流管道的哪個DN發生故障,都不會丟失數據。當前正常工作的DN的數據塊會被賦予一個新的版本號,並通知NN。這樣,失敗的DN在從故障恢複過來以後,上麵隻有部分數據的Block會因為版本號和NN保存的版本號不匹配而被刪除。
- 在數據流管道中刪除錯誤的DN並建立新的管道,繼續寫數據到正常工作的DN。
- 文件關閉後,NN會發現該Block的副本數沒有達到要求,會選擇一個新的DN並複製Block,創建新的副本。DN的故障隻會影響一個Block的寫操作,後續Block的寫入不會受到影響。
4. DataNode的啟動與心跳機製
本節討論DN的啟動及其與NN之間的交互。包括DN從啟動到進入正常工作狀態的注冊,Block上報,以及正常工作過程中的心跳等與NN相關的遠程調用。這部分雖然隻涉及DatanodeProtocol的接口,但是有助於進一步理解DN與NN的關係。
正常啟動的DN或者為升級而啟動的DN,都會向NN發送遠程調用versionRequest(),進行必要的版本檢查。這裏的版本檢查,隻涉及構建版本號,保證它們間的HDFS版本是一致的。
在版本檢查結束後,DN會接著通過遠程調用register(),向NN注冊。DatanodeProtocol.register()的主要工作也是檢查,確認該DN是NN所管理集群的成員。也就是說,用戶不能把某一個集群中的某個node直接注冊到另外一個集群中去,保證了整個係統的數據一致性。
注冊成功後,DN會將它所管理的所有Block的信息,通過blockRequest()方法上報到NN(步驟三),以幫助NN建立HDFS文件數據塊到DN的映射關係。在此後,DN才正式的提供服務。
由於NN和DN是簡單的主從關係,DN需要每隔一段時間發送心跳到NN(步驟四和步驟五)。如果NN長時間收不到DN的心跳,它會認為DN已經失效。如果NN需要一些DN需要配合的動作,則會通過sendHeartbeat()的方法返回。該返回值是一個DatanodeCommand數組,它是NN的指令。
應該說,DN和NN的交互邏輯非常簡單。大部分是通過DN到NN的心跳來完成的。但是考慮到一定規模的HDFS集群,一個NN會管理上千個DN,這樣的設計也就非常自然了。
5. SNN節點的元數據合並
當Client對HDFS的文件目錄進行修改時,NN都會在edits中留下記錄,以保證在係統出現問題時,通過日誌可以進行恢複。
fsimage是某一個時刻的檢查點(checkpoint)。由於fsimage很大,因此不會在每次的元數據修改都寫入到它裏邊,而隻是存在到edits中。在係統啟動時,會首先狀態最近時刻的fsimage,然後在通過edits,恢複係統的最新狀態。
當時如果edits太大,那麼節點啟動時將用很長的時間來執行日誌的每一個操作,使得係統恢複最近的狀態。在啟動恢複的這段時間,服務是不可用的。為了避免edits多大,增加集群的可用時間,HDFS引入了第二名字節點,即SNN(Secondary NameNode)。SNN不是NN的standby,它隻是輔助NN完成合並(merge)fsimage和edits。過程涉及NamenodeProtocol和NN與SNN之間的流式接口。
該過程由SNN發起,首先通過遠程方法NamenodeProtocol.getEditLogSize()獲得NN上edits的大小。如果日誌很小,SNN就會在指定的時間後重新檢查。否則,繼續通過遠程接口rollEditLog(),啟動一次檢查點的過程。這時,NN需要創建一個新的編輯日誌edits.new,後續對元數據的改動,都會記錄到這個新日誌中。而原有的fsimage和edits,會由SNN通過HTTP下載到本地(步驟三和步驟四),在內存中進行merge。合並的結果就是fsimage.ckpt。然後SNN通過HTTP接口通知NN fsimage已經準備好。NN會通過HTTP get獲取merge好的fsimage。在NN下載完成後,SNN會通過NamenodeProtocol.rollFsImage(),完成這次檢查點。NN在處理這個遠程方法時,會用fsimage.ckpt 覆蓋原來的fsimage,並且將新的edits.new改名為edit。
參考資料:
1. Hadoop技術內幕-深入解析HADOOP COMMON和HDFS架構設計與實現原理最後更新:2017-04-03 12:56:11