閱讀963 返回首頁    go 阿裏雲 go 技術社區[雲棲]


Hadoop的輸入輸出格式(重要)

首先說一下Hadoop中預定義的Mapper 與Reducer




InputFormat接口決定了輸入文件如何被hadoop分塊(split up)接受。

TextInputFormat是InputFormat的默認實現,對於輸入數據中沒有明確的key值時非常有效,TextInputFormat返回的key值為字符在輸入塊中的行數,value為這行的內容。

其他InputFormat的子類還有

KeyValueTextInputFormat(鍵:Text,值:Text)他的分割符默認為tab("\t"),我們可以通過key.value.separator.input.line.property設置

SequenceFileInputFormat<K,V>(鍵和值都是由用戶定義)

NLineInputFormat(鍵:LongWritable,值:Text)等

你的MapReduce程序如果要是用KeyValueTextInputFormat作為輸入格式,我們可以這樣做:

conf.setInputFormat(KeyValueTextInputFormat.class);

package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
public interface InputFormat<K, V> {

  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;

  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job, 
                                     Reporter reporter) throws IOException;
}

以上是InputFormat的定義,裏邊有兩個方法。

第一個方法的功能是確認所有的文件為輸入數據並且把他們分成塊(splits)。每一個任務都被分配一個塊(split)。

第二個方式的功能是提供一個對象用來遍曆給定塊(split)中的記錄,並且把記錄解析成先前定義的key和value類型。


一般來說getSplits()方法不用我們去管。其實以上所舉出的InputFormat的子類都是繼承子FileInputFormat的,FileInputFormat實現了getSplits方法,把getRecordReader()方法留給子類去實現。

我們在使用FileInputFormat時,我們主要的精力在定製合適的RecordReader類上,因為他負責如何將splits解析為records,將records解析成為key/value對。

package org.apache.hadoop.mapred;

import java.io.IOException;
import java.io.DataInput;
public interface RecordReader<K, V> {
  boolean next(K key, V value) throws IOException;
  K createKey();
  V createValue();
  long getPos() throws IOException;
  public void close() throws IOException;
  float getProgress() throws IOException;
}


以上為RecordReader的簽名,我們一般從現有的RecordReader的子類中來定製自己的RecordReader。主要是實現next()方法。

比如LineRecordReader繼承了RecordReader<LongWritable, Text>,TextInputFormat使用了這個Reader。KeyValueLineRecordReader用在了KeyValueTextInputFormat類中。

下麵我們自己實現一個RecordReader類,她將要解析的key為Text,value為URLWritable(自己定義)

首先我們需要定義URLWritable這個類,有了這個以後我們就可以定義我們的Reader了

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.RecordReader;


public class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {

	private KeyValueLineRecordReader lineReader;
	private Text lineKey, lineValue;
	public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException {
		lineReader = new KeyValueLineRecordReader(job, split);
		
		lineKey = lineReader.createKey();
		lineValue = lineReader.createValue();
	}
	@Override
	public boolean next(Text key, URLWritable value) throws IOException {
		// TODO Auto-generated method stub
		if (!lineReader.next(lineKey, lineValue)) {
			return false;	
		}
		key.set(lineKey);
		value.set(lineValue.toString());
		return true;
		
	}

	@Override
	public Text createKey() {
		// TODO Auto-generated method stub
		return new Text("");
	}

	@Override
	public URLWritable createValue() {
		// TODO Auto-generated method stub
		return new URLWritable();
	}

	@Override
	public long getPos() throws IOException {
		// TODO Auto-generated method stub
		return lineReader.getPos();
	}

	@Override
	public void close() throws IOException {
		// TODO Auto-generated method stub
		lineReader.close();
	}

	@Override
	public float getProgress() throws IOException {
		// TODO Auto-generated method stub
		return lineReader.getProgress();
	}

}
class URLWritable implements Writable {

	protected URL url;
	public URLWritable() {}
	public URLWritable(URL url) {
		this.url = url;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		// TODO Auto-generated method stub
		out.writeUTF(url.toString());
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		// TODO Auto-generated method stub
		url = new URL(in.readUTF());
	}
	public void set(String s) throws MalformedURLException {
		url = new URL(s);
	}
	
}
可以看到,我們在TimeUrlLineRecordReader類中創建了一個KeyValueLineRecordReader對象,然後在實現getPos()、getProgress()、close()方法是直接是哦那個他對應的方法即可。

在next()方法中,我Text類型的lineValue轉型為URLWritable類型了。

這時我們就可呀使用這個RecordReader了

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;


public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {

	@Override
	public RecordReader<Text, URLWritable> getRecordReader(InputSplit split,
			JobConf job, Reporter reporter) throws IOException {
		// TODO Auto-generated method stub
		return new TimeUrlLineRecordReader(job, (FileSplit)split);
	}

}


OutputFormat和InputFormat類似,但是有些地方不一樣。

和InputFormat一樣,所有的OutputFormat大多數繼承自FileOutputFormat,但是NullOutputForma和DBOutputFormat。他們是為專門領域的應用程序預留的。


TextOutputFormat是默認的輸入格式,key value 用一個tab分開,分割符也可以通過mapred.textoutputformat.separator屬性設置

TextOutputFormat的輸出格式可以被KeyValueTextInputFormat接受

如果輸出的key類型為NullWritable的輸出格式可以被TextInputFormat接受。在這中情況下key沒有被輸出來,也沒有分割符。

如果不想讓reduce程序有任何輸出我們可以把輸出格式設置為NullOutputFormat。阻止hadoop的輸出在reducer用自己的方式輸出文件而不許要hadoop框架些任何額外文件是十分有用。

SequenceFileOutuputFormat它把輸出寫入到一個串文件中(sequence files),這樣我麼就可以用SequenceFileInputFormat讀會數據。這在處理多個聯係mapreduce任務時十分有效。

PS:hadoop中預定義的InputFormat與OutputFormat




最後更新:2017-04-04 07:03:49

  上一篇:go iOS開發那些事-iOS常用設計模式–委托模式
  下一篇:go ThreadLocal實現方式&amp;使用介紹---無鎖化線程封閉