1015
阿裏雲
單源最短距離__示例程序_圖模型_大數據計算服務-阿裏雲
Dijkstra算法是求解有向圖中單源最短距離(Single Source Shortest Path,簡寫:SSSP)的經典算法。 最短距離:對一個有權重的有向圖 G=(V,E),從一個源點s到匯點v有很多路徑,其中邊權和最小的路徑,稱從s到v的最短距離。算法基本原理:
- 初始化:源點 s 到 s 自身的距離(d[s]=0),其他點 u 到 s 的距離為無窮(d[u]=∞)
- 迭代:若存在一條從 u 到 v 的邊,那麼從 s 到 v 的最短距離更新為: d[v]=min(d[v], d[u]+weight(u, v)),直到所有的點到 s 的距離不再發生變化時迭代結束。
從算法基本原理可以看出,這個算法非常適合使用 MaxCompute Graph 程序進行求解: 每個點維護到源點的當前最短距離值,當這個值變化時,將新值加上邊的權值發送消息通知其鄰接點,下一輪迭代時,鄰接點根據收到的消息更新其當前最短距離,當所有點當前最短距離不再變化時,迭代終止。
源代碼
import java.io.IOException;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.data.TableInfo;
public class SSSP {
public static final String START_VERTEX = "sssp.start.vertex.id";
public static class SSSPVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
private static long startVertexId = -1;
public SSSPVertex() {
this.setValue(new LongWritable(Long.MAX_VALUE));
}
public boolean isStartVertex(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
if (startVertexId == -1) {
String s = context.getConfiguration().get(START_VERTEX);
startVertexId = Long.parseLong(s);
}
return getId().get() == startVertexId;
}
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
long minDist = isStartVertex(context) ? 0 : Integer.MAX_VALUE;
for (LongWritable msg : messages) {
if (msg.get() < minDist) {
minDist = msg.get();
}
}
if (minDist < this.getValue().get()) {
this.setValue(new LongWritable(minDist));
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
context.sendMessage(e.getDestVertexId(), new LongWritable(minDist
+ e.getValue().get()));
}
}
} else {
voteToHalt();
}
}
@Override
public void cleanup(
WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class MinLongCombiner extends
Combiner<LongWritable, LongWritable> {
@Override
public void combine(LongWritable vertexId, LongWritable combinedMessage,
LongWritable messageToCombine) throws IOException {
if (combinedMessage.get() > messageToCombine.get()) {
combinedMessage.set(messageToCombine.get());
}
}
}
public static class SSSPVertexReader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
SSSPVertex vertex = new SSSPVertex();
vertex.setId((LongWritable) record.get(0));
String[] edges = record.get(1).toString().split(",");
for (int i = 0; i < edges.length; i++) {
String[] ss = edges[i].split(":");
vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
new LongWritable(Long.parseLong(ss[1])));
}
context.addVertexRequest(vertex);
}
}
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: <startnode> <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SSSPVertexReader.class);
job.setVertexClass(SSSPVertex.class);
job.setCombinerClass(MinLongCombiner.class);
job.set(START_VERTEX, args[0]);
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.addOutput(TableInfo.builder().tableName(args[2]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
代碼說明
SSSP 源代碼包括以下幾部分:
- 85行:定義 SSSPVertexReader 類,加載圖,將表中每一條記錄解析為一個點,記錄的第一列是點標識,第二列存儲該點起始的所有的邊集,內容如:2:2,3:1,4:4;
- 21行:定義 SSSPVertex ,其中:
- 點值表示該點到源點 startVertexId 的當前最短距離;
- compute()方法使用迭代公式:d[v]=min(d[v], d[u]+weight(u, v)) 更新點值;
- cleanup() 方法把點及其到源點的最短距離寫到結果表中;
- 60行:當點值沒發生變化時,調用 voteToHalt() 告訴框架該點進入 halt 狀態,當所有點都進入 halt 狀態時計算結束;
- 72行:定義 MinLongCombiner,對發送給同一個點的消息進行合並,優化性能,減少內存占用;
- 108行:主程序(main函數),定義 GraphJob,指定 Vertex/GraphLoader/Combiner 等的實現,指定輸入輸出表。
最後更新:2016-06-22 12:49:51
上一篇:
圖模型開發和調試__圖模型_大數據計算服務-阿裏雲
下一篇:
PageRank__示例程序_圖模型_大數據計算服務-阿裏雲
集群管理__控製台使用指南_批量計算-阿裏雲
實驗目標__快速入門_推薦引擎-阿裏雲
GetMachineGroup__Logtail機器組相關接口_API-Reference_日誌服務-阿裏雲
OGG MaxCompute插件__數據入雲_數據集成-阿裏雲
RemoveConfigFromMachineGroup__Logtail機器組相關接口_API-Reference_日誌服務-阿裏雲
公眾趨勢分析__數加產品概覽_數加平台介紹-阿裏雲
LogSearch-報警__Getting-Started_日誌服務-阿裏雲
移動端播放器SDK__視頻播放_使用手冊_媒體轉碼-阿裏雲
Hive 作業配置__作業_用戶指南_E-MapReduce-阿裏雲
賠償說明__產品簡介_消息服務-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