Hadoop的輸入輸出格式(重要)
首先說一下Hadoop中預定義的Mapper 與Reducer


InputFormat接口決定了輸入文件如何被hadoop分塊(split up)與接受。
TextInputFormat是InputFormat的默認實現,對於輸入數據中沒有明確的key值時非常有效,TextInputFormat返回的key值為字符在輸入塊中的行數,value為這行的內容。
其他InputFormat的子類還有
KeyValueTextInputFormat(鍵:Text,值:Text)他的分割符默認為tab("\t"),我們可以通過key.value.separator.input.line.property設置
SequenceFileInputFormat<K,V>(鍵和值都是由用戶定義)
NLineInputFormat(鍵:LongWritable,值:Text)等
你的MapReduce程序如果要是用KeyValueTextInputFormat作為輸入格式,我們可以這樣做:
conf.setInputFormat(KeyValueTextInputFormat.class);
package org.apache.hadoop.mapred;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
public interface InputFormat<K, V> {
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader<K, V> getRecordReader(InputSplit split,
JobConf job,
Reporter reporter) throws IOException;
}
以上是InputFormat的定義,裏邊有兩個方法。
第一個方法的功能是確認所有的文件為輸入數據並且把他們分成塊(splits)。每一個任務都被分配一個塊(split)。
第二個方式的功能是提供一個對象用來遍曆給定塊(split)中的記錄,並且把記錄解析成先前定義的key和value類型。
一般來說getSplits()方法不用我們去管。其實以上所舉出的InputFormat的子類都是繼承子FileInputFormat的,FileInputFormat實現了getSplits方法,把getRecordReader()方法留給子類去實現。
我們在使用FileInputFormat時,我們主要的精力在定製合適的RecordReader類上,因為他負責如何將splits解析為records,將records解析成為key/value對。
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.DataInput;
public interface RecordReader<K, V> {
boolean next(K key, V value) throws IOException;
K createKey();
V createValue();
long getPos() throws IOException;
public void close() throws IOException;
float getProgress() throws IOException;
}
以上為RecordReader的簽名,我們一般從現有的RecordReader的子類中來定製自己的RecordReader。主要是實現next()方法。
比如LineRecordReader繼承了RecordReader<LongWritable, Text>,TextInputFormat使用了這個Reader。KeyValueLineRecordReader用在了KeyValueTextInputFormat類中。
下麵我們自己實現一個RecordReader類,她將要解析的key為Text,value為URLWritable(自己定義)
首先我們需要定義URLWritable這個類,有了這個以後我們就可以定義我們的Reader了
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueLineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> {
private KeyValueLineRecordReader lineReader;
private Text lineKey, lineValue;
public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException {
lineReader = new KeyValueLineRecordReader(job, split);
lineKey = lineReader.createKey();
lineValue = lineReader.createValue();
}
@Override
public boolean next(Text key, URLWritable value) throws IOException {
// TODO Auto-generated method stub
if (!lineReader.next(lineKey, lineValue)) {
return false;
}
key.set(lineKey);
value.set(lineValue.toString());
return true;
}
@Override
public Text createKey() {
// TODO Auto-generated method stub
return new Text("");
}
@Override
public URLWritable createValue() {
// TODO Auto-generated method stub
return new URLWritable();
}
@Override
public long getPos() throws IOException {
// TODO Auto-generated method stub
return lineReader.getPos();
}
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
lineReader.close();
}
@Override
public float getProgress() throws IOException {
// TODO Auto-generated method stub
return lineReader.getProgress();
}
}
class URLWritable implements Writable {
protected URL url;
public URLWritable() {}
public URLWritable(URL url) {
this.url = url;
}
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeUTF(url.toString());
}
@Override
public void readFields(DataInput in) throws IOException {
// TODO Auto-generated method stub
url = new URL(in.readUTF());
}
public void set(String s) throws MalformedURLException {
url = new URL(s);
}
}可以看到,我們在TimeUrlLineRecordReader類中創建了一個KeyValueLineRecordReader對象,然後在實現getPos()、getProgress()、close()方法是直接是哦那個他對應的方法即可。
在next()方法中,我Text類型的lineValue轉型為URLWritable類型了。
這時我們就可呀使用這個RecordReader了
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> {
@Override
public RecordReader<Text, URLWritable> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
// TODO Auto-generated method stub
return new TimeUrlLineRecordReader(job, (FileSplit)split);
}
}
OutputFormat和InputFormat類似,但是有些地方不一樣。
和InputFormat一樣,所有的OutputFormat大多數繼承自FileOutputFormat,但是NullOutputForma和DBOutputFormat。他們是為專門領域的應用程序預留的。
TextOutputFormat是默認的輸入格式,key value 用一個tab分開,分割符也可以通過mapred.textoutputformat.separator屬性設置
TextOutputFormat的輸出格式可以被KeyValueTextInputFormat接受
如果輸出的key類型為NullWritable的輸出格式可以被TextInputFormat接受。在這中情況下key沒有被輸出來,也沒有分割符。
如果不想讓reduce程序有任何輸出我們可以把輸出格式設置為NullOutputFormat。阻止hadoop的輸出在reducer用自己的方式輸出文件而不許要hadoop框架些任何額外文件是十分有用。
SequenceFileOutuputFormat它把輸出寫入到一個串文件中(sequence files),這樣我麼就可以用SequenceFileInputFormat讀會數據。這在處理多個聯係mapreduce任務時十分有效。
PS:hadoop中預定義的InputFormat與OutputFormat


最後更新:2017-04-04 07:03:49