《Hadoop與大數據挖掘》一2.6.3 Hadoop TF-IDF編程實現
本節書摘來華章計算機《Hadoop與大數據挖掘》一書中的第2章 ,第2.6.3節,張良均 樊 哲 位文超 劉名軍 許國傑 周 龍 焦正升 著 更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。
2.6.3 Hadoop TF-IDF編程實現
這裏給出的TF-IDF算法的測試數據使用的是Avro格式的。這裏隻對Avro進行簡單介紹,如讀者需要深入了解,可以上網查找相關資料。
1. Avro簡介
Avro是一個數據序列化的係統,它可以將數據結構或對象轉化成便於存儲或傳輸的格式。Avro設計之初就用來支持數據密集型應用,適合於遠程或本地大規模數據的存儲和交換。
Avro依賴於模式(Schema)。通過模式定義各種數據結構,隻有確定了模式才能對數據進行解釋,所以在數據的序列化和反序列化之前,必須先確定模式的結構。
Schema通過JSON對象表示。Schema定義了簡單數據類型和複雜數據類型,其中複雜數據類型包含不同屬性。通過各種數據類型用戶可以自定義豐富的數據結構。
Avro定義了幾種簡單數據類型,表2-10是對其簡單說明。
Avro定義了6種複雜數據類型,分別是record、enum、array、map、union和fixed,每一種複雜數據類型都具有獨特的屬性。表2-11就record這一種複雜數據類型進行了簡要說明(後麵也隻會用到這種數據類型)。
(1)動手實踐:Java基於Avro的序列化和反序列化
簡單來說,Avro就是提供一個數據文件的說明文檔,然後可以直接根據該說明文檔進行序列化和反序列化的一個框架而已。
舉個例子,比如現在有一個數據描述文件,如代碼清單2-46所示。
代碼清單2-46 Avro描述文件
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": ["int", "null"]},
{"name": "favorite_color", "type": ["string", "null"]}
]
}
有定義一個Java類和該描述文件匹配,如代碼清單2-47所示。
代碼清單2-47 Avro描述文件對應Java實體類
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
// favorite color不設置
// 直接使用構造函數
User user2 = new User("Ben", 7, "red");
// 使用builder進行構造
User user3 = User.newBuilder()
.setName("Charlie")
.setFavoriteColor("blue")
.setFavoriteNumber(null)
.build();
代碼清單2-46中的name:User或者name:name、name:favorite_number等,不需要與代碼清單2-47中的名字User類或者方法setName、setFavoriteColor名字一模一樣,隻需一一對應即可。
那麼怎麼進行序列化呢?參考代碼清單2-48,即可把用戶user1、user2、user3序列化到本地磁盤的users.avro文件。
代碼清單2-48 序列化User
// 序列化user1、user2 and user3 到硬盤
DatumWriter<User> userDatumWriter = new SpecificDatumWriter<User>(User.class);
DataFileWriter<User> dataFileWriter = new DataFileWriter<User>(userDatumWriter);
dataFileWriter.create(user1.getSchema(), new File("users.avro"));
dataFileWriter.append(user1);
dataFileWriter.append(user2);
dataFileWriter.append(user3);
dataFileWriter.close();
如何進行反序列化呢?參考代碼清單2-49,即可把序列化後的users.avro文件內容讀取出來了,並且代碼清單2-49中的代碼還把文件內容也打印出來了。
代碼清單2-49 反序列化User
//從磁盤進行反序列化
DatumReader<User> userDatumReader = new SpecificDatumReader<User>(User.class);
DataFileReader<User> dataFileReader = new DataFileReader<User>(file, user-DatumReader);
User user = null;
while (dataFileReader.hasNext()) {
user = dataFileReader.next(user);
System.out.println(user);
}
參考上麵的示例,進行下麵的實驗。
實驗步驟如下:
1)新建Java工程,引入avro-1.7.4.jar、avro-tools-1.7.4.jar(非必需)、jackson-core-asl-1.9.13.jar、jackson-mapper-asl-1.9.13.jar、junit-4.11.jar、hamcrest-core-1.3.jar。
2)參考代碼清單2-46、代碼清單2-47、代碼清單2-48、代碼清單2-49,縮寫對應程序實現,運行程序查看結果。
(2)動手實踐:Hadoop基於Avro的反序列化
這裏增加一點Hadoop Job Counter的知識,Hadoop Job Counter可以在Hadoop Map-Reduce程序運行的過程中定義全局計數器,對一些必要的參數進行統計,通過doc api查看該用法,如圖2-54所示。
在Java代碼中遍曆所有Hadoop MapReduce Counter,可參考代碼清單2-50。
代碼清單2-50 Java代碼獲取Hadoop MapReduce Counter
Counters counter = job.getCounters();
Iterator<CounterGroup> icg= counter.iterator();
while(icg.hasNext()){
System.out.println(icg.next());
CounterGroup counterGroup = icg.next();
System.out.println(counterGroup.getName());
Iterator<org.apache.hadoop.mapreduce.Counter>counters = counterGroup.iterator();
while(counters.hasNext()){
Counter c = counters.next();
System.out.println(c.getName()+","+c.getValue());
}
}
實驗步驟如下:
1)拷貝avro-mapred-1.7.4-hadoop2.jar到Hadoop集群lib目錄,上傳hadoop/data/mann.avro數據到HDFS。
2)設置讀取Avro文件的FileInputFormat為AvroKeyInputFormat。
3)參考示例程序2.5_004_avro_mr,讀懂程序代碼,運行程序,查看結果。
2. Job1:統計單個文件某個單詞個數
針對2.6.2節分析的Hadoop MapReduce實現TF-IDF的流程中的Job1,分析如下。
驅動程序Driver:隻需要設置Mapper以及Reducer,需要注意這裏的輸入需要使用AvroKeyInputFormat,這裏考慮到編程方便以及效率,輸出使用SequenceFileOutput-Format,如代碼清單2-51所示。
代碼清單2-51 TF-IDF Job1 Driver類示例
// Job1 計算每個文件中單詞個數
Job job1 = Job.getInstance(getConf(), "Word Count per document");
job1.setJarByClass(getClass());
Configuration conf1 = job1.getConfiguration();
FileInputFormat.setInputPaths(job1, in);
out.getFileSystem(conf1).delete(out, true);
FileOutputFormat.setOutputPath(job1, out);
job1.setMapperClass(WordCountPerDocumentMapper.class);
job1.setReducerClass(IntSumReducer.class);
job1.setInputFormatClass(AvroKeyInputFormat.class);
job1.setOutputFormatClass(SequenceFileOutputFormat.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
int ret = job1.waitForCompletion(true) ? 0 : -1;
Mapper要做的工作隻是讀取Avro數據,然後針對數據分隔各個單詞(注意這裏有些單詞是不需要進行統計的,可以直接忽略)。Mapper的功能描述如下:
1)讀取Avro格式數據,獲取文件名和文件內容(類似Java單機程序),如代碼清單2-52所示。
代碼清單2-52 讀取Avro數據示例
@Override
protected void map(AvroKey<GenericRecord> key, NullWritable value,
Mapper<AvroKey<GenericRecord>, NullWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
String name = key.datum().get(Utils.FIELD_FILENAME).toString();
ByteBuffer contentsByte = (ByteBuffer) key.datum().get(Utils.FIELD_CONTENTS);
String contents = new String(contentsByte.array());
…
}
2)分隔文件的內容,這裏需要注意不用統計的單詞,具體單詞如代碼清單2-53所示。
代碼清單2-53 需要忽略的單詞
private static Set<String> STOPWORDS;
static {
STOPWORDS = new HashSet<String>() {
{
add("I");
add("a");
add("about");
add("an");
add("are");
add("as");
add("at");
add("be");
add("by");
add("com");
add("de");
add("en");
add("for");
add("from");
add("how");
add("in");
add("is");
add("it");
add("la");
add("of");
add("on");
add("or");
add("that");
add("the");
add("this");
add("to");
add("was");
add("what");
add("when");
add("where");
add("who");
add("will");
add("with");
add("and");
add("the");
add("www");
}
};
分隔采用Match類正則進行分隔,如代碼清單2-54所示。
代碼清單2-54 Match類分隔文本內容到單詞
//定義Pattern
private static final Pattern WORD_PATTERN = Pattern.compile("\\w+");
// map函數
while (matcher.find()) {
StringBuilder valueBuilder = new StringBuilder();
String matchedKey = matcher.group().toLowerCase();
if (!Character.isLetter(matchedKey.charAt(0)) ||
Character.isDigit(matchedKey.charAt(0))
|| STOPWORDS.contains(matchedKey) ||
matchedKey.contains(UNDERSCORE)) {
continue;
}
…
}
3)隻須輸出單詞、文件名和計數1即可,如代碼清單2-55所示。
代碼清單2-55 TF-IDF Job1 Mapper類輸出示例
valueBuilder.append(matchedKey);
valueBuilder.append(SEPARATOR);
valueBuilder.append(name);
fileWord.set(valueBuilder.toString());
// <key,value> -> <word|file , 1>
context.write(fileWord, one);
Reducer類直接采用Hadoop內部類IntSumReducer即可,即把相同的key的所有value值全部加起來,其輸入輸出描述如表2-12所示。
表2-12 TF-IDF Job1 Reducer輸入輸出描述
// Reducer
// in: <key,value> -> <word|file, [1,1,1,1,…]>
// out: <key,value> -> <word|file, 1+1+…+1>
-
Job2:統計某個文件所有單詞個數
Job2的Driver驅動程序是統計某個文件的所有單詞個數,輸入是Job1的輸出,所以輸入格式為SequenceFileInputFormat,輸出格式也設成SequenceFileOutputFormat,方便Job3的讀取,其設置參考代碼清單2-56。代碼清單2-56 Job2 Driver驅動類示例代碼 Job job2 = Job.getInstance(getConf(), "DocumentWordCount"); job2.setJarByClass(getClass()); Configuration conf2 = job2.getConfiguration(); FileInputFormat.setInputPaths(job2, in); out.getFileSystem(conf2).delete(out, true); FileOutputFormat.setOutputPath(job2, out); job2.setMapperClass(DocumentWordCountMapper.class); job2.setReducerClass(DocumentWordCountReducer.class); job2.setInputFormatClass(SequenceFileInputFormat.class); job2.setOutputFormatClass(SequenceFileOutputFormat.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); ret = job2.waitForCompletion(true) ? 0 : -1;
Mapper類隻需把Job1的輸出的鍵值對進行重構即可,這裏即可以利用MapReduce按照key進行分組的特性,輸出<文件名,文件中的單詞|文件中單詞的個數>這樣的鍵值對,如代碼清單2-57所示。
```javascript
代碼清單2-57 Job2 Mapper map函數示例代碼
public void map(Text key, IntWritable value, Context context) throws IOException, InterruptedException {
int wordAndDocCounter = value.get();
// wordAndDoc = word|filename
String[] wordAndDoc = StringUtils.split(key.toString(), SEPARATOR);
outKey.set(wordAndDoc[1]);
outValue.set(wordAndDoc[0] + SEPARATOR + wordAndDocCounter);
// <key,value> -> <filename, word| wordCount>
context.write(outKey, outValue);
}
在Reucer中利用分組的特性(每個鍵值對按照鍵進行分組,所以會得到每個文件的所有單詞作為一個列表),統計每個文件的所有單詞個數,如代碼清單2-58所示。
代碼清單2-58 Job2 Reducer reduce函數示例代碼
// <filename, [word| wordCount, word|wordCount, ...]>
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int sumOfWordsInDocument = 0;
Map<String, Integer> tempCounter = new HashMap<String, Integer>();
for (Text val : values) {
// wordCounter = word| wordCount
String[] wordCounter = StringUtils.split(val.toString(), SEPARATOR);
tempCounter.put(wordCounter[0], Integer.valueOf(wordCounter[1]));
sumOfWordsInDocument += Integer.parseInt(wordCounter[1]);
}
for (String wordKey : tempCounter.keySet()) {
outKey.set(wordKey + SEPARATOR + key.toString());
outValue.set(tempCounter.get(wordKey) + SEPARATOR + sumOfWordsInDocument);
// <key,value> -> <word|filename , wordCount|sumOfWordsInDoc>
context.write(outKey, outValue);
}
}
-
Job3:計算單個文件某個單詞的TF-IDF
Job3綜合前麵兩個的輸出結構,得到最終每個文件每個單詞的TF-IDF值。Driver需要配置輸入輸出以及格式,這裏注意需要把Job1統計的總文件個數傳入Job3中,這裏為了便於觀察,輸出格式使用默認值TextFileOutputFormat,其示例代碼如代碼清單2-59所示。代碼清單2-59 Job3 Driver驅動類示例代碼 Job job3 = Job.getInstance(getConf(), "DocumentCountAndTfIdf"); job3.setJarByClass(getClass()); Configuration conf3 = job3.getConfiguration(); FileInputFormat.setInputPaths(job3, in); out.getFileSystem(conf3).delete(out, true); FileOutputFormat.setOutputPath(job3, out); conf3.setInt("totalDocs", (int) totalDocs); job3.setMapperClass(TermDocumentCountMapper.class); job3.setReducerClass(TfIdfReducer.class); job3.setInputFormatClass(SequenceFileInputFormat.class); job3.setOutputFormatClass(SequenceFileOutputFormat.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(Text.class); ret = job3.waitForCompletion(true) ? 0 : -1;
Mapper類根據Job2的輸入進行重構,再次使用word作為key,使用filename、word-Count、sumOfWordsInDoc作為value,如代碼清單2-60所示。
```javascript
代碼清單2-60 Job3 Mapper類map函數示例代碼
// <key,value> -> <word|filename , wordCount|sumOfWordsInDoc>
public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
// worddAndDoc = word|filename
String[] wordAndDoc = StringUtils.split(key.toString(), SEPARATOR);
outKey.set(wordAndDoc[0]);
outValue.set(wordAndDoc[1] + DOC_SEPARATOR + value.toString());
// <key,value> -> <word,filename=wordCount|sumOfWordsInDoc>
context.write(outKey, outValue);
}
Reducer根據Mapper的輸出,同時利用相同的key聚合的特性,即可統計出每個單詞在多少個文件中存在;在所有需要的參數計算完成後,即可利用TF-IDF的公式進行最後的計算,如代碼清單2-61所示。
代碼清單2-61 Job3 Reducer類reduce函數示例代碼
// <key,value> -> <word, [filename=wordCount|sumOfWordsInDoc,
// filename=wordCount|sumOfWordsInDoc,...]>
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
int totalDocs = context.getConfiguration().getInt("totalDocs", 0);
int totalDocsForWord = 0;
Map<String, String> tempFrequencies = new HashMap<String, String>();
for (Text value : values) {
// documentAndFrequencies = filename, wordCount|sumOfWordsInDoc
String[] documentAndFrequencies = StringUtils.split(value.toString(), DOC_SEPARATOR);
totalDocsForWord++;// the number of files which contains word
// tempFrequencies = (filename,wordCount|sumOfWordsInDoc)
tempFrequencies.put(documentAndFrequencies[0], documentAndFrequencies[1]);
}
for (String document : tempFrequencies.keySet()) {
// wordFrequencyAndTotalWords = wordCount,sumOfWordsInDoc
String[] wordFrequencyAndTotalWords = StringUtils.split(tempFrequencies.get(document), SEPARATOR);
// TF = wordCount / sumOfWordsInDoc
double tf = Double.valueOf(wordFrequencyAndTotalWords[0]) / Double.valueOf(wordFrequencyAndTotalWords[1]);
// IDF
double idf = (double) totalDocs / totalDocsForWord;
double tfIdf = tf * Math.log10(idf);
outKey.set(key + SEPARATOR + document);
outValue.set(DF.format(tfIdf));
// <key,value> -> <word|filename , tfIdf>
context.write(outKey, outValue);
}
}
(1)動手實踐:Hadoop實現TF-IDF算法
理解上麵Hadoop MapReduce框架實現TF-IDF算法的原理,結合部分示例代碼,完成該動手實踐。
實驗步驟如下:
1)參考“動手實踐:Hadoop基於Avro的反序列化”內容,建立程序開發環境(主要是Avro相關開發包);
2)參考工程2.5_005_tf-idf示例代碼,結合前麵的分析,理解代碼功能;
3)修複工程功能(TODO提示),運行程序;
4)查看輸出,對結果進行解釋。
(2)思考
請讀者思考,針對Hadoop MapReduce實現TF-IDF算法是否還有優化的空間?如果有優化的空間,怎麼做呢?可以考慮下麵幾點:
1)是否可以縮減Job的個數?(提示:輸出多目錄、自定義鍵值對)
2)如果使用自定義鍵值對技術,應該如何修改程序?
最後更新:2017-06-26 11:01:59