Hadoop文件係統訪問的兩種方式
在這裏記錄下學習hadoop 的過程,並對重要內容記錄下來,以備以後查漏補缺。
要從Hadoop文件係統中讀取文件,一般有兩種方式:
1.使用java.net.URL對象
package com.ytu.chapter3; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; public class URLCat { static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } public static void main(String[] args) { InputStream input = null; try { input = new URL("hdfs://localhost:9000/user/liujiacai/build.xml").openStream(); IOUtils.copyBytes(input, System.out, 4096, false); } catch (MalformedURLException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { IOUtils.closeStream(input); } } }
這種方式需要讓Java識別Hadoop文件係統的URL方案,就是通過一個FsUrlStreamHandlerFactory實例來調用URL中的setURLStreamHandlerFactory方法。這種方法在一個java虛擬機中隻被調用一次,所以一般放在static塊中。這個限製意味著如果程序的其他部件設置了一個URLStreamHandlerFactory,我們便無法再從Hadoop中讀取數據了。
這需要是我第二種方法。
2.使用FileSystemAPI讀取數據
在命令行中,我們可以和使用linux係統命令一樣來操作hdfs係統。
hadoop fs -ls /
這個命令可以查看根目錄下的文件,如果想要遞歸查看,參數改為 -lsr 即可
如果想知道更多的幫助可以用以下命令:
hadoop fs -help ls
可以得到ls的用法提示。
這裏重點講解用hadoop api操作hdfs文件係統。
通過調用FileSystem.get(Configuration conf)工廠方法可以得到FileSystem的實例。
Configuration class is a special class for holding key/value configuration parameters.
Configuration對象封裝了一個客戶端或者服務器的配置,這是用從類路徑對去而來的配置文件(如conf/core-site.xml)來設置。
public static FileSystem get(Configuraion conf) throws IOException
這個靜態方法返回的是默認文件係統(在conf/core-site.xml中設置,如果沒有設置過,則是默認的本地文件係統)。
我們可以這樣得到HDFS文件係統:
Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf);
我們可以這樣得到本地文件係統
FileSystem local = FileSystem.getLocal(conf);
在hadoop文件api中,我們用Path對象來編碼文件名和文件夾名,用FileStatus對象來存儲文件與文件夾的元信息(metadata)
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class PutMerge { public static void main(String[] args) throws IOException { Configuration conf = new Configuration(); FileSystem hdfs = FileSystem.get(conf); FileSystem local = FileSystem.getLocal(conf); Path inputDir = new Path(args[0]); Path outputDir = new Path(args[1]); FileStatus[] inputFiles = local.listStatus(inputDir); //FSDataOutputStream是Java標準庫java.io.DataOutputSteam的子類,同時增加了隨機訪問的功能 FSDataOutputStream out = hdfs.create(outputDir); for (int i = 0; i < inputFiles.length; i++) { System.out.println(inputFiles[i].getPath().getName()); FSDataInputStream in = local.open(inputFiles[i].getPath());//默認4K為緩衝區大小 byte[] buffer = new byte[256]; int bytesRead = 0; while((bytesRead = in.read(buffer))>0) { out.write(buffer,0,bytesRead); } in.close(); } out.close(); } }
上麵這個完整的程序完成的功能是:把本地的一個文件夾中的文件再上傳到hdfs文件係統時將它們合並。
除此之外FileSystem類也有諸如delete(),exists(),mkdirs(),rename()等方法。
FSDataInputStream不是標準的java.io類,這個類是java.io.DataInputStream的一個子類,支持隨機訪問,這樣就可以從流的任意位置讀取數據了。
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, Closeable
上麵是它的簽名,Seekable接口允許在文件中定位,並提供一個查詢方式,用於查詢當前位置相對於文件開始位置的偏移量。
public interface Seekable { /** * Seek to the given offset from the start of the file. * The next read() will be from that location. Can't * seek past the end of the file. */ void seek(long pos) throws IOException; /** * Return the current offset from the start of the file */ long getPos() throws IOException; /** * Seeks a different copy of the data. Returns true if * found a new source, false otherwise. */ boolean seekToNewSource(long targetPos) throws IOException; }
調用seek方法來定位大於文件長度的位置會導致IOException異常。與java.io.InputStream中的skip()方法不同,seek()並沒有指出數據流當前位置之後的一點,他可以轉移到文件中任何一個位置。
應用程序員並不常用seekToNewSource方法。此方法一般傾向於切換到數據的另一個副本並在新的副本中尋找targetPos指定的位置。HDFS內部就采用這種方式在數據節點故障時為客戶端提供可靠的數據流。
FsDataInputStream也實現了PositionedReadable接口
public interface PositionedReadable { /** * Read upto the specified number of bytes, from a given * position within a file, and return the number of bytes read. This does not * change the current offset of a file, and is thread-safe. */ public int read(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read the specified number of bytes, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. */ public void readFully(long position, byte[] buffer, int offset, int length) throws IOException; /** * Read number of bytes equalt to the length of the buffer, from a given * position within a file. This does not * change the current offset of a file, and is thread-safe. */ public void readFully(long position, byte[] buffer) throws IOException; }這個接口內的方法都會保留文件當前位置並且是線程安全的,因此他們提供了在讀取文件的主要部分時訪問其他部分的便利方法。
最後務必記住:seek方法是一個相對高開銷的操作,需要慎重使用。
最後更新:2017-04-04 07:03:48