閱讀611 返回首頁    go 魔獸


《Hadoop與大數據挖掘》一2.5.4 Hadoop K-Means編程實現

本節書摘來華章計算機《Hadoop與大數據挖掘》一書中的第2章 ,第2.5.4節,張良均 樊 哲 位文超 劉名軍 許國傑 周 龍 焦正升 著 更多章節內容可以訪問雲棲社區“華章計算機”公眾號查看。

2.5.4 Hadoop K-Means編程實現

在下麵的實現過程中,會進行簡單實現思路介紹,針對一些實現會有動手實踐給讀者練習。一般情況下我們建議讀者自己全部實現,對於實現起來有難度的讀者,我們提供了參考程序,但是需要注意,參考程序不是完整的,裏麵設置了TODO提示,這些地方是需要讀者去完善的。

image


思路1
不管是思路1還是思路2,Hadoop實現K-Means算法都包含4個步驟:①初始化聚類中心向量;②進行聚類並更新聚類中心向量;③判斷是否達到循環條件,如果是則循環;④判斷是否需要對原始數據進行分類,如果是則進行分類操作。下麵就針對這4個步驟分別進行分析。
(1)初始化聚類中心向量:蓄水池抽樣
初始化聚類中心其實和單機算法類似,可以有多種方法,比如隨機取出k個聚類中心向量、直接取出前k個聚類中心向量等。在Hadoop的編程框架MapReduce限製下,如果是隨機取k個聚類中心向量,那麼實現起來就是這樣的:遍曆一次所有數據,統計數據個數n,再次遍曆,按照k/n概率抽取k個數據。這樣不是不可以,但是效率太低,並且如果真要實現起來,還是要考慮多個問題的,比如如果有多個Mapper怎麼處理?
這裏提出一種效率高,並且還能達到隨機取數的算法—蓄水池抽樣。
什麼是蓄水池抽樣呢?簡單描述:先選中第1~k個元素,作為被選中的元素。然後依次對第k+1至第n個元素做如下操作:每個元素都有k/x的概率被選中,然後等概率地(1/k)替換掉被選中的元素(其中x是元素的序號)。其算法偽代碼描述如代碼清單2-33所示。

代碼清單2-33 蓄水池抽樣偽代碼
Init : a reservoir with the size: k
                     for i= k+1 to N
                         M=random(1, i);
                         if( M < k)
                           SWAP the Mth value and ith value
    end for

蓄水池抽樣同樣可以使用Driver、Mapper、Reducer來進行分析。Driver部分可以參考MapReduce程序的固定模式,但是需要注意,需要傳入聚類中心向量的個數,即k值。其代碼參考代碼清單2-34。

代碼清單2-34 蓄水池抽樣Driver示例代碼
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if (args.length != 3){
    System.err.println("Usage: dome.job.SampleJob <in> <out> <selectRecords>");
    System.exit(2);
}
//設置傳入Mapper以及Reducer的參數
conf.setInt(SELECTRECORDS, integer.parseInt(args[2]));
Job job = Job.getInstance(conf, "sample job " + args[0]);
job.setJarByclass(SampleJob.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}

Mapper就是蓄水池抽樣算法的具體實現了,這裏需要注意,map函數針對每條記錄進行篩選,並不輸出,所以這裏在cleanup進行輸出。這樣就需要在setup裏麵初始化一個變量來存儲當前已經被選為聚類中心向量的值。其各個函數描述如下。

  • setup():讀取傳入的參數值selectedRecordsNum,初始化當前處理的行數遍曆row、存儲已經選擇的selectedRecordsNum個數據變量selectedRecords。
  • map():每次map函數讀取一行記錄,判斷當前行數row是否小於selectedRe-cordsNum,如果小於則直接把當前記錄加入selectedRecords;否則,以概率selectedRecordsNum/row使用當前記錄來對selectedRecords中的任一記錄進行替換。其部分代碼如代碼清單2-35所示。
  • cleanup():直接輸出selectedRecords的內容即可。
代碼清單2-35 蓄水池抽樣Mapper map函數示例的代碼
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
        throws IOException, InterruptedException {
    row++; // 行數加1;
        if(row<=selectRecordsNum){
            selectRecords[(int) (row-1)]= new Text(value.toString());
            // 前麵k條記錄直接插入
        }else{// 以概率 k/i 決定是否用第i條記錄替換前麵的任意一條記錄
            int p = SampleJob.getRandom((int)row);
            if(p<selectRecordsNum){// 替換
                selectRecords[p]=new Text(value.toString());
            }
        }
    }

