594
阿裏雲
強連通分量__示例程序_圖模型_大數據計算服務-阿裏雲
在有向圖中,如果從任意一個頂點出發,都能通過圖中的邊到達圖中的每一個頂點,則稱之為 強連通圖。 一張有向圖的頂點數極大的強連通子圖稱為強連通分量。此算法示例基於 parallel Coloring algorithm。
每個頂點包含兩個部分:
- colorID:在向前遍曆過程中存儲頂點v的顏色,在計算結束時,具有相同colorID的頂點屬於一個強連通分量。
- transposeNeighbors:存儲輸入圖的轉置圖中頂點v的鄰居ID
算法包含以下四步:
- 生成轉置圖:包含兩個超步,首先每個頂點發送ID到其出邊對應的鄰居,這些ID在第二個超步中會存為transposeNeighbors值;
- 修剪:一個超步,每個隻有一個入邊或出邊的頂點,將其colorID設為自身ID,狀態設為不活躍,後麵傳給該頂點的信號被忽略;
- 向前遍曆:頂點包括兩個子過程(超步),啟動和休眠。在啟動階段,每個頂點將其colorID設置為自身ID,同時將其ID傳給出邊對應的鄰居。 休眠階段,頂點使用其收到的最大colorID更新自身colorID,並傳播其colorID,直到colorID收斂。當colorID收斂,master進程將全局對象設置為向後遍曆。
- 向後遍曆:同樣包含兩個子過程,啟動和休眠。啟動階段,每一個ID等於colorID的頂點將其ID傳遞給其轉置圖鄰居頂點, 同時將自身狀態設置為不活躍,後麵傳給該頂點的信號可忽略。在每一個休眠步,每個頂點接收到與其colorID匹配的信號, 並將其colorID在轉置圖中傳播,隨後設置自身狀態為不活躍。該步結束後如果仍有活躍頂點,則回到修剪步。
源代碼
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.Aggregator;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.WorkerContext;
import com.aliyun.odps.io.BooleanWritable;
import com.aliyun.odps.io.IntWritable;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.NullWritable;
import com.aliyun.odps.io.Tuple;
import com.aliyun.odps.io.Writable;
import com.aliyun.odps.io.WritableRecord;
/**
* Definition from Wikipedia:
* In the mathematical theory of directed graphs, a graph is said
* to be strongly connected if every vertex is reachable from every
* other vertex. The strongly connected components of an arbitrary
* directed graph form a partition into subgraphs that are themselves
* strongly connected.
*
* Algorithms with four phases as follows.
* 1. Transpose Graph Formation: Requires two supersteps. In the first
* superstep, each vertex sends a message with its ID to all its outgoing
* neighbors, which in the second superstep are stored in transposeNeighbors.
*
* 2. Trimming: Takes one superstep. Every vertex with only in-coming or
* only outgoing edges (or neither) sets its colorID to its own ID and
* becomes inactive. Messages subsequently sent to the vertex are ignored.
*
* 3. Forward-Traversal: There are two sub phases: Start and Rest. In the
* Start phase, each vertex sets its colorID to its own ID and propagates
* its ID to its outgoing neighbors. In the Rest phase, vertices update
* their own colorIDs with the minimum colorID they have seen, and propagate
* their colorIDs, if updated, until the colorIDs converge.
* Set the phase to Backward-Traversal when the colorIDs converge.
*
* 4. Backward-Traversal: We again break the phase into Start and Rest.
* In Start, every vertex whose ID equals its colorID propagates its ID to
* the vertices in transposeNeighbors and sets itself inactive. Messages
* subsequently sent to the vertex are ignored. In each of the Rest phase supersteps,
* each vertex receiving a message that matches its colorID: (1) propagates
* its colorID in the transpose graph; (2) sets itself inactive. Messages
* subsequently sent to the vertex are ignored. Set the phase back to Trimming
* if not all vertex are inactive.
*
* https://ilpubs.stanford.edu:8090/1077/3/p535-salihoglu.pdf
*/
public class StronglyConnectedComponents {
public final static int STAGE_TRANSPOSE_1 = 0;
public final static int STAGE_TRANSPOSE_2 = 1;
public final static int STAGE_TRIMMING = 2;
public final static int STAGE_FW_START = 3;
public final static int STAGE_FW_REST = 4;
public final static int STAGE_BW_START = 5;
public final static int STAGE_BW_REST = 6;
/**
* The value is composed of component id, incoming neighbors,
* active status and updated status.
*/
public static class MyValue implements Writable {
LongWritable sccID;// strongly connected component id
Tuple inNeighbors; // transpose neighbors
BooleanWritable active; // vertex is active or not
BooleanWritable updated; // sccID is updated or not
public MyValue() {
this.sccID = new LongWritable(Long.MAX_VALUE);
this.inNeighbors = new Tuple();
this.active = new BooleanWritable(true);
this.updated = new BooleanWritable(false);
}
public void setSccID(LongWritable sccID) {
this.sccID = sccID;
}
public LongWritable getSccID() {
return this.sccID;
}
public void setInNeighbors(Tuple inNeighbors) {
this.inNeighbors = inNeighbors;
}
public Tuple getInNeighbors() {
return this.inNeighbors;
}
public void addInNeighbor(LongWritable neighbor) {
this.inNeighbors.append(new LongWritable(neighbor.get()));
}
public boolean isActive() {
return this.active.get();
}
public void setActive(boolean status) {
this.active.set(status);
}
public boolean isUpdated() {
return this.updated.get();
}
public void setUpdated(boolean update) {
this.updated.set(update);
}
@Override
public void write(DataOutput out) throws IOException {
this.sccID.write(out);
this.inNeighbors.write(out);
this.active.write(out);
this.updated.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.sccID.readFields(in);
this.inNeighbors.readFields(in);
this.active.readFields(in);
this.updated.readFields(in);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("sccID: " + sccID.get());
sb.append(" inNeighbores: " + inNeighbors.toDelimitedString(','));
sb.append(" active: " + active.get());
sb.append(" updated: " + updated.get());
return sb.toString();
}
}
public static class SCCVertex extends
Vertex<LongWritable, MyValue, NullWritable, LongWritable> {
public SCCVertex() {
this.setValue(new MyValue());
}
@Override
public void compute(
ComputeContext<LongWritable, MyValue, NullWritable, LongWritable> context,
Iterable<LongWritable> msgs) throws IOException {
// Messages sent to inactive vertex are ignored.
if (!this.getValue().isActive()) {
this.voteToHalt();
return;
}
int stage = ((SCCAggrValue)context.getLastAggregatedValue(0)).getStage();
switch (stage) {
case STAGE_TRANSPOSE_1:
context.sendMessageToNeighbors(this, this.getId());
break;
case STAGE_TRANSPOSE_2:
for (LongWritable msg: msgs) {
this.getValue().addInNeighbor(msg);
}
case STAGE_TRIMMING:
this.getValue().setSccID(getId());
if (this.getValue().getInNeighbors().size() == 0 ||
this.getNumEdges() == 0) {
this.getValue().setActive(false);
}
break;
case STAGE_FW_START:
this.getValue().setSccID(getId());
context.sendMessageToNeighbors(this, this.getValue().getSccID());
break;
case STAGE_FW_REST:
long minSccID = Long.MAX_VALUE;
for (LongWritable msg : msgs) {
if (msg.get() < minSccID) {
minSccID = msg.get();
}
}
if (minSccID < this.getValue().getSccID().get()) {
this.getValue().setSccID(new LongWritable(minSccID));
context.sendMessageToNeighbors(this, this.getValue().getSccID());
this.getValue().setUpdated(true);
} else {
this.getValue().setUpdated(false);
}
break;
case STAGE_BW_START:
if (this.getId().equals(this.getValue().getSccID())) {
for (Writable neighbor : this.getValue().getInNeighbors().getAll()) {
context.sendMessage((LongWritable)neighbor, this.getValue().getSccID());
}
this.getValue().setActive(false);
}
break;
case STAGE_BW_REST:
this.getValue().setUpdated(false);
for (LongWritable msg : msgs) {
if (msg.equals(this.getValue().getSccID())) {
for (Writable neighbor : this.getValue().getInNeighbors().getAll()) {
context.sendMessage((LongWritable)neighbor, this.getValue().getSccID());
}
this.getValue().setActive(false);
this.getValue().setUpdated(true);
break;
}
}
break;
}
context.aggregate(0, getValue());
}
@Override
public void cleanup(
WorkerContext<LongWritable, MyValue, NullWritable, LongWritable> context)
throws IOException {
context.write(getId(), getValue().getSccID());
}
}
/**
* The SCCAggrValue maintains global stage and graph updated and active status.
* updated is true only if one vertex is updated.
* active is true only if one vertex is active.
*/
public static class SCCAggrValue implements Writable {
IntWritable stage = new IntWritable(STAGE_TRANSPOSE_1);
BooleanWritable updated = new BooleanWritable(false);
BooleanWritable active = new BooleanWritable(false);
public void setStage(int stage) {
this.stage.set(stage);
}
public int getStage() {
return this.stage.get();
}
public void setUpdated(boolean updated) {
this.updated.set(updated);
}
public boolean getUpdated() {
return this.updated.get();
}
public void setActive(boolean active) {
this.active.set(active);
}
public boolean getActive() {
return this.active.get();
}
@Override
public void write(DataOutput out) throws IOException {
this.stage.write(out);
this.updated.write(out);
this.active.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.stage.readFields(in);
this.updated.readFields(in);
this.active.readFields(in);
}
}
/**
* The job of SCCAggregator is to schedule global stage in every superstep.
*/
public static class SCCAggregator extends Aggregator<SCCAggrValue> {
@SuppressWarnings("rawtypes")
@Override
public SCCAggrValue createStartupValue(WorkerContext context) throws IOException {
return new SCCAggrValue();
}
@SuppressWarnings("rawtypes")
@Override
public SCCAggrValue createInitialValue(WorkerContext context)
throws IOException {
return (SCCAggrValue) context.getLastAggregatedValue(0);
}
@Override
public void aggregate(SCCAggrValue value, Object item) throws IOException {
MyValue v = (MyValue)item;
if ((value.getStage() == STAGE_FW_REST || value.getStage() == STAGE_BW_REST)
&& v.isUpdated()) {
value.setUpdated(true);
}
// only active vertex invoke aggregate()
value.setActive(true);
}
@Override
public void merge(SCCAggrValue value, SCCAggrValue partial)
throws IOException {
boolean updated = value.getUpdated() || partial.getUpdated();
value.setUpdated(updated);
boolean active = value.getActive() || partial.getActive();
value.setActive(active);
}
@SuppressWarnings("rawtypes")
@Override
public boolean terminate(WorkerContext context, SCCAggrValue value)
throws IOException {
// If all vertices is inactive, job is over.
if (!value.getActive()) {
return true;
}
// state machine
switch (value.getStage()) {
case STAGE_TRANSPOSE_1:
value.setStage(STAGE_TRANSPOSE_2);
break;
case STAGE_TRANSPOSE_2:
value.setStage(STAGE_TRIMMING);
break;
case STAGE_TRIMMING:
value.setStage(STAGE_FW_START);
break;
case STAGE_FW_START:
value.setStage(STAGE_FW_REST);
break;
case STAGE_FW_REST:
if (value.getUpdated()) {
value.setStage(STAGE_FW_REST);
} else {
value.setStage(STAGE_BW_START);
}
break;
case STAGE_BW_START:
value.setStage(STAGE_BW_REST);
break;
case STAGE_BW_REST:
if (value.getUpdated()) {
value.setStage(STAGE_BW_REST);
} else {
value.setStage(STAGE_TRIMMING);
}
break;
}
value.setActive(false);
value.setUpdated(false);
return false;
}
}
public static class SCCVertexReader extends
GraphLoader<LongWritable, MyValue, NullWritable, LongWritable> {
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, MyValue, NullWritable, LongWritable> context)
throws IOException {
SCCVertex vertex = new SCCVertex();
vertex.setId((LongWritable) record.get(0));
String[] edges = record.get(1).toString().split(",");
for (int i = 0; i < edges.length; i++) {
try {
long destID = Long.parseLong(edges[i]);
vertex.addEdge(new LongWritable(destID), NullWritable.get());
} catch(NumberFormatException nfe) {
System.err.println("Ignore " + nfe);
}
}
context.addVertexRequest(vertex);
}
}
public static void main(String[] args) throws IOException {
if (args.length < 2) {
System.out.println("Usage: <input> <output>");
System.exit(-1);
}
GraphJob job = new GraphJob();
job.setGraphLoaderClass(SCCVertexReader.class);
job.setVertexClass(SCCVertex.class);
job.setAggregatorClass(SCCAggregator.class);
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addOutput(TableInfo.builder().tableName(args[1]).build());
long startTime = System.currentTimeMillis();
job.run();
System.out.println("Job Finished in "
+ (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
}
}
最後更新:2016-05-06 10:43:09
上一篇:
BiPartiteMatchiing__示例程序_圖模型_大數據計算服務-阿裏雲
下一篇:
連通分量__示例程序_圖模型_大數據計算服務-阿裏雲
優化SQL__性能管理_用戶指南(RDBMS)_數據管理-阿裏雲
示例項目使用說明__開發準備_開發人員指南_E-MapReduce-阿裏雲
視頻點播轉碼完成後如何獲取播放地址的URL?__常見問題_產品常見問題_視頻點播-阿裏雲
Python SDK 曆史迭代版本__曆史版本 SDK 下載_SDK 參考_表格存儲-阿裏雲
鑒權Action__OpenAPI RAM鑒權_OpenAPI 2.0_移動推送-阿裏雲
在哪裏可以查到RDS的權限定義___雲數據庫(RDS)授權問題_授權常見問題_訪問控製-阿裏雲
WordCount示例__示例程序_MapReduce_大數據計算服務-阿裏雲
修改消費位點__數據訂閱_用戶指南_數據傳輸-阿裏雲
數據源概覽__準備數據源_用戶指南_業務實時監控服務 ARMS-阿裏雲
如何配置ECS支持負載均衡__後端 ECS 服務器常見問題_常見問題_負載均衡-阿裏雲
相關內容
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