閱讀805 返回首頁    go 阿裏雲


Aggregator機製介紹__圖模型_大數據計算服務-阿裏雲

Aggregator是ODPS-GRAPH作業中常用的feature之一,特別是解決機器學習問題時。ODPS-GRAPH中Aggregator用於匯總並處理全局信息。本文將詳細介紹的Aggregator的執行機製、相關API,並以Kmeans Clustering為例子說明Aggregator的具體用法。

Aggregator機製

Aggregator的邏輯分兩部分,一部分在所有Worker上執行,即分布式執行,另一部分隻在AggregatorOwner所在Worker上執行,即單點。其中在所有Worker上執行的操作包括創建初始值及局部聚合,然後將局部聚合結果發送給AggregatorOwner所在Worker上。AggregatorOwner所在Worker上聚合普通Worker發送過來的局部聚合對象,得到全局聚合結果,然後判斷迭代是否結束。全局聚合的結果會在下一輪超步分發給所有Worker,供下一輪迭代使用。 如下圖所示 :

Aggregator的API

Aggregator共提供了五個API供用戶實現。下麵逐個介紹5個API的調用時機及常規用途。

  1. createStartupValue(context)
    該API在所有Worker上執行一次,調用時機是所有超步開始之前,通常用以初始化AggregatorValue。在第0輪超步中,調用WorkerContext.getLastAggregatedValue() 或ComputeContext.getLastAggregatedValue()可以獲取該API初始化的AggregatorValue對象。

  2. createInitialValue(context)
    該API在所有Worker上每輪超步開始時調用一次,用以初始化本輪迭代所用的AggregatorValue。通常操作是通過WorkerContext.getLastAggregatedValue() 得到上一輪迭代的結果,然後執行部分初始化操作。

  3. aggregate(value, item)
    該API同樣在所有Worker上執行,與上述API不同的是,該API由用戶顯示調用ComputeContext#aggregate(item)來觸發,而上述兩個API,則由框架自動調用。該API用以執行局部聚合操作,其中第一個參數value是本Worker在該輪超步已經聚合的結果(初始值是createInitialValue返回的對象),第二個參數是用戶代碼調用ComputeContext#aggregate(item)傳入的參數。該API中通常用item來更新value實現聚合。所有aggregate執行完後,得到的value就是該Worker的局部聚合結果,然後由框架發送給AggregatorOwner所在的Worker。

  4. merge(value, partial)
    該API執行於AggregatorOwner所在Worker,用以合並各Worker局部聚合的結果,達到全局聚合對象。與aggregate類似,value是已經聚合的結果,而partial待聚合的對象,同樣用partial更新value。
    假定有3個worker,分別是w0、w1、w2,其局部聚合結果是p0、p1、p2。假定發送到AggregatorOwner所在Worker的順序為p1、p0、p2。那麼merge執行次序為,首先執行merge(p1, p0),這樣p1和p0就聚合為p1',然後執行merge(p1', p2),p1'和p2聚合為p1'',而p1''即為本輪超步全局聚合的結果。
    從上述示例可以看出,當隻有一個worker時,不需要執行merge方法,也就是說merge()不會被調用。

  5. terminate(context, value)
    當AggregatorOwner所在Worker執行完merge()後,框架會調用terminate(context, value)執行最後的處理。其中第二個參數value,即為merge()最後得到全局聚合,在該方法中可以對全局聚合繼續修改。執行完terminate()後,框架會將全局聚合對象分發給所有Worker,供下一輪超步使用。
    terminate()方法的一個特殊之處在於,如果返回true,則整個作業就結束迭代,否則繼續執行。在機器學習場景中,通常判斷收斂後返回true以結束作業。

Kmeans Clustering示例