在設計Reducer的時候需要考慮的一個問題是,如果有多個Mapper怎麼辦?多個Mapper就會發送k×N個聚類中心向量到Reducer中(其中N為Mapper的個數),所以在Reducer端需要對k×N個記錄再次篩選,選出其中的k個聚類中心向量。這裏當然也有多種方法,其實這裏的選擇和最開始我們在Mapper中針對所有數據隨機選取k條記錄的選擇一樣,這裏所有數據隻是“變”小了而已。因為是在Reducer中處理(一個Reducer可以理解為單機),所以其實也可以理解為單機的隨機選擇k條記錄的算法。這裏隨機選擇k條記錄的算法也可以,不過我們這裏還是選擇使用蓄水池抽樣。
這裏隻能使用一個Reducer,為什麼?請讀者思考。
動手實踐:蓄水池抽樣Hadoop實現
首先理解上麵蓄水池抽樣算法的Hadoop實現的描述及分析,接著新建工程,並參考上節完善工程代碼功能。
實驗步驟:
1)打開Eclipse,新建工程2.5_002_sample;
2)添加相關環境(如JDK路徑、Hadoop路徑等);
3)參考上節蓄水池抽樣Hadoop實現原理實現編寫源代碼;
4)把工程編譯,並導出jar包,然後上傳jar包到master節點上,使用yarn jar的方式運行,查看輸出及相關日誌。
思考:
1)還有其他方式實現蓄水池抽樣嗎?
2)如何查看蓄水池抽樣抽取出來的結果?
(2)更新聚類中心向量
更新聚類中心向量其實就是整個K-Means算法的核心所在,K-Means算法的每次循環其實就是一個不斷更新聚類中心向量的過程。那麼具體怎麼更新呢?我們在單機算法中已經知道怎麼更新了,怎麼把其轉換為Hadoop的MapReduce代碼呢?其實,可以把每個Mapper理解為一個單機算法,因為其處理的數據其實是所有數據的一部分(一個文件塊)。下麵來看具體涉及的Driver、Mapper和Reducer。
針對Driver類,除了一些固定寫法外,還需傳入聚類初始中心向量路徑、聚類中心個數、列分隔符(考慮是否需要?),其示例代碼如代碼清單2-36所示。

代碼清單2-36 更新聚類中心向量Driver示例代碼
    conf.set(SPLITTER, splitter );
    conf.set(CENTERPATH, args[4]);
    conf.setInt(K, k);
    Job job =Job.getInstance(conf,"kmeans center path:"+args[4]+",output"+output);
    job.setJarByClass(KMeansDriver.class);
    job.setMapperClass(KMeansMapper.class);
    job.setReducerClass(KMeansReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    job.setNumReduceTasks(1);// 如果有多個會有什麼問題?

Reducer設置多個會有什麼問題?可以設置多個嗎?設置多個有什麼好處?

Mapper的工作主要包括兩個:其一,讀取首次HDFS上的聚類中心;其二,根據聚類中心對每個鍵值對記錄進行距離計算,輸出距離最小的聚類中心ID以及該條鍵值對記錄。下麵針對具體實現做分析。
1)setup():讀取傳入的初始聚類中心向量路徑,根據路徑讀取對應的數據,利用分隔符來對初始聚類中心向量進行初始化(初始化為數組和列表)。
2)map():在map階段根據初始化的聚類中心向量對當前記錄進行分類,輸出其對應的聚類中心id、當前記錄,如代碼清單2-37所示。

代碼清單2-37 更新聚類中心向量Mapper map函數示例代碼
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
        throws IOException, InterruptedException {
    int vecId = getCenterId(value.toString());
    if(!validate(vecId)){
        logger.info("數據異常:{}",value.toString());
        return ;
    }
    ID.set(vecId);
    context.write(ID, value);
    logger.info("ID:{},value:{}",new Object[]{vecId,value});
}

Reducer要做的工作就是針對每個組的所有數據計算其平均值(該平均值就是新的聚類中心向量)。其函數描述如下。
1)reduce():每個reduce函數針對同一個聚類中心id的數據進行處理;具體處理過程為,把每條記錄對應列的值加起來,同時記錄當前的記錄數;接著,使用每列和除以記錄數,即可得到每列平均值,也就是當前聚類中心id新的聚類中心,如代碼清單2-38所示。

