閱讀1015 返回首頁    go 阿裏雲


單源最短距離__示例程序_圖模型_大數據計算服務-阿裏雲

Dijkstra算法是求解有向圖中單源最短距離(Single Source Shortest Path,簡寫:SSSP)的經典算法。 最短距離:對一個有權重的有向圖 G=(V,E),從一個源點s到匯點v有很多路徑,其中邊權和最小的路徑,稱從s到v的最短距離。算法基本原理:

  • 初始化:源點 s 到 s 自身的距離(d[s]=0),其他點 u 到 s 的距離為無窮(d[u]=∞)
  • 迭代:若存在一條從 u 到 v 的邊,那麼從 s 到 v 的最短距離更新為: d[v]=min(d[v], d[u]+weight(u, v)),直到所有的點到 s 的距離不再發生變化時迭代結束。

從算法基本原理可以看出,這個算法非常適合使用 MaxCompute Graph 程序進行求解: 每個點維護到源點的當前最短距離值,當這個值變化時,將新值加上邊的權值發送消息通知其鄰接點,下一輪迭代時,鄰接點根據收到的消息更新其當前最短距離,當所有點當前最短距離不再變化時,迭代終止。

源代碼

  1. import java.io.IOException;
  2. import com.aliyun.odps.io.WritableRecord;
  3. import com.aliyun.odps.graph.Combiner;
  4. import com.aliyun.odps.graph.ComputeContext;
  5. import com.aliyun.odps.graph.Edge;
  6. import com.aliyun.odps.graph.GraphJob;
  7. import com.aliyun.odps.graph.GraphLoader;
  8. import com.aliyun.odps.graph.MutationContext;
  9. import com.aliyun.odps.graph.Vertex;
  10. import com.aliyun.odps.graph.WorkerContext;
  11. import com.aliyun.odps.io.LongWritable;
  12. import com.aliyun.odps.data.TableInfo;
  13. public class SSSP {
  14. public static final String START_VERTEX = "sssp.start.vertex.id";
  15. public static class SSSPVertex extends
  16. Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
  17. private static long startVertexId = -1;
  18. public SSSPVertex() {
  19. this.setValue(new LongWritable(Long.MAX_VALUE));
  20. }
  21. public boolean isStartVertex(
  22. ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
  23. if (startVertexId == -1) {
  24. String s = context.getConfiguration().get(START_VERTEX);
  25. startVertexId = Long.parseLong(s);
  26. }
  27. return getId().get() == startVertexId;
  28. }
  29. @Override
  30. public void compute(
  31. ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
  32. Iterable<LongWritable> messages) throws IOException {
  33. long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
  34. for (LongWritable msg : messages) {
  35. if (msg.get() < minDist) {
  36. minDist = msg.get();
  37. }
  38. }
  39. if (minDist < this.getValue().get()) {
  40. this.setValue(new LongWritable(minDist));
  41. if (hasEdges()) {
  42. for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
  43. context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
  44. + e.getValue().get()));
  45. }
  46. }
  47. } else {
  48. voteToHalt();
  49. }
  50. }
  51. @Override
  52. public void cleanup(
  53. WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  54. throws IOException {
  55. context.write(getId(), getValue());
  56. }
  57. }
  58. public static class MinLongCombiner extends
  59. Combiner<LongWritable, LongWritable> {
  60. @Override
  61. public void combine(LongWritable vertexId, LongWritable combinedMessage,
  62. LongWritable messageToCombine) throws IOException {
  63. if (combinedMessage.get() > messageToCombine.get()) {
  64. combinedMessage.set(messageToCombine.get());
  65. }
  66. }
  67. }
  68. public static class SSSPVertexReader extends
  69. GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
  70. @Override
  71. public void load(
  72. LongWritable recordNum,
  73. WritableRecord record,
  74. MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
  75. throws IOException {
  76. SSSPVertex vertex = new SSSPVertex();
  77. vertex.setId((LongWritable) record.get(0));
  78. String[] edges = record.get(1).toString().split(",");
  79. for (int i = 0; i < edges.length; i++) {
  80. String[] ss = edges[i].split(":");
  81. vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
  82. new LongWritable(Long.parseLong(ss[1])));
  83. }
  84. context.addVertexRequest(vertex);
  85. }
  86. }
  87. public static void main(String[] args) throws IOException {
  88. if (args.length < 2) {
  89. System.out.println("Usage: <startnode> <input> <output>");
  90. System.exit(-1);
  91. }
  92. GraphJob job = new GraphJob();
  93. job.setGraphLoaderClass(SSSPVertexReader.class);
  94. job.setVertexClass(SSSPVertex.class);
  95. job.setCombinerClass(MinLongCombiner.class);
  96. job.set(START_VERTEX, args[0]);
  97. job.addInput(TableInfo.builder().tableName(args[1]).build());
  98. job.addOutput(TableInfo.builder().tableName(args[2]).build());
  99. long startTime = System.currentTimeMillis();
  100. job.run();
  101. System.out.println("Job Finished in "
  102. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  103. }
  104. }

代碼說明

SSSP 源代碼包括以下幾部分:

  • 85行:定義 SSSPVertexReader 類,加載圖,將表中每一條記錄解析為一個點,記錄的第一列是點標識,第二列存儲該點起始的所有的邊集,內容如:2:2,3:1,4:4;
  • 21行:定義 SSSPVertex ,其中:
    • 點值表示該點到源點 startVertexId 的當前最短距離;
    • compute()方法使用迭代公式:d[v]=min(d[v], d[u]+weight(u, v)) 更新點值;
    • cleanup() 方法把點及其到源點的最短距離寫到結果表中;
  • 60行:當點值沒發生變化時,調用 voteToHalt() 告訴框架該點進入 halt 狀態,當所有點都進入 halt 狀態時計算結束;
  • 72行:定義 MinLongCombiner,對發送給同一個點的消息進行合並,優化性能,減少內存占用;
  • 108行:主程序(main函數),定義 GraphJob,指定 Vertex/GraphLoader/Combiner 等的實現,指定輸入輸出表。

最後更新:2016-06-22 12:49:51

  上一篇:go 圖模型開發和調試__圖模型_大數據計算服務-阿裏雲
  下一篇:go PageRank__示例程序_圖模型_大數據計算服務-阿裏雲