

Connected Components - Example Programs - Graph Model - Big Data Computing Service - Alibaba Cloud

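Two vertices are connected if a path exists between them. An undirected graph G is a connected graph if every pair of its vertices is connected; otherwise it is a disconnected graph. A maximal connected subgraph of G is called a connected component. This algorithm computes the connected component membership of every vertex: on output, each vertex's value holds the smallest vertex id in the connected component that contains it. It does so by propagating the smallest vertex id along the edges to all vertices of the connected component.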
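The propagation described above can be tried out without a cluster. The following is a minimal sketch in plain Java, not code from the ODPS Graph SDK: the class name `MinLabelPropagationSketch`, the method `minLabels`, and the in-memory adjacency map are all hypothetical names chosen for illustration. It keeps one pending message per target vertex (the minimum), which is an assumption about how a minimum-keeping combiner would behave.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone sketch; it only imitates the superstep loop in memory.
class MinLabelPropagationSketch {

  /**
   * adjacency maps each vertex id to its neighbor ids. The graph is undirected,
   * so every edge must be listed in both directions.
   * Returns a map from vertex id to the smallest vertex id in its component.
   */
  static Map<Long, Long> minLabels(Map<Long, List<Long>> adjacency) {
    Map<Long, Long> label = new TreeMap<>();
    Map<Long, Long> inbox = new HashMap<>();

    // Superstep 0: each vertex labels itself with its own id and tells its neighbors.
    for (Long v : adjacency.keySet()) {
      label.put(v, v);
    }
    for (Map.Entry<Long, List<Long>> e : adjacency.entrySet()) {
      for (Long neighbor : e.getValue()) {
        inbox.merge(neighbor, e.getKey(), Long::min); // keep only the minimum message
      }
    }

    // Later supersteps: run until no vertex has a pending message.
    while (!inbox.isEmpty()) {
      Map<Long, Long> next = new HashMap<>();
      for (Map.Entry<Long, Long> msg : inbox.entrySet()) {
        long v = msg.getKey();
        long minId = msg.getValue();
        // A vertex that learns of a smaller id adopts it and forwards it;
        // otherwise it stays silent (the analogue of voting to halt).
        if (label.containsKey(v) && minId < label.get(v)) {
          label.put(v, minId);
          for (Long neighbor : adjacency.get(v)) {
            next.merge(neighbor, minId, Long::min);
          }
        }
      }
      inbox = next;
    }
    return label;
  }

  public static void main(String[] args) {
    Map<Long, List<Long>> g = new HashMap<>();
    g.put(1L, Arrays.asList(2L, 3L));
    g.put(2L, Arrays.asList(1L, 4L));
    g.put(3L, Arrays.asList(1L, 4L));
    g.put(4L, Arrays.asList(2L, 3L));
    g.put(5L, Arrays.asList(6L));
    g.put(6L, Arrays.asList(5L));
    System.out.println(minLabels(g)); // prints {1=1, 2=1, 3=1, 4=1, 5=5, 6=5}
  }
}
```

Vertices 1 to 4 form one component and end up with label 1; vertices 5 and 6 form a second component and end up with label 5. The loop terminates because a label can only decrease and is bounded below by the smallest id in its component.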

Source code

import java.io.IOException;

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.graph.examples.SSSP.MinLongCombiner;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.WritableRecord;

/**
 * Compute the connected component membership of each vertex and output,
 * as the value of each vertex, the smallest vertex id in the connected
 * component containing that vertex.
 *
 * Algorithm: propagate the smallest vertex id along the edges to all
 * vertices of a connected component.
 *
 */
public class ConnectedComponents {

  public static class CCVertex extends
    Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
        Iterable<LongWritable> msgs) throws IOException {

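      // Superstep 0: start with the vertex's own id and announce it to all neighbors.
      if (context.getSuperstep() == 0L) {
        this.setValue(getId());
        context.sendMessageToNeighbors(this, getValue());
        return;
      }

      // Find the smallest id among the incoming messages.
      long minID = Long.MAX_VALUE;
      for (LongWritable id : msgs) {
        if (id.get() < minID) {
          minID = id.get();
        }
      }

      // Adopt and forward a smaller id; otherwise nothing changed, so vote to halt.
      if (minID < this.getValue().get()) {
        this.setValue(new LongWritable(minID));
        context.sendMessageToNeighbors(this, getValue());
      } else {
        this.voteToHalt();
      }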
    }

    /**
     * Output Table Description:
     * +-----------------+----------------------------------------+
     * | Field | Type    | Comment                                |
     * +-----------------+----------------------------------------+
     * | v     | bigint  | vertex id                              |
     * | minID | bigint  | smallest id in the connected component |
     * +-----------------+----------------------------------------+
     */
    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
        throws IOException {
      context.write(getId(), getValue());
    }
  }


  /**
   * Input Table Description:
   * +-----------------+----------------------------------------------------+
   * | Field | Type    | Comment                                            |
   * +-----------------+----------------------------------------------------+
   * | v     | bigint  | vertex id                                          |
   * | es    | string  | comma separated target vertex id of outgoing edges |
   * +-----------------+----------------------------------------------------+
   *
   * Example:
   * For graph:
   *       1 ----- 2
   *       |       |
   *       3 ----- 4
   * Input table:
   * +-----------+
   * | v  | es   |
   * +-----------+
   * | 1  | 2,3  |
   * | 2  | 1,4  |
   * | 3  | 1,4  |
   * | 4  | 2,3  |
   * +-----------+
   */
  public static class CCVertexReader extends
    GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {

    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
        throws IOException {
      CCVertex vertex = new CCVertex();

      vertex.setId((LongWritable) record.get(0));

      String[] edges = record.get(1).toString().split(",");
      for (int i = 0; i < edges.length; i++) {
        long destID = Long.parseLong(edges[i]);
        vertex.addEdge(new LongWritable(destID), NullWritable.get());
      }

      context.addVertexRequest(vertex);
    }

  }

  public static void main(String[] args) throws IOException {
    if (args.length < 2) {
      System.out.println("Usage: <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(CCVertexReader.class);
    job.setVertexClass(CCVertex.class);
    job.setCombinerClass(MinLongCombiner.class);

    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addOutput(TableInfo.builder().tableName(args[1]).build());
    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
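One detail of the loader is worth noting: `"".split(",")` returns an array containing a single empty string, and `Long.parseLong("")` throws `NumberFormatException`, so a row whose `es` cell is an empty string would fail to load as written. The following is a minimal sketch of a more tolerant parser for the comma-separated `es` column. The class `EdgeListParserSketch` and the method `parseTargets` are hypothetical helpers, not part of the ODPS Graph SDK, and treating a null or blank cell as "no outgoing edges" is an assumption about the desired behavior.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the original example or the SDK.
class EdgeListParserSketch {

  /**
   * Parses a comma-separated list of target vertex ids such as "2,3".
   * A null, empty, or blank input yields an empty list (assumed behavior).
   */
  static List<Long> parseTargets(String es) {
    List<Long> targets = new ArrayList<>();
    if (es == null) {
      return targets;
    }
    for (String token : es.split(",")) {
      String trimmed = token.trim();
      if (!trimmed.isEmpty()) {
        // Malformed, non-numeric tokens still throw NumberFormatException.
        targets.add(Long.parseLong(trimmed));
      }
    }
    return targets;
  }
}
```

In `CCVertexReader.load`, a helper of this kind could supply the ids passed to `vertex.addEdge`, so that isolated vertices can be listed in the input table with an empty edge column.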

Last updated: 2016-05-06 10:43:09
