《Hadoop與大數據挖掘》一2.4.4 MapReduce組件分析與編程實踐
本節書摘來華章計算機《Hadoop與大數據挖掘》一書中的第2章 ,第2.4.4節,張良均 樊 哲 位文超 劉名軍 許國傑 周 龍 焦正升 著 更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。
2.4.4 MapReduce組件分析與編程實踐
MapReduce整個流程包括以下步驟:輸入格式(InputFormat)、Mapper、Combiner、Partitioner、Reducer、輸出格式(OutputFormat)。這裏會針對流程中的Combiner、Part-itioner、輸入/輸出格式進行分析,同時,也會介紹相關的編程技巧,如自定義鍵值對。
1. Combiner分析
Combiner是什麼呢?從字麵意思理解,Combine即合並。其實,Combiner就是對Mapper的輸出進行一定的合並,減少網絡輸出的組件。所以,其去掉與否不影響最終結果,影響的隻是性能。
Combiner是Mapper端的匯總,然後才通過網絡發向Reducer。如圖2-40所示,經過Combiner後,鍵值對,被合並為,這樣發往Reducer的記錄就可以減少一條(當然,實際中肯定不是隻減少一條記錄),從而減少了網絡IO。
對於多個輸入數據塊,每個數據塊產生一個InputSplit,每個InputSplit對應一個map任務,每個map任務會對應0個到多個Combiner,最後再匯總到Reducer。在單詞計數的例子中,使用Combiner的情形如圖2-41所示。
需要注意的是,自定義Combiner也是需要集成Reducer的,同樣也需要在reduce函數中寫入處理邏輯。但是要注意,Combiner的輸入鍵值對格式與輸出鍵值對格式必須保持一致,也正是因為這個要求,很多情況下,采用自定義Combiner的方式在業務或算法處理上行不通。還有,在單詞計數程序中,Combiner和Reducer使用的是同一個類代碼,這是可能的,但是大多數情況下不能這樣做,因為Reducer和Combiner的邏輯在很多情況下是不一樣的。
2. Partitioner分析
Partitioner是來做什麼的呢?是用來提高性能的嗎?非也!Partitioner主要的目的是把鍵值對分給不同的Reducer。分給不同的Reducer?難道Reducer可以有多個嗎?這是當然的,隻需要在初始化Job實例的時候進行設置即可,例如設置代碼為job.setNum-ReduceTasks(3),這樣就可以設置3個Reducer了。
經過前麵的分析可以知道,在Reducer的輸入端,其鍵值對組是按照一個鍵對應一個值列表的。如果同一個鍵的不同值被發送到了不同的Reducer中,那麼(注意,每個Reducer在一個子節點運行,不同Reducer之間不會幹擾),經過不同的Reducer處理後,其實我們已經做不到針對一個鍵,輸出一個值了,而是輸出了兩條記錄。我們可以看下Hadoop係統默認的Partitioner實現,默認的Partitioner是HashPartitioner,其源碼如代碼清單2-30所示。
代碼清單2-30 HashPartitioner源碼
public class HashPartitioner<K, V> extends Partitioner<K, V> {
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K key, V value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
在源碼中,可以看到HashPartitiner中隻有一個方法,就是getPartition(K key,V value, int numReducTasks)。3個參數分別為鍵、值、Reducer的個數,輸出其實就是Reducer的ID。從代碼的實現中可以看出,最終輸出的Reducer ID隻與鍵(key)的值有關,這樣也就保證了同樣的鍵會被發送到同一個Reducer中處理。
同一個鍵的記錄會被發送到同一個Reducer中處理,一個Reducer可以處理不同的鍵的記錄。
3.輸入輸出格式/鍵值類型
一般來說,HDFS一個文件對應多個文件塊,每個文件塊對應一個InputSplit,而一個InputSplit就對應一個Mapper任務,每個Mapper任務在一個節點上運行,其僅處理當前文件塊的數據,但是我們編寫Mapper的時候隻是關心輸入鍵值對,而不是關心輸入文件塊。那麼,文件塊怎麼被處理成了鍵值對呢?這就是Hadoop的輸入格式要做的工作了。
在InputFormat中定義了如何分割以及如何進行數據讀取從而得到鍵值對的實現方式,它有一個子類FileInputFormat,如果要自定義輸入格式,一般都會集成它的子類File-InputFormat,它裏麵幫我們實現了很多基本的操作,比如記錄跨文件塊的處理等。
圖2-42所示是InputFormat的類繼承結構。
然而,比較常用的則是如表2-7所示的幾個實現方式。
同理,可以想象,輸出格式(OutputFormat)也與輸入格式相同,不過是輸入格式的逆過程:把鍵值對寫入HDFS中的文件塊中。如圖2-43所示是OutputFormat的類繼承結構。
在Hadoop中,無論是Mapper或Reducer處理的都是鍵值對記錄,那麼Hadoop中有哪些鍵值對類型呢?Hadoop中常用的鍵值對類型如圖2-44所示。
從各個類的命名上其實也可以看出其代表什麼類型,比如LongWritable,代表的就是Long的實現,而Text就是String的實現。在前麵的單詞計數中我們使用過IntWritable以及Text。
這裏有兩點需要注意:
1)值類型都需實現Writale接口;
2)鍵需要實現WritableComparable接口。
其實從圖2-44中也可以看出,Hadoop已有的鍵值類型都是實現WritableComparable接口的,然而WritableComparable接口又是實現Writable接口的。所以,Hadoop已有的鍵值類型既可以作為鍵類型也可以作為值類型。作為鍵類型的肯定可以作為值類型,但作為值類型的卻不能作為鍵類型。為什麼鍵類型是實現WritableComparable接口呢?其實,如果你聯想到了Shuffle/Sort過程的話,應該不難理解,因為MapReduce框架需要在這裏對鍵進行排序。
4.動手實踐:指定輸入輸出格式
這個實驗主要是加深理解Hadoop的輸入/輸出格式,熟悉常用的SequenceFileInput-Format和SequenceFileOutputFormat。
實驗步驟:
1)打開Eclipse,打開已經完成的WordCount程序;
2)設置輸出格式為SequenceFileOutputFormat,重新打包,並提交到Linux上運行;
3)查看輸出的文件;
4)再次修改WordCount程序,設置輸入格式為SequenceFileInputFormat、輸入路徑為3的輸出;設置輸出格式為TextFileInputFormat;
5)查看輸出結果;
6)針對上麵的各個步驟以及輸出進行分析,解釋對應的輸出結構。
思考:
1)第4步中查看的文件是否是亂碼?如果是亂碼,為什麼是亂碼?針對這樣的數據,如何使用HDFS Java API進行讀取?如果不是亂碼,看到的是什麼?
2)使用SequenceFileInputFormat或SequenceFileOutputFormat有什麼優勢與劣勢?
5.自定義鍵值類型
Hadoop已經定義了很多鍵值類型,比如Text、IntWritable、LongWritable等,那為什麼需要用到自定鍵值類型呢?答案其實很簡單,不夠用。在有些情況下,我們需要一些特殊的鍵值類型來滿足我們的業務需求,這種時候就需要自定義鍵值類型了。前麵已經提到,自定義鍵需要實現WritableComparable接口,自定義值需要實現Writable接口,那麼實現了接口後,還需要做哪些操作呢?
自定義值類型可參考代碼清單2-31進行分析。
代碼清單2-31 自定義Hadoop 值類型
public class MyWritable implements Writable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
@Override
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp=in.readLong();
}
}
在代碼清單2-31中,首先實現了Writable接口,接著定義了兩個變量。這兩個變量其實是與業務相關的(比如,這裏定義了一個counter,一個timestamp)。實現了Writable接口後,需要覆寫兩個方法(write和readFields),這裏需要注意寫入和讀取的順序是很重要的,比如這裏先把counter寫入out輸出流,再把timestamp寫入out輸出流。那麼,在讀取的時候就需要先讀取counter,再讀取timestamp(如果兩個變量都是int型,那麼就更加需要注意區分)。
自定義鍵類型可參考代碼清單2-32進行分析。
代碼清單2-32 自定義Hadoop 鍵類型
public class MyWritableComparable implements WritableComparable {
private int counter;
private long timestamp;
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(counter);
out.writeLong(timestamp);
}
@Override
public void readFields(DataInput in) throws IOException {
counter = in.readInt();
timestamp= in.readLong();
}
@Override
public int compareTo(MyWritableComparable other) {
if(this.counter == other.counter){
return (int)(this.timestamp - other.timestamp);
}
return this.counter-other.counter;
}
}
從代碼清單2-32中可以看出,自定義鍵類型其實就是比自定義值類型多了一個比較方法而已,其他都是一樣的。
6.動手實踐:自定義鍵值類型
針對source/hadoop/keyvalue.data數據求解每行數據的個數以及平均值,該數據格式如表2-9所示。
1)編寫Driver程序,main函數接收兩個參數和,設置輸入格式為KeyValueInputFormat;
2)編寫Mapper程序,map函數針對每個value值,使用‘\t’進行分隔;接著,對分隔後的數據進行求和以及個數統計(注意將字符串轉換為數值),輸出平均值和個數,Mapper輸出鍵值對類型為;
3)編寫自定義value類型MyValue,定義兩個字段,一個是average,一個是num,用於存儲平均值和個數;重寫toString方法;
4)編寫Reducer程序,直接輸出即可;
5)對編寫的程序進行打包averagejob.jar;
6)上傳source/hadoop/keyvalue.data到HDFS,上傳averagejob.jar到Linux;
7)使用命令hadoop jar averagejob.jar進行調用;
8)查看輸出結果。
思考:
1)Reducer類是否必需?如果不需要,則如何修改?如果去掉reducer,輸出結果會有什麼不一樣?
2)如果想讓程序可以直接在Eclipse中運行,應該如何修改程序?
最後更新:2017-06-26 10:32:32