閱讀682 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Hadoop文件係統訪問的兩種方式

在這裏記錄下學習hadoop 的過程,並對重要內容記錄下來,以備以後查漏補缺。

要從Hadoop文件係統中讀取文件,一般有兩種方式:

1.使用java.net.URL對象

package com.ytu.chapter3;

import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

public class URLCat {
	static {
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
	}
	public static void main(String[] args) {
		InputStream input = null;
		
		try {
			input = new URL("hdfs://localhost:9000/user/liujiacai/build.xml").openStream();
			IOUtils.copyBytes(input, System.out, 4096, false);
		} catch (MalformedURLException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			IOUtils.closeStream(input);
		}
		
	}
}

這種方式需要讓Java識別Hadoop文件係統的URL方案,就是通過一個FsUrlStreamHandlerFactory實例來調用URL中的setURLStreamHandlerFactory方法。這種方法在一個java虛擬機中隻被調用一次,所以一般放在static塊中。這個限製意味著如果程序的其他部件設置了一個URLStreamHandlerFactory,我們便無法再從Hadoop中讀取數據了。

這需要是我第二種方法。

2.使用FileSystemAPI讀取數據

在命令行中,我們可以和使用linux係統命令一樣來操作hdfs係統。

hadoop fs -ls /

這個命令可以查看根目錄下的文件,如果想要遞歸查看,參數改為  -lsr 即可

如果想知道更多的幫助可以用以下命令:

hadoop fs -help ls

可以得到ls的用法提示。

這裏重點講解用hadoop api操作hdfs文件係統。


通過調用FileSystem.get(Configuration conf)工廠方法可以得到FileSystem的實例。

Configuration class is a special class for holding key/value configuration parameters.

Configuration對象封裝了一個客戶端或者服務器的配置,這是用從類路徑對去而來的配置文件(如conf/core-site.xml)來設置。

public static FileSystem get(Configuraion conf) throws IOException

這個靜態方法返回的是默認文件係統(在conf/core-site.xml中設置,如果沒有設置過,則是默認的本地文件係統)。

我們可以這樣得到HDFS文件係統:

Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);

我們可以這樣得到本地文件係統

FileSystem local = FileSystem.getLocal(conf);

在hadoop文件api中,我們用Path對象來編碼文件名和文件夾名,用FileStatus對象來存儲文件與文件夾的元信息(metadata)

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PutMerge {

	public static void main(String[] args) throws IOException {
		Configuration conf = new Configuration();
		FileSystem hdfs = FileSystem.get(conf);
		FileSystem local = FileSystem.getLocal(conf);

		Path inputDir = new Path(args[0]);
		Path outputDir = new Path(args[1]);

		FileStatus[] inputFiles = local.listStatus(inputDir);

		//FSDataOutputStream是Java標準庫java.io.DataOutputSteam的子類,同時增加了隨機訪問的功能
		FSDataOutputStream out = hdfs.create(outputDir);

		for (int i = 0; i < inputFiles.length; i++) {
			System.out.println(inputFiles[i].getPath().getName());
			FSDataInputStream in = local.open(inputFiles[i].getPath());//默認4K為緩衝區大小
			byte[] buffer = new byte[256];
			int bytesRead = 0;
			while((bytesRead = in.read(buffer))>0) {
				out.write(buffer,0,bytesRead);
			}
			in.close();
		}
		out.close();
	}

}

上麵這個完整的程序完成的功能是:把本地的一個文件夾中的文件再上傳到hdfs文件係統時將它們合並。

除此之外FileSystem類也有諸如delete(),exists(),mkdirs(),rename()等方法。


FSDataInputStream不是標準的java.io類,這個類是java.io.DataInputStream的一個子類,支持隨機訪問,這樣就可以從流的任意位置讀取數據了。

public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable, Closeable 

上麵是它的簽名,Seekable接口允許在文件中定位,並提供一個查詢方式,用於查詢當前位置相對於文件開始位置的偏移量。

public interface Seekable {
  /**
   * Seek to the given offset from the start of the file.
   * The next read() will be from that location.  Can't
   * seek past the end of the file.
   */
  void seek(long pos) throws IOException;
  
  /**
   * Return the current offset from the start of the file
   */
  long getPos() throws IOException;

  /**
   * Seeks a different copy of the data.  Returns true if 
   * found a new source, false otherwise.
   */
  boolean seekToNewSource(long targetPos) throws IOException;
}

調用seek方法來定位大於文件長度的位置會導致IOException異常。與java.io.InputStream中的skip()方法不同,seek()並沒有指出數據流當前位置之後的一點,他可以轉移到文件中任何一個位置。

應用程序員並不常用seekToNewSource方法。此方法一般傾向於切換到數據的另一個副本並在新的副本中尋找targetPos指定的位置。HDFS內部就采用這種方式在數據節點故障時為客戶端提供可靠的數據流。





FsDataInputStream也實現了PositionedReadable接口

public interface PositionedReadable {
  /**
   * Read upto the specified number of bytes, from a given
   * position within a file, and return the number of bytes read. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public int read(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read the specified number of bytes, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read number of bytes equalt to the length of the buffer, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   */
  public void readFully(long position, byte[] buffer) throws IOException;
}
這個接口內的方法都會保留文件當前位置並且是線程安全的,因此他們提供了在讀取文件的主要部分時訪問其他部分的便利方法。


最後務必記住:seek方法是一個相對高開銷的操作,需要慎重使用。







最後更新:2017-04-04 07:03:48

  上一篇:go 歸心似箭,IT達人分享搶票攻略
  下一篇:go 仿android4.0 Spinner下拉效果