Single-Source Shortest Path (Example Programs, Graph Model, MaxCompute, Alibaba Cloud)
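Dijkstra's algorithm is the classic algorithm for the single-source shortest path (SSSP) problem on a directed graph with non-negative edge weights. Shortest distance: in a weighted directed graph G=(V,E) there may be many paths from a source vertex s to a destination vertex v; the smallest sum of edge weights over all these paths is called the shortest distance from s to v. The basic principle of the algorithm is: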
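- Initialization: the distance from the source s to itself is 0 (d[s]=0), and the distance from s to every other vertex u is infinity (d[u]=∞).
- Iteration: for every edge from u to v, update the shortest distance from s to v as d[v]=min(d[v], d[u]+weight(u, v)). The iteration ends when no vertex's distance from s changes any more, which is guaranteed as long as no negative-weight cycle is reachable from s.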
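The two rules above can be tried out as a plain, single-machine Java program, independent of MaxCompute. This is a minimal sketch under stated assumptions: the class name `RelaxationSketch`, the `{from, to, weight}` edge array and the sample graph are hypothetical, and `Long.MAX_VALUE` stands in for ∞. Each pass applies d[v]=min(d[v], d[u]+weight(u, v)) to every edge, and the loop stops after a pass that changes nothing.

```java
import java.util.Arrays;

public class RelaxationSketch {

    // edges[i] = {from, to, weight}; hypothetical representation for illustration.
    static long[] shortestDistances(int n, int[][] edges, int source) {
        long[] d = new long[n];
        Arrays.fill(d, Long.MAX_VALUE);       // d[u] = infinity for every vertex
        d[source] = 0;                        // d[s] = 0
        boolean changed = true;
        while (changed) {                     // stop once a full pass changes nothing
            changed = false;
            for (int[] e : edges) {
                int u = e[0], v = e[1], w = e[2];
                if (d[u] == Long.MAX_VALUE) {
                    continue;                 // u not reached yet; avoid overflow
                }
                long candidate = d[u] + w;
                if (candidate < d[v]) {       // d[v] = min(d[v], d[u] + weight(u, v))
                    d[v] = candidate;
                    changed = true;
                }
            }
        }
        return d;
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1, 4}, {0, 2, 1}, {2, 1, 2}, {1, 3, 1}};
        System.out.println(Arrays.toString(shortestDistances(5, edges, 0)));
    }
}
```

For this sample graph the program prints `[0, 3, 1, 4, 9223372036854775807]`: vertex 1 is reached more cheaply through 0 -> 2 -> 1 (cost 3) than through the direct edge (cost 4), and the unreachable vertex 4 keeps its initial "infinite" value.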
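Repeating this update until nothing changes, without Dijkstra's ordering of vertices by distance, is essentially Bellman-Ford-style relaxation, and it maps naturally onto a MaxCompute Graph program: each vertex keeps its current shortest distance to the source. Whenever this value changes, the vertex adds the weight of each outgoing edge to the new value and sends the result as a message to that edge's destination. In the next iteration, each neighbor updates its current shortest distance from the messages it received. The iteration terminates once no vertex's current shortest distance changes any more.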
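To make the message-passing description concrete, the following sketch simulates it in plain Java without the MaxCompute Graph SDK. It is an illustrative model, not the framework's implementation: the class `SuperstepSketch`, the `int[][][]` adjacency format (`{dest, weight}` pairs per vertex) and the in-memory superstep loop are assumptions. Messages produced in one superstep are delivered in the next, messages to the same vertex are reduced to their minimum (the role a combiner plays), a vertex whose distance does not improve halts, and an incoming message reactivates it.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SuperstepSketch {

    // out[u] lists the out-edges of vertex u as {dest, weight} pairs (assumed format).
    static long[] run(int[][][] out, int source) {
        int n = out.length;
        long[] value = new long[n];
        Arrays.fill(value, Long.MAX_VALUE);   // vertex value starts at "infinity"
        boolean[] halted = new boolean[n];    // every vertex is active in superstep 0
        Map<Integer, Long> inbox = new HashMap<>();

        while (true) {
            Map<Integer, Long> next = new HashMap<>();  // messages for the next superstep
            for (int v = 0; v < n; v++) {
                Long msg = inbox.get(v);      // already combined: at most one message
                if (msg != null) {
                    halted[v] = false;        // an incoming message reactivates the vertex
                }
                if (halted[v]) {
                    continue;
                }
                long minDist = (v == source) ? 0 : Long.MAX_VALUE;
                if (msg != null && msg < minDist) {
                    minDist = msg;
                }
                if (minDist < value[v]) {
                    value[v] = minDist;       // improved: store it and notify neighbors
                    for (int[] e : out[v]) {
                        // Combiner role: keep only the smallest message per destination.
                        next.merge(e[0], minDist + e[1], Long::min);
                    }
                } else {
                    halted[v] = true;         // equivalent of voteToHalt()
                }
            }
            inbox = next;
            boolean allHalted = true;
            for (boolean h : halted) {
                allHalted &= h;
            }
            if (allHalted && inbox.isEmpty()) {
                return value;                 // no active vertex and no pending message
            }
        }
    }

    public static void main(String[] args) {
        int[][][] out = {
            {{1, 4}, {2, 1}},  // vertex 0
            {{3, 1}},          // vertex 1
            {{1, 2}},          // vertex 2
            {},                // vertex 3
            {}                 // vertex 4 (unreachable)
        };
        System.out.println(Arrays.toString(run(out, 0)));
    }
}
```

For the sample graph in `main` the result is `[0, 3, 1, 4, 9223372036854775807]`. Because every vertex computes only from the previous superstep's messages, the order in which vertices are visited inside one superstep does not affect the result, which is what allows the real framework to process vertices in parallel.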
Source code
```java
import java.io.IOException;

import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Combiner;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableRecord;

public class SSSP {

  public static final String START_VERTEX = "sssp.start.vertex.id";

  // Vertex value = current shortest distance from the start vertex.
  public static class SSSPVertex extends
      Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {

    private static long startVertexId = -1;

    public SSSPVertex() {
      // Long.MAX_VALUE plays the role of "infinity".
      this.setValue(new LongWritable(Long.MAX_VALUE));
    }

    public boolean isStartVertex(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context) {
      if (startVertexId == -1) {
        String s = context.getConfiguration().get(START_VERTEX);
        startVertexId = Long.parseLong(s);
      }
      return getId().get() == startVertexId;
    }

    @Override
    public void compute(
        ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
        Iterable<LongWritable> messages) throws IOException {
      // Same "infinity" as the initial vertex value, so a vertex that receives
      // no message keeps Long.MAX_VALUE and simply halts.
      long minDist = isStartVertex(context) ? 0 : Long.MAX_VALUE;
      for (LongWritable msg : messages) {
        if (msg.get() < minDist) {
          minDist = msg.get();
        }
      }
      if (minDist < this.getValue().get()) {
        // Distance improved: store it and notify every out-neighbor.
        this.setValue(new LongWritable(minDist));
        if (hasEdges()) {
          for (Edge<LongWritable, LongWritable> e : this.getEdges()) {
            context.sendMessage(e.getDestVertexId(),
                new LongWritable(minDist + e.getValue().get()));
          }
        }
      } else {
        // No improvement: halt until a new message arrives.
        voteToHalt();
      }
    }

    @Override
    public void cleanup(
        WorkerContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      // Write (vertex id, shortest distance) to the output table.
      context.write(getId(), getValue());
    }
  }

  // Keeps only the smallest of the messages sent to the same vertex.
  public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
    @Override
    public void combine(LongWritable vertexId, LongWritable combinedMessage,
        LongWritable messageToCombine) throws IOException {
      if (combinedMessage.get() > messageToCombine.get()) {
        combinedMessage.set(messageToCombine.get());
      }
    }
  }

  // Column 0: vertex id; column 1: out-edges as "dest:weight,dest:weight,...".
  public static class SSSPVertexReader extends
      GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
    @Override
    public void load(LongWritable recordNum, WritableRecord record,
        MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
        throws IOException {
      SSSPVertex vertex = new SSSPVertex();
      vertex.setId((LongWritable) record.get(0));
      // Skip parsing when the edge column is NULL or empty (no out-edges).
      if (record.get(1) != null && !record.get(1).toString().isEmpty()) {
        String[] edges = record.get(1).toString().split(",");
        for (int i = 0; i < edges.length; i++) {
          String[] ss = edges[i].split(":");
          vertex.addEdge(new LongWritable(Long.parseLong(ss[0])),
              new LongWritable(Long.parseLong(ss[1])));
        }
      }
      context.addVertexRequest(vertex);
    }
  }

  public static void main(String[] args) throws IOException {
    // Three arguments: start vertex id, input table, output table.
    if (args.length < 3) {
      System.out.println("Usage: <startnode> <input> <output>");
      System.exit(-1);
    }

    GraphJob job = new GraphJob();
    job.setGraphLoaderClass(SSSPVertexReader.class);
    job.setVertexClass(SSSPVertex.class);
    job.setCombinerClass(MinLongCombiner.class);
    job.set(START_VERTEX, args[0]);
    job.addInput(TableInfo.builder().tableName(args[1]).build());
    job.addOutput(TableInfo.builder().tableName(args[2]).build());

    long startTime = System.currentTimeMillis();
    job.run();
    System.out.println("Job Finished in "
        + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  }
}
```
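Code walkthrough
The SSSP source code consists of the following parts:
- SSSPVertexReader loads the graph. Each record in the input table is parsed into one vertex: the first column is the vertex ID, and the second column stores all edges that start at this vertex as comma-separated destID:weight pairs, for example 2:2,3:1,4:4 (edges to vertices 2, 3 and 4 with weights 2, 1 and 4).
- SSSPVertex defines the vertex, where:
  - the vertex value is the vertex's current shortest distance to the source vertex startVertexId;
  - the compute() method updates the vertex value with the iteration formula d[v]=min(d[v], d[u]+weight(u, v));
  - the cleanup() method writes each vertex and its shortest distance to the source into the result table;
  - when the vertex value does not change, compute() calls voteToHalt() to tell the framework that the vertex enters the halt state. A halted vertex is reactivated when it receives a new message, and the computation ends once all vertices are halted.
- MinLongCombiner merges the messages sent to the same vertex by keeping only the smallest one, which improves performance and reduces memory usage.
- The main function defines the GraphJob, specifies the Vertex, GraphLoader and Combiner implementations, and sets the input and output tables from its three arguments: start vertex ID, input table name and output table name.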
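The edge-column format read by SSSPVertexReader can be tried out on its own. The sketch below, with the hypothetical class name `EdgeColumnSketch`, applies the same `split(",")` / `split(":")` steps outside the MaxCompute SDK; returning an empty list for a null or empty column is an assumption about how vertices without out-edges are stored.

```java
import java.util.ArrayList;
import java.util.List;

public class EdgeColumnSketch {

    // Parses an edge column such as "2:2,3:1,4:4" into {destId, weight} pairs,
    // using the same split(",") and split(":") steps as SSSPVertexReader.
    static List<long[]> parseEdges(String column) {
        List<long[]> edges = new ArrayList<>();
        if (column == null || column.isEmpty()) {
            return edges;                     // assumed: vertex has no out-edges
        }
        for (String edge : column.split(",")) {
            String[] ss = edge.split(":");
            edges.add(new long[] {Long.parseLong(ss[0]), Long.parseLong(ss[1])});
        }
        return edges;
    }

    public static void main(String[] args) {
        for (long[] e : parseEdges("2:2,3:1,4:4")) {
            System.out.println("dest=" + e[0] + " weight=" + e[1]);
        }
    }
}
```

Running it prints one line per edge: `dest=2 weight=2`, `dest=3 weight=1`, `dest=4 weight=4`.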
Last updated: 2016-06-22 12:49:51