閱讀576 返回首頁    go 外匯


輸入點表示例__示例程序_圖模型_大數據計算服務-阿裏雲

import java.io.IOException;

import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;

/**
 * 本示例是用於展示,對於不同類型的數據類型,如何編寫圖作業程序載入數據。主要展示GraphLoader和
 * VertexResolver的配合完成圖的構建。
 *
 * ODPS Graph的作業輸入都為ODPS的Table,假設作業輸入有兩張表,一張存儲點的信息,一張存儲邊的信息。
 * 存儲點信息的表的格式,如:
 * +------------------------+
 * | VertexID | VertexValue |
 * +------------------------+
 * |       id0|            9|
 * +------------------------+
 * |       id1|            7|
 * +------------------------+
 * |       id2|            8|
 * +------------------------+
 *
 * 存儲邊信息的表的格式,如
 * +-----------------------------------+
 * | VertexID | DestVertexID| EdgeValue|
 * +-----------------------------------+
 * |       id0|          id1|         1|
 * +-----------------------------------+
 * |       id0|          id2|         2|
 * +-----------------------------------+
 * |       id2|          id1|         3|
 * +-----------------------------------+
 *
 * 結合兩張表的數據,表示id0有兩條出邊,分別指向id1和id2;id2有一條出邊,指向id1;id1沒有出邊。
 *
 * 對於此種類型的數據,在GraphLoader::load(LongWritable, Record, MutationContext)
 * ,可以使用 MutationContext#addVertexRequest(Vertex)向圖中請求添加點,使用
 * link MutationContext#addEdgeRequest(WritableComparable, Edge)向圖中請求添加邊,然後,在
 * link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
 * 中,將load 方法中添加的點和邊,合並到一個Vertex對象中,作為返回值,添加到最後參與計算的圖中。
 *
 **/
public class VertexInputFormat {

  private final static String EDGE_TABLE = "edge.table";

  /**
   * 將Record解釋為Vertex和Edge,每個Record根據其來源,表示一個Vertex或者一條Edge。
   *
   * 類似於com.aliyun.odps.mapreduce.Mapper#map
   * ,輸入Record,生成鍵值對,此處的鍵是Vertex的ID,
   * 值是Vertex或Edge,通過上下文Context寫出,這些鍵值對會在LoadingVertexResolver出根據Vertex的ID匯總。
   *
   * 注意:此處添加的點或邊隻是根據Record內容發出的請求,並不是最後參與計算的點或邊,隻有在隨後的VertexResolver
   * 中添加的點或邊才參與計算。
   **/
  public static class VertexInputLoader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {

    private boolean isEdgeData;

    /**
     * 配置VertexInputLoader。
     *
     * @param conf
     *          作業的配置參數,在main中使用GraphJob配置的,或者在console中set的
     * @param workerId
     *          當前工作的worker的序號,從0開始,可以用於構造唯一的vertex id
     * @param inputTableInfo
     *          當前worker載入的輸入表信息,可以用於確定當前輸入是哪種類型的數據,即Record的格式
     **/
    @Override
    public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
      isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
    }

