閱讀283 返回首頁    go 阿裏雲


PageRank__示例程序_圖模型_大數據計算服務-阿裏雲

PageRank 算法是計算網頁排名的經典算法:輸入是一個有向圖 G,其中頂點表示網頁,如果存在網頁 A 到網頁 B 的鏈接,那麼存在聯接 A 到 B 的邊,算法基本原理:

  • 初始化:點值表示 PageRank 的 rank 值(double 類型),初始時,所有點取值為 1/TotalNumVertices
  • 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum,其中 sum 為所有指向 i 點的點(設為 j) PageRank(j)/out_degree(j) 的累加值

從算法基本原理可以看出,這個算法非常適合使用 OPDS Graph 程序進行求解: 每個點 j 維護其 PageRank 值,每一輪迭代都將PageRank(j)/out_degree(j) 發給其鄰接點(向其投票),下一輪迭代時,每個點根據迭代公式重新計算 PageRank 取值。

源代碼

  1. import java.io.IOException;
  2. import org.apache.log4j.Logger;
  3. import com.aliyun.odps.io.WritableRecord;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.GraphJob;
  6. import com.aliyun.odps.graph.GraphLoader;
  7. import com.aliyun.odps.graph.MutationContext;
  8. import com.aliyun.odps.graph.Vertex;
  9. import com.aliyun.odps.graph.WorkerContext;
  10. import com.aliyun.odps.io.DoubleWritable;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.io.NullWritable;
  13. import com.aliyun.odps.data.TableInfo;
  14. import com.aliyun.odps.io.Text;
  15. import com.aliyun.odps.io.Writable;
  16. public class PageRank {
  17. private final static Logger LOG = Logger.getLogger(PageRank.class);
  18. public static class PageRankVertex extends
  19. Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
  20. @Override
  21. public void compute(
  22. ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
  23. Iterable<DoubleWritable> messages) throws IOException {
  24. if (context.getSuperstep() == 0) {
  25. setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
  26. } else if (context.getSuperstep() >= 1) {
  27. double sum = 0;
  28. for (DoubleWritable msg : messages) {
  29. sum += msg.get();
  30. }
  31. DoubleWritable vertexValue = new DoubleWritable(
  32. (0.15f / context.getTotalNumVertices()) + 0.85f * sum);
  33. setValue(vertexValue);
  34. }
  35. if (hasEdges()) {
  36. context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
  37. .get() / getEdges().size()));
  38. }
  39. }
  40. @Override
  41. public void cleanup(
  42. WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  43. throws IOException {
  44. context.write(getId(), getValue());
  45. }
  46. }
  47. public static class PageRankVertexReader extends
  48. GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
  49. @Override
  50. public void load(
  51. LongWritable recordNum,
  52. WritableRecord record,
  53. MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
  54. throws IOException {
  55. PageRankVertex vertex = new PageRankVertex();
  56. vertex.setValue(new DoubleWritable(0));
  57. vertex.setId((Text) record.get(0));
  58. System.out.println(record.get(0));
  59. for (int i = 1; i < record.size(); i++) {
  60. Writable edge = record.get(i);
  61. System.out.println(edge.toString());
  62. if (!(edge.equals(NullWritable.get()))) {
  63. vertex.addEdge(new Text(edge.toString()), NullWritable.get());
  64. }
  65. }
  66. LOG.info("vertex edgs size: "
  67. + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
  68. context.addVertexRequest(vertex);
  69. }
  70. }
  71. private static void printUsage() {
  72. System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
  73. System.exit(-1);
  74. }
  75. public static void main(String[] args) throws IOException {
  76. if (args.length < 2)
  77. printUsage();
  78. GraphJob job = new GraphJob();
  79. job.setGraphLoaderClass(PageRankVertexReader.class);
  80. job.setVertexClass(PageRankVertex.class);
  81. job.addInput(TableInfo.builder().tableName(args[0]).build());
  82. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  83. // default max iteration is 30
  84. job.setMaxIteration(30);
  85. if (args.length >= 3)
  86. job.setMaxIteration(Integer.parseInt(args[2]));
  87. long startTime = System.currentTimeMillis();
  88. job.run();
  89. System.out.println("Job Finished in "
  90. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  91. }
  92. }

代碼說明

PageRank 源代碼包括以下幾部分:

  • 55行:定義 PageRankVertexReader 類,加載圖,將表中每一條記錄解析為一個點,記錄的第一列是起點,其他列為終點;
  • 23行:定義 PageRankVertex ,其中:
    • 點值表示該點(網頁)的當前 PageRank 取值;
    • compute()方法使用迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum 更新點值;
    • cleanup() 方法把點及其 PageRank 取值寫到結果表中;
  • 88行:主程序(main函數),定義 GraphJob,指定 Vertex/GraphLoader等的實現,以及最大迭代次數(默認 30),並指定輸入輸出表。

最後更新:2016-09-21 11:03:08

  上一篇:go 單源最短距離__示例程序_圖模型_大數據計算服務-阿裏雲
  下一篇:go K-均值聚類__示例程序_圖模型_大數據計算服務-阿裏雲