代碼清單2-38 更新聚類中心向量Reducer reduce函數示例代碼
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, Text, NullWritable>.Context arg2)throws IOException, InterruptedException {
    double[] sum=null;
    long  num =0;
    for(Text value:values){
        String[] valStr = pattern.split(value.toString(), -1);
        if(sum==null){// 初始化
           sum=new double[valStr.length];
           addToSum(sum,valStr);// 第一次需要加上
        }else{
        // 對應字段相加
           addToSum(sum,valStr);
        }
        num++;

    }
    if(num==0){
        centerVec[key.get()]="";
    }
    averageSum(sum,num);
    centerVec[key.get()]= format(sum);
}

3)cleanup():輸出每個類別新的聚類中心。
動手實踐:Hadoop實現更新聚類中心向量
實驗步驟如下:
1)打開Eclipse,打開上一小節完成的工程;
2)根據上節Hadoop實現更新聚類中心實現思路,編寫對應源代碼;
3)把工程編譯並導出Jar包,然後上傳Jar包到master節點上,使用yarn jar的方式運行,查看輸出及相關日誌。
思考:如何測試代碼?
(3)是否循環
是否循環其實就是檢查前後兩次聚類中心向量是否滿足給定閾值。這裏使用的是方差,其描述如圖2-51所示。


image


還需要注意的問題是,如果不滿足delta閾值,那麼再次循環需初始化對應參數,主要包括下一個MapReduce程序的輸入聚類中心向量及輸出路徑等。
動手實踐:Hadoop實現更新聚類中心向量循環
實驗步驟如下:
1)打開Eclipse,打開上一小節完成的工程;
2)參考上述描述完成對應的代碼;
3)編譯工程並導出jar包,然後上傳jar包到master節點上,使用yarn jar的方式運行,查看輸出及相關日誌。
(4)是否分類
分類是針對原始數據進行的,這個工作其實在更新聚類中心向量的Mapper已經做了這個工作,所以分類可以參考前麵的Mapper。這裏不給出其具體代碼,讀者隻需要完成動手實踐即可(分類動手實踐)。
動手實踐:Hadoop實現最終分類
實驗步驟如下:
1)打開Eclipse,並打開已經完成的工程;
2)使用KMeansMapper的實現,編輯Driver主類,分類原始數據;
3)編譯工程,並導出jar包,然後上傳jar包到master節點上,使用hadoop jar的方式運行,查看輸出及相關日誌。
思路2
思路2其實和思路1裏麵的大部分步驟都是一樣的邏輯流程,隻是在更新聚類中心向量環節做了優化。下麵隻針對優化的環節做分析,其他部分請讀者參考思路1。
(1)更新聚類中心向量
更新聚類中心向量的Driver部分直接參考思路1對應內容即可,這裏直接分析其Mapper實現。結合前麵內容,我們知道這裏需要實現自定義值類型。
由於Mapper輸出的類型包含列和、個數,所以這裏可以自定義一個值類型,該值類型需包含一個double的數組,用於存儲某個類別的所有列和;一個long變量,用於存儲當前類別的數據個數,如代碼清單2-39所示。

代碼清單2-39 更新聚類中心向量Mapper輸出值自定義類型示例代碼1
public class SumNumWritable implements Writable {

private long num;
private double[] sum;
…
}

同時,需要覆寫readFields、write函數,在這裏針對數組類型還需要做些額外的處理。其處理過程為存儲數組的長度,在實例化類的時候傳入數組的長度,否則會報NullPointer的異常,如代碼清單2-40所示。

代碼清單2-40 更新聚類中心向量Mapper輸出值自定義類型示例代碼2
@Override
public void readFields(DataInput in) throws IOException {
    this.num = in.readLong();   // 先讀個數
    int size = in.readInt();    // 再讀sum數組長度
    sum = new double[size];
    for (int i = 0; i < size; i++) {
         sum[i] = in.readDouble();
    }
}
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(this.num);    // 先寫入個數
    out.writeInt(sum.length);   // 接著寫入sum數組的長度;
    for (double d : sum) {
         out.writeDouble(d);    // 依次寫入數組的值
    }
}