    /**
     * 根據Record中的內容,解析為對應的邊,並請求添加到圖中。
     *
     * @param recordNum
     *          記錄序列號,從1開始,每個worker上單獨計數
     * @param record
     *          輸入表中的記錄,三列,分別表示初點、終點、邊的權重
     * @param context
     *          上下文,請求將解釋後的邊添加到圖中
     **/
    @Override
    public void load(
        LongWritable recordNum,
        WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      if (isEdgeData) {
        /**
         * 數據來源於存儲邊信息的表。
         *
         * 1、第一列表示初始點的ID
         **/
        LongWritable sourceVertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示終點的ID
         **/
        LongWritable destinationVertexID = (LongWritable) record.get(1);

        /**
         * 3、地三列表示邊的權重
         **/
        LongWritable edgeValue = (LongWritable) record.get(2);

        /**
         * 4、創建邊,由終點ID和邊的權重組成
         **/
        Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
            destinationVertexID, edgeValue);

        /**
         * 5、請求給初始點添加邊
         **/
        context.addEdgeRequest(sourceVertexID, edge);

        /**
         * 6、如果每條Record表示雙向邊,重複4與5 Edge<LongWritable, LongWritable> edge2 = new
         * Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
         * context.addEdgeRequest(destinationVertexID, edge2);
         **/
      } else {
        /**
         * 數據來源於存儲點信息的表。
         *
         * 1、第一列表示點的ID
         **/
        LongWritable vertexID = (LongWritable) record.get(0);

        /**
         * 2、第二列表示點的值
         **/
        LongWritable vertexValue = (LongWritable) record.get(1);

        /**
         * 3、創建點,由點的ID和點的值組成
         **/
        MyVertex vertex = new MyVertex();

        /**
         * 4、初始化點
         **/
        vertex.setId(vertexID);
        vertex.setValue(vertexValue);

        /**
         * 5、請求添加點
         **/
        context.addVertexRequest(vertex);
      }

    }

  }

  /**
   * 匯總GraphLoader::load(LongWritable, Record, MutationContext)生成的鍵值對,類似於
   * com.aliyun.odps.mapreduce.Reducer中的reduce。對於唯一的Vertex ID,所有關於這個ID上
   * 添加刪除、點邊的行為都會放在VertexChanges中。
   *
   * 注意:此處並不隻針對load方法中添加的有衝突的點或邊才調用(衝突是指添加多個相同Vertex對象,添加重複邊等),
   * 所有在load方法中請求生成的ID都會在此處被調用。
   **/
  public static class LoadingResolver extends
      VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 處理關於一個ID的添加或刪除、點或邊的請求。
     *
     * VertexChanges有四個接口,分別與MutationContext的四個接口對應:
     * VertexChanges::getAddedVertexList()與
     * MutationContext::addVertexRequest(Vertex)對應,
     * 在load方法中,請求添加的ID相同的Vertex對象,會被匯總在返回的List中
     * VertexChanges::getAddedEdgeList()與
     * MutationContext::addEdgeRequest(WritableComparable, Edge)
     * 對應,請求添加的初始點ID相同的Edge對象,會被匯總在返回的List中
     * VertexChanges::getRemovedVertexCount()與
     * MutationContext::removeVertexRequest(WritableComparable)
     * 對應,請求刪除的ID相同的Vertex,匯總的請求刪除的次數作為返回值
     * VertexChanges#getRemovedEdgeList()與
     * MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
     * 對應,請求刪除的初始點ID相同的Edge對象,會被匯總在返回的List中
     *
     * 用戶通過處理關於這個ID的變化,通過返回值聲明此ID是否參與計算,如果返回的Vertex不為null,
     * 則此ID會參與隨後的計算,如果返回null,則不會參與計算。
     *
     * @param vertexId
     *          請求添加的點的ID,或請求添加的邊的初點ID
     * @param vertex
     *          已存在的Vertex對象,數據載入階段,始終為null
     * @param vertexChanges
     *          此ID上的請求添加刪除、點邊的集合
     * @param hasMessages
     *          此ID是否有輸入消息,數據載入階段,始終為false
     **/
    @Override
    public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
        LongWritable vertexId,
        Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
        VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
        boolean hasMessages) throws IOException {
      /**
       * 1、獲取Vertex對象,作為參與計算的點。
       **/
      MyVertex computeVertex = null;
      if (vertexChanges.getAddedVertexList() == null
          || vertexChanges.getAddedVertexList().isEmpty()) {
        computeVertex = new MyVertex();
        computeVertex.setId(vertexId);
      } else {
        /**
         * 此處假設存儲點信息的表中,每個Record表示唯一的點。
         **/
        computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
      }

      /**
       * 2、將請求給此點添加的邊,添加到Vertex對象中,如果數據有重複的可能,根據算法需要決定是否去重。
       **/
      if (vertexChanges.getAddedEdgeList() != null) {
        for (Edge<LongWritable, LongWritable> edge : vertexChanges
            .getAddedEdgeList()) {
          computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
        }
      }

      /**
       * 3、將Vertex對象返回,添加到最終的圖中參與計算。
       **/
      return computeVertex;
    }

  }

  /**
   * 確定參與計算的Vertex的行為。
   *
   **/
  public static class MyVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * 將vertex的邊,按照輸入表的格式再寫到結果表。輸入表與輸出表的格式和數據都相同。
     *
     * @param context
     *          運行時上下文
     * @param messages
     *          輸入消息
     **/
    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      /**
       * 將點的ID和值,寫到存儲點的結果表
       **/
      context.write("vertex", getId(), getValue());

      /**
       * 將點的邊,寫到存儲邊的結果表
       **/
      if (hasEdges()) {
        for (Edge<LongWritable, LongWritable> edge : getEdges()) {
          context.write("edge", getId(), edge.getDestVertexId(),
              edge.getValue());
        }
      }

      /**
       * 隻迭代一輪
       **/
      voteToHalt();
    }

  }

  /**
   * @param args
   * @throws IOException
   */
  public static void main(String[] args) throws IOException {

    if (args.length < 4) {
      throw new IOException(
          "Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
    }

    /**
     * GraphJob用於對Graph作業進行配置
     */
    GraphJob job = new GraphJob();

    /**
     * 1、指定輸入的圖數據,並指定存儲邊數據所在的表。
     */
    job.addInput(TableInfo.builder().tableName(args[0]).build());
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.set(EDGE_TABLE, args[1]);

    /**
     * 2、指定載入數據的方式,將Record解釋為Edge,此處類似於map,生成的 key為vertex的ID,value為edge。
     */
    job.setGraphLoaderClass(VertexInputLoader.class);

    /**
     * 3、指定載入數據階段,生成參與計算的vertex。此處類似於reduce,將map 生成的edge合並成一個vertex。
     */
    job.setLoadingVertexResolverClass(LoadingResolver.class);

    /**
     * 4、指定參與計算的vertex的行為。每輪迭代執行vertex.compute方法。
     */
    job.setVertexClass(MyVertex.class);

    /**
     * 5、指定圖作業的輸出表,將計算生成的結果寫到結果表中。
     */
    job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
    job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());

    /**
     * 6、提交作業執行。
     */
    job.run();
  }

}

最後更新:2016-11-23 17:16:04

  上一篇:go 三角形計數__示例程序_圖模型_大數據計算服務-阿裏雲
  下一篇:go 輸入邊表示例__示例程序_圖模型_大數據計算服務-阿裏雲