Hadoop的輸入輸出格式(重要)
首先說一下Hadoop中預定義的Mapper 與Reducer
InputFormat接口決定了輸入文件如何被hadoop分塊(split up)與接受。
TextInputFormat是InputFormat的默認實現,對於輸入數據中沒有明確的key值時非常有效,TextInputFormat返回的key值為字符在輸入塊中的行數,value為這行的內容。
其他InputFormat的子類還有
KeyValueTextInputFormat(鍵:Text,值:Text)他的分割符默認為tab("\t"),我們可以通過key.value.separator.input.line.property設置
SequenceFileInputFormat<K,V>(鍵和值都是由用戶定義)
NLineInputFormat(鍵:LongWritable,值:Text)等
你的MapReduce程序如果要是用KeyValueTextInputFormat作為輸入格式,我們可以這樣做:
conf.setInputFormat(KeyValueTextInputFormat.class);
package org.apache.hadoop.mapred; import java.io.IOException; import org.apache.hadoop.fs.FileSystem; public interface InputFormat<K, V> { InputSplit[] getSplits(JobConf job, int numSplits) throws IOException; RecordReader<K, V> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException; }
以上是InputFormat的定義,裏邊有兩個方法。
第一個方法的功能是確認所有的文件為輸入數據並且把他們分成塊(splits)。每一個任務都被分配一個塊(split)。
第二個方式的功能是提供一個對象用來遍曆給定塊(split)中的記錄,並且把記錄解析成先前定義的key和value類型。
一般來說getSplits()方法不用我們去管。其實以上所舉出的InputFormat的子類都是繼承子FileInputFormat的,FileInputFormat實現了getSplits方法,把getRecordReader()方法留給子類去實現。
我們在使用FileInputFormat時,我們主要的精力在定製合適的RecordReader類上,因為他負責如何將splits解析為records,將records解析成為key/value對。
package org.apache.hadoop.mapred; import java.io.IOException; import java.io.DataInput; public interface RecordReader<K, V> { boolean next(K key, V value) throws IOException; K createKey(); V createValue(); long getPos() throws IOException; public void close() throws IOException; float getProgress() throws IOException; }
以上為RecordReader的簽名,我們一般從現有的RecordReader的子類中來定製自己的RecordReader。主要是實現next()方法。
比如LineRecordReader繼承了RecordReader<LongWritable, Text>,TextInputFormat使用了這個Reader。KeyValueLineRecordReader用在了KeyValueTextInputFormat類中。
下麵我們自己實現一個RecordReader類,她將要解析的key為Text,value為URLWritable(自己定義)
首先我們需要定義URLWritable這個類,有了這個以後我們就可以定義我們的Reader了
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.KeyValueLineRecordReader; import org.apache.hadoop.mapred.RecordReader; public class TimeUrlLineRecordReader implements RecordReader<Text, URLWritable> { private KeyValueLineRecordReader lineReader; private Text lineKey, lineValue; public TimeUrlLineRecordReader(JobConf job, FileSplit split) throws IOException { lineReader = new KeyValueLineRecordReader(job, split); lineKey = lineReader.createKey(); lineValue = lineReader.createValue(); } @Override public boolean next(Text key, URLWritable value) throws IOException { // TODO Auto-generated method stub if (!lineReader.next(lineKey, lineValue)) { return false; } key.set(lineKey); value.set(lineValue.toString()); return true; } @Override public Text createKey() { // TODO Auto-generated method stub return new Text(""); } @Override public URLWritable createValue() { // TODO Auto-generated method stub return new URLWritable(); } @Override public long getPos() throws IOException { // TODO Auto-generated method stub return lineReader.getPos(); } @Override public void close() throws IOException { // TODO Auto-generated method stub lineReader.close(); } @Override public float getProgress() throws IOException { // TODO Auto-generated method stub return lineReader.getProgress(); } } class URLWritable implements Writable { protected URL url; public URLWritable() {} public URLWritable(URL url) { this.url = url; } @Override public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeUTF(url.toString()); } @Override public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub url = new URL(in.readUTF()); } public void set(String s) throws MalformedURLException { url = new URL(s); } }可以看到,我們在TimeUrlLineRecordReader類中創建了一個KeyValueLineRecordReader對象,然後在實現getPos()、getProgress()、close()方法是直接是哦那個他對應的方法即可。
在next()方法中,我Text類型的lineValue轉型為URLWritable類型了。
這時我們就可呀使用這個RecordReader了
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; public class TimeUrlTextInputFormat extends FileInputFormat<Text, URLWritable> { @Override public RecordReader<Text, URLWritable> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException { // TODO Auto-generated method stub return new TimeUrlLineRecordReader(job, (FileSplit)split); } }
OutputFormat和InputFormat類似,但是有些地方不一樣。
和InputFormat一樣,所有的OutputFormat大多數繼承自FileOutputFormat,但是NullOutputForma和DBOutputFormat。他們是為專門領域的應用程序預留的。
TextOutputFormat是默認的輸入格式,key value 用一個tab分開,分割符也可以通過mapred.textoutputformat.separator屬性設置
TextOutputFormat的輸出格式可以被KeyValueTextInputFormat接受
如果輸出的key類型為NullWritable的輸出格式可以被TextInputFormat接受。在這中情況下key沒有被輸出來,也沒有分割符。
如果不想讓reduce程序有任何輸出我們可以把輸出格式設置為NullOutputFormat。阻止hadoop的輸出在reducer用自己的方式輸出文件而不許要hadoop框架些任何額外文件是十分有用。
SequenceFileOutuputFormat它把輸出寫入到一個串文件中(sequence files),這樣我麼就可以用SequenceFileInputFormat讀會數據。這在處理多個聯係mapreduce任務時十分有效。
PS:hadoop中預定義的InputFormat與OutputFormat
最後更新:2017-04-04 07:03:49