283
阿里云
PageRank__示例程序_图模型_大数据计算服务-阿里云
PageRank 算法是计算网页排名的经典算法:输入是一个有向图 G,其中顶点表示网页,如果存在网页 A 到网页 B 的链接,那么存在联接 A 到 B 的边,算法基本原理:
- 初始化:点值表示 PageRank 的 rank 值(double 类型),初始时,所有点取值为 1/TotalNumVertices
- 迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum,其中 sum 为所有指向 i 点的点(设为 j) PageRank(j)/out_degree(j) 的累加值
从算法基本原理可以看出,这个算法非常适合使用 OPDS Graph 程序进行求解: 每个点 j 维护其 PageRank 值,每一轮迭代都将PageRank(j)/out_degree(j) 发给其邻接点(向其投票),下一轮迭代时,每个点根据迭代公式重新计算 PageRank 取值。
源代码
import java.io.IOException;
import org.apache.log4j.Logger;
import com.aliyun.odps.io.WritableRecord;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.DoubleWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.io.Text;
import com.aliyun.odps.io.Writable;
public class PageRank {
private final static Logger LOG = Logger.getLogger(PageRank.class);
public static class PageRankVertex extends
Vertex<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void compute(
ComputeContext<Text, DoubleWritable, NullWritable, DoubleWritable> context,
Iterable<DoubleWritable> messages) throws IOException {
if (context.getSuperstep() == 0) {
setValue(new DoubleWritable(1.0 / context.getTotalNumVertices()));
} else if (context.getSuperstep() >= 1) {
double sum = 0;
for (DoubleWritable msg : messages) {
sum += msg.get();
}
DoubleWritable vertexValue = new DoubleWritable(
(0.15f / context.getTotalNumVertices()) + 0.85f * sum);
setValue(vertexValue);
}
if (hasEdges()) {
context.sendMessageToNeighbors(this, new DoubleWritable(getValue()
.get() / getEdges().size()));
}
}
@Override
public void cleanup(
WorkerContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
context.write(getId(), getValue());
}
}
public static class PageRankVertexReader extends
GraphLoader<Text, DoubleWritable, NullWritable, DoubleWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<Text, DoubleWritable, NullWritable, DoubleWritable> context)
throws IOException {
PageRankVertex vertex = new PageRankVertex();
vertex.setValue(new DoubleWritable(0));
vertex.setId((Text) record.get(0));
System.out.println(record.get(0));
for (int i = 1; i < record.size(); i++) {
Writable edge = record.get(i);
System.out.println(edge.toString());
if (!(edge.equals(NullWritable.get()))) {
vertex.addEdge(new Text(edge.toString()), NullWritable.get());
}
}
LOG.info("vertex edgs size: "
+ (vertex.hasEdges() ? vertex.getEdges().size() : 0));
context.addVertexRequest(vertex);
}
}
private static void printUsage() {
System.out.println("Usage: <in> <out> [Max iterations (default 30)]");
System.exit(-1);
}
public static void main(String[] args) throws IOException {
if (args.length < 2)
printUsage();
GraphJob job = new GraphJob();
job.setGraphLoaderClass(PageRankVertexReader.class);
job.setVertexClass(PageRankVertex.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
// default max iteration is 30
job.setMaxIteration(30);
if (args.length >= 3)
job.setMaxIteration(Integer.parseInt(args[2]));
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
代码说明
PageRank 源代码包括以下几部分:
- 55行:定义 PageRankVertexReader 类,加载图,将表中每一条记录解析为一个点,记录的第一列是起点,其他列为终点;
- 23行:定义 PageRankVertex ,其中:
- 点值表示该点(网页)的当前 PageRank 取值;
- compute()方法使用迭代公式:PageRank(i)=0.15/TotalNumVertices+0.85*sum 更新点值;
- cleanup() 方法把点及其 PageRank 取值写到结果表中;
- 88行:主程序(main函数),定义 GraphJob,指定 Vertex/GraphLoader等的实现,以及最大迭代次数(默认 30),并指定输入输出表。
最后更新:2016-09-21 11:03:08
上一篇:
单源最短距离__示例程序_图模型_大数据计算服务-阿里云
下一篇:
K-均值聚类__示例程序_图模型_大数据计算服务-阿里云
EcsOrder__数据类型_API参考_E-MapReduce-阿里云
DTS支持RAM主子账号__访问控制_用户指南_数据传输-阿里云
GetRole__角色管理接口_RAM API文档_访问控制-阿里云
创建OceanBase实例__快速入门_云数据库 OceanBase-阿里云
备份恢复服务__系统架构_产品简介_云数据库 RDS 版-阿里云
大文件上传如何续传__数据操作常见问题_产品使用问题_归档存储-阿里云
4.4 多计算引擎和Hint__第四章 DML_使用手册_分析型数据库-阿里云
释放弹性公网 IP__网络相关接口_API 参考_云服务器 ECS-阿里云
发展历史/Release Note__产品简介_分析型数据库-阿里云
删除服务__服务管理_用户指南_容器服务-阿里云
相关内容
常见错误说明__附录_大数据计算服务-阿里云
发送短信接口__API使用手册_短信服务-阿里云
接口文档__Android_安全组件教程_移动安全-阿里云
运营商错误码(联通)__常见问题_短信服务-阿里云
设置短信模板__使用手册_短信服务-阿里云
OSS 权限问题及排查__常见错误及排除_最佳实践_对象存储 OSS-阿里云
消息通知__操作指南_批量计算-阿里云
设备端快速接入(MQTT)__快速开始_阿里云物联网套件-阿里云
查询API调用流量数据__API管理相关接口_API_API 网关-阿里云
使用STS访问__JavaScript-SDK_SDK 参考_对象存储 OSS-阿里云