下麵以典型的KmeansClustering作為示例,來看下Aggregator具體用法。附件有完整代碼,這裏我們逐個部分解析代碼。

  1. GraphLoader部分
    GraphLoader部分用以加載輸入表,並轉換為圖的點或邊。這裏我們輸入表的每行數據為一個樣本,一個樣本構造一個點,並用Vertex的value來存放樣本。
    我們首先定義一個Writable類KmeansValue作為Vertex的value類型:

    public static class KmeansValue implements Writable {
    
     DenseVector sample;
    
     public KmeansValue() { 
     }
    
     public KmeansValue(DenseVector v) {
       this.sample = v;
     }
    
     @Override
     public void write(DataOutput out) throws IOException {
       wirteForDenseVector(out, sample);
    
     }
    
     @Override
     public void readFields(DataInput in) throws IOException {
       sample = readFieldsForDenseVector(in);
     }
    }
    

    KmeansValue中封裝一個DenseVector對象來存放一個樣本,這裏DenseVector類型來自matrix-toolkits-java,而wirteForDenseVector()及readFieldsForDenseVector()用以實現序列化及反序列化,可參見附件中的完整代碼。
    我們自定義的KmeansReader代碼如下:

    public static class KmeansReader extends 
     GraphLoader<LongWritable, KmeansValue, NullWritable, NullWritable> {
    
     @Override
     public void load(
         LongWritable recordNum,
         WritableRecord record,
         MutationContext<LongWritable, KmeansValue, NullWritable, NullWritable> context)
         throws IOException {
       KmeansVertex v = new KmeansVertex();
       v.setId(recordNum);
    
       int n = record.size();
       DenseVector dv = new DenseVector(n);
       for (int i = 0; i < n; i++) {
         dv.set(i, ((DoubleWritable)record.get(i)).get());
       }
       v.setValue(new KmeansValue(dv));
    
       context.addVertexRequest(v);
     }
    }
    

    KmeansReader中,每讀入一行數據(一個Record)創建一個點,這裏用recordNum作為點的ID,將record內容轉換成DenseVector對象並封裝進VertexValue中。

  2. Vertex部分
    自定義的KmeansVertex代碼如下。邏輯非常簡單,每輪迭代要做的事情就是將自己維護的樣本執行局部聚合。具體邏輯參見下麵Aggregator的實現。

    public static class KmeansVertex extends
     Vertex<LongWritable, KmeansValue, NullWritable, NullWritable> {
    
     @Override
     public void compute(
         ComputeContext<LongWritable, KmeansValue, NullWritable, NullWritable> context,
         Iterable<NullWritable> messages) throws IOException {
       context.aggregate(getValue());
     }
    }
    
  3. Aggregator部分
    整個Kmeans的主要邏輯集中在Aggregator中。首先是自定義的KmeansAggrValue,用以維護要聚合及分發的內容。

    public static class KmeansAggrValue implements Writable {
    
     DenseMatrix centroids;
     DenseMatrix sums; // used to recalculate new centroids
     DenseVector counts; // used to recalculate new centroids
    
     @Override
     public void write(DataOutput out) throws IOException {
       wirteForDenseDenseMatrix(out, centroids);
       wirteForDenseDenseMatrix(out, sums);
       wirteForDenseVector(out, counts);
     }
    
     @Override
     public void readFields(DataInput in) throws IOException {
       centroids = readFieldsForDenseMatrix(in);
       sums = readFieldsForDenseMatrix(in);
       counts = readFieldsForDenseVector(in);
     }
    }
    

    KmeansAggrValue中維護了三個對象,其中centroids是當前的K個中心點,假定樣本是m維的話,centroids就是一個K*m的矩陣。sums是和centroids大小一樣的矩陣,每個元素記錄了到特定中心點最近的樣本特定維之和,例如sums(i,j)是到第i個中心點最近的樣本的第j維度之和。
    counts是個K維的向量,記錄到每個中心點距離最短的樣本個數。sums和counts一起用以計算新的中心點,也是要聚合的主要內容。 接下來是自定義的Aggregator實現類KmeansAggregator,我們按照上述API的順序逐個看其實現。
    首先是createStartupValue()。

    public static class KmeansAggregator extends Aggregator<KmeansAggrValue> {
    
     public KmeansAggrValue createStartupValue(WorkerContext context) throws IOException {
       KmeansAggrValue av = new KmeansAggrValue();
    
       byte[] centers = context.readCacheFile("centers");
       String lines[] = new String(centers).split("n");
    
       int rows = lines.length;
       int cols = lines[0].split(",").length; // assumption rows >= 1 
    
       av.centroids = new DenseMatrix(rows, cols);
       av.sums = new DenseMatrix(rows, cols);
       av.sums.zero();
       av.counts = new DenseVector(rows);
       av.counts.zero();
    
       for (int i = 0; i < lines.length; i++) {
         String[] ss = lines[i].split(",");
         for (int j = 0; j < ss.length; j++) {
           av.centroids.set(i, j, Double.valueOf(ss[j]));
         }
       }
       return av;
     }
    

    我們在該方法中初始化一個KmeansAggrValue對象,然後從資源文件centers中讀取初始中心點,並賦值給centroids。而sums和counts初始化為0。
    接來下是createInitialValue()的實現:

    @Override
     public void aggregate(KmeansAggrValue value, Object item)
         throws IOException {
       DenseVector sample = ((KmeansValue)item).sample;
    
       // find the nearest centroid
       int min = findNearestCentroid(value.centroids, sample);
    
       // update sum and count
       for (int i = 0; i < sample.size(); i ++) {
         value.sums.add(min, i, sample.get(i));
       }
       value.counts.add(min, 1.0d);
     }
    

    該方法中調用findNearestCentroid()(實現見附件)找到樣本item歐拉距離最近的中心點索引,然後將其各個維度加到sums上,最後counts計數加1。
    以上三個方法執行於所有worker上,實現局部聚合。接下來看下在AggregatorOwner所在Worker執行的全局聚合相關操作。
    首先是merge的實現:

    @Override
     public void merge(KmeansAggrValue value, KmeansAggrValue partial)
         throws IOException {
       value.sums.add(partial.sums);
       value.counts.add(partial.counts);
     }
    

    merge的實現邏輯很簡單,就是把各個worker聚合出的sums和counts相加即可。
    最後是terminate()的實現:

    @Override
     public boolean terminate(WorkerContext context, KmeansAggrValue value)
         throws IOException {
       // Calculate the new means to be the centroids (original sums)
       DenseMatrix newCentriods = calculateNewCentroids(value.sums, value.counts, value.centroids);
    
       // print old centroids and new centroids for debugging
       System.out.println("nsuperstep: " + context.getSuperstep() + 
           "nold centriod:n" + value.centroids + " new centriod:n" + newCentriods);
    
       boolean converged = isConverged(newCentriods, value.centroids, 0.05d);
       System.out.println("superstep: " + context.getSuperstep() + "/" 
           + (context.getMaxIteration() - 1) + " converged: " + converged);
       if (converged || context.getSuperstep() == context.getMaxIteration() - 1) {
         // converged or reach max iteration, output centriods
         for (int i = 0; i < newCentriods.numRows(); i++) {
           Writable[] centriod = new Writable[newCentriods.numColumns()];
           for (int j = 0; j < newCentriods.numColumns(); j++) {
             centriod[j] = new DoubleWritable(newCentriods.get(i, j));
           }
           context.write(centriod);
         }
    
         // true means to terminate iteration
         return true;
       }
    
       // update centriods
       value.centroids.set(newCentriods);
       // false means to continue iteration
       return false;
     }
    

    teminate()中首先根據sums和counts調用calculateNewCentroids()求平均計算出新的中心點。然後調用isConverged()根據新老中心點歐拉距離判斷是否已經收斂。如果收斂或迭代次數達到最大數,則將新的中心點輸出並返回true,以結束迭代。否則更新中心點並返回false以繼續迭代。其中calculateNewCentroids()和isConverged()的實現見附件。

  4. main方法
    main方法用以構造GraphJob,然後設置相應配置,並提交作業。代碼如下:

    public static void main(String[] args) throws IOException {
     if (args.length < 2)
       printUsage();
    
     GraphJob job = new GraphJob();
    
     job.setGraphLoaderClass(KmeansReader.class);
     job.setRuntimePartitioning(false);
     job.setVertexClass(KmeansVertex.class);
     job.setAggregatorClass(KmeansAggregator.class);
     job.addInput(TableInfo.builder().tableName(args[0]).build());
     job.addOutput(TableInfo.builder().tableName(args[1]).build());
    
     // default max iteration is 30
     job.setMaxIteration(30);
     if (args.length >= 3)
       job.setMaxIteration(Integer.parseInt(args[2]));
    
     long start = System.currentTimeMillis();
     job.run();
     System.out.println("Job Finished in "
         + (System.currentTimeMillis() - start) / 1000.0 + " seconds");
    }
    

    這裏需要注意的是job.setRuntimePartitioning(false),設置為false後,各個worker加載的數據不再根據Partitioner重新分區,即誰加載的數據誰維護。

總結

本文介紹了ODPS-GRAPH中的Aggregator機製,API含義以及示例Kmeans Clustering。總的來說,Aggregator基本步驟是:

  1. 每個worker啟動時執行createStartupValue用以創建AggregatorValue;
  2. 每輪迭代開始前,每個worker執行createInitialValue來初始化本輪的AggregatorValue;
  3. 一輪迭代中每個點通過context.aggregate()來執行aggregate()實現worker內的局部迭代;
  4. 每個Worker將局部迭代結果發送給AggregatorOwner所在的Worker;
  5. AggregatorOwner所在worker執行多次merge,實現全局聚合;
  6. AggregatorOwner所在Worker執行terminate用以對全局聚合結果做處理並決定是否結束迭代。

附件

Kmeans

最後更新:2016-05-06 10:43:10

  上一篇:go 輸入邊表示例__示例程序_圖模型_大數據計算服務-阿裏雲
  下一篇:go 目標用戶__安全指南_大數據計算服務-阿裏雲