寫入或者讀取時,注意順序,順序重要嗎?如果亂序會有什麼影響?請讀者思考。
下麵針對Mapper進行分析。
setup():在setup函數中,除了需要參考思路1把初始聚類中心讀取出來外,還需要初始化“列和”;由於每個類別都有一個“列和”,所以可以定義一個“列和”數組;然後根據聚類中心數來初始化該“列和”數組;同時,根據初始聚類中心的列個數類初始化每個類別的“列和”的double數組,如代碼清單```javascript
2-41所示。
代碼清單2-41 更新聚類中心向量Mapper的setup函數示例
private SumNumWritable[] sumNums = null;
@Override
protected void setup(Mapper.Context context)
throws IOException, InterruptedException {
centerPathStr = context.getConfiguration().get(MainDriver.CENTERPATH);
splitter = context.getConfiguration().get(MainDriver.SPLITTER);
pattern = Pattern.compile(splitter);
k = context.getConfiguration().getInt(MainDriver.K, 0);
centerVec = new String[k];
sumNums = new SumNumWritable[k];
// 讀取數據
Path path = new Path(centerPathStr);
FileSystem fs = FileSystem.get(context.getConfiguration());
BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
try {
String line;
int index =0;
while ((line =br.readLine())!= null){
logger.info("center "+index+" vector:{}",line);
centerVec[index++]=line;
}
} finally {
br.close();
}
// 初始化 sumNums
colSize = pattern.split(centerVec[0]).length;
for(int i=0;i<k;i++){
sumNums[i] = new SumNumWritable(colSize);
}
logger.info("colSize:{}",colSize);
}

SumNumWritable構造函數如代碼清單2-42所示。
```javascript
代碼清單2-42 更新聚類中心向量Mapper輸出自定義值類型構造函數
public SumNumWritable(int size) {
this.sum = new double [size];
this.num =0;
}

  • map():在map函數中在得到當前記錄的類別後(可以參考思路1的做法),需要根據此類別去更新該類別的“列和”以及個數,如代碼清單2-43所示。

代碼清單2-43 更新聚類中心向量Mapper的map函數示例
/

  • 更新列和以及個數
  • @param sumNumWritable 某個類別的“列和”
  • @param valArr 當前記錄 / private void updateSumNum(SumNumWritable sumNumWritable, double[] valArr) { if(sumNumWritable==null) return ; sumNumWritable.setNum(sumNumWritable.getNum()+1); addSum(sumNumWritable.getSum(),valArr); // 這裏不用setSum() }*
    • cleanup():在cleanup中隻需要輸出“列和”數組即可,如代碼清單2-44所示。
代碼清單2-44 更新聚類中心向量Mapper的cleanup函數示例
/**
 * 輸出
 */
@Override
protected void cleanup(Context context)
          throws IOException, InterruptedException {
    int index =0;
    for(SumNumWritable sn:sumNums){
        ID.set(index++);
        context.write(ID, sn);
    }
}

Reducer隻需要整合各個Mapper的輸出記錄,針對每個記錄分別求“列和”、個數和,然後再求平均即可得到新的聚類中心向量和。各個函數描述如下。
setup():隻需讀取分隔符參數,並進行初始化即可(在reduce函數中需要使用此參數)。
reduce():在reduce中直接使用for循環讀取每個類別的“列和”以及個數,分別相加即可得到每個類別的最終“列和”以及個數,然後求平均即可得到更新後的聚類中心向量,如代碼清單2-45所示。

代碼清單2-45 更新聚類中心向量Reducer reduce示例代碼
@Override
protected void reduce(IntWritable key, Iterable<SumNumWritable> values,
        Context context) throws IOException, InterruptedException {
    double[] sum=null;
    long  num =0;
    for(SumNumWritable value:values){
        if(sum==null){  // 第一次需要初始化
            sum = new double[value.getSum().length];
        }
        addToSum(sum,value.getSum());
        num+=value.getNum();
    }
    if(num==0){
        vec.set("");
        log.info("id:{}類別沒有數據!",key.get());
    }else{
        averageSum(sum,num);
        vec.set(format(sum));
log.info("id:{},聚類中心是:[{}]",new Object[]{key.get(),vec.toString ()});
    }
    context.write(vec, NullWritable.get()); //寫入的順序有影響嗎?如果順序寫入呢?
}

(2)動手實踐:Hadoop實現K-Means算法思路2
請讀者參考思路1的動手實踐,編寫K-Means算法思路2的Hadoop實現。

最後更新:2017-06-26 11:01:50

  上一篇:go  《Hadoop與大數據挖掘》一2.6 TF-IDF算法原理及Hadoop MapReduce實現
  下一篇:go  2017網絡安全(中國)論壇將於8月11日在上海召開