576
外匯
輸入點表示例__示例程序_圖模型_大數據計算服務-阿裏雲
import java.io.IOException;
import com.aliyun.odps.conf.Configuration;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.graph.ComputeContext;
import com.aliyun.odps.graph.GraphJob;
import com.aliyun.odps.graph.GraphLoader;
import com.aliyun.odps.graph.Vertex;
import com.aliyun.odps.graph.VertexResolver;
import com.aliyun.odps.graph.MutationContext;
import com.aliyun.odps.graph.VertexChanges;
import com.aliyun.odps.graph.Edge;
import com.aliyun.odps.io.LongWritable;
import com.aliyun.odps.io.WritableComparable;
import com.aliyun.odps.io.WritableRecord;
/**
* 本示例是用於展示,對於不同類型的數據類型,如何編寫圖作業程序載入數據。主要展示GraphLoader和
* VertexResolver的配合完成圖的構建。
*
* ODPS Graph的作業輸入都為ODPS的Table,假設作業輸入有兩張表,一張存儲點的信息,一張存儲邊的信息。
* 存儲點信息的表的格式,如:
* +------------------------+
* | VertexID | VertexValue |
* +------------------------+
* | id0| 9|
* +------------------------+
* | id1| 7|
* +------------------------+
* | id2| 8|
* +------------------------+
*
* 存儲邊信息的表的格式,如
* +-----------------------------------+
* | VertexID | DestVertexID| EdgeValue|
* +-----------------------------------+
* | id0| id1| 1|
* +-----------------------------------+
* | id0| id2| 2|
* +-----------------------------------+
* | id2| id1| 3|
* +-----------------------------------+
*
* 結合兩張表的數據,表示id0有兩條出邊,分別指向id1和id2;id2有一條出邊,指向id1;id1沒有出邊。
*
* 對於此種類型的數據,在GraphLoader::load(LongWritable, Record, MutationContext)
* ,可以使用 MutationContext#addVertexRequest(Vertex)向圖中請求添加點,使用
* link MutationContext#addEdgeRequest(WritableComparable, Edge)向圖中請求添加邊,然後,在
* link VertexResolver#resolve(WritableComparable, Vertex, VertexChanges, boolean)
* 中,將load 方法中添加的點和邊,合並到一個Vertex對象中,作為返回值,添加到最後參與計算的圖中。
*
**/
public class VertexInputFormat {
private final static String EDGE_TABLE = "edge.table";
/**
* 將Record解釋為Vertex和Edge,每個Record根據其來源,表示一個Vertex或者一條Edge。
*
* 類似於com.aliyun.odps.mapreduce.Mapper#map
* ,輸入Record,生成鍵值對,此處的鍵是Vertex的ID,
* 值是Vertex或Edge,通過上下文Context寫出,這些鍵值對會在LoadingVertexResolver出根據Vertex的ID匯總。
*
* 注意:此處添加的點或邊隻是根據Record內容發出的請求,並不是最後參與計算的點或邊,隻有在隨後的VertexResolver
* 中添加的點或邊才參與計算。
**/
public static class VertexInputLoader extends
GraphLoader<LongWritable, LongWritable, LongWritable, LongWritable> {
private boolean isEdgeData;
/**
* 配置VertexInputLoader。
*
* @param conf
* 作業的配置參數,在main中使用GraphJob配置的,或者在console中set的
* @param workerId
* 當前工作的worker的序號,從0開始,可以用於構造唯一的vertex id
* @param inputTableInfo
* 當前worker載入的輸入表信息,可以用於確定當前輸入是哪種類型的數據,即Record的格式
**/
@Override
public void setup(Configuration conf, int workerId, TableInfo inputTableInfo) {
isEdgeData = conf.get(EDGE_TABLE).equals(inputTableInfo.getTableName());
}
/**
* 根據Record中的內容,解析為對應的邊,並請求添加到圖中。
*
* @param recordNum
* 記錄序列號,從1開始,每個worker上單獨計數
* @param record
* 輸入表中的記錄,三列,分別表示初點、終點、邊的權重
* @param context
* 上下文,請求將解釋後的邊添加到圖中
**/
@Override
public void load(
LongWritable recordNum,
WritableRecord record,
MutationContext<LongWritable, LongWritable, LongWritable, LongWritable> context)
throws IOException {
if (isEdgeData) {
/**
* 數據來源於存儲邊信息的表。
*
* 1、第一列表示初始點的ID
**/
LongWritable sourceVertexID = (LongWritable) record.get(0);
/**
* 2、第二列表示終點的ID
**/
LongWritable destinationVertexID = (LongWritable) record.get(1);
/**
* 3、地三列表示邊的權重
**/
LongWritable edgeValue = (LongWritable) record.get(2);
/**
* 4、創建邊,由終點ID和邊的權重組成
**/
Edge<LongWritable, LongWritable> edge = new Edge<LongWritable, LongWritable>(
destinationVertexID, edgeValue);
/**
* 5、請求給初始點添加邊
**/
context.addEdgeRequest(sourceVertexID, edge);
/**
* 6、如果每條Record表示雙向邊,重複4與5 Edge<LongWritable, LongWritable> edge2 = new
* Edge<LongWritable, LongWritable>( sourceVertexID, edgeValue);
* context.addEdgeRequest(destinationVertexID, edge2);
**/
} else {
/**
* 數據來源於存儲點信息的表。
*
* 1、第一列表示點的ID
**/
LongWritable vertexID = (LongWritable) record.get(0);
/**
* 2、第二列表示點的值
**/
LongWritable vertexValue = (LongWritable) record.get(1);
/**
* 3、創建點,由點的ID和點的值組成
**/
MyVertex vertex = new MyVertex();
/**
* 4、初始化點
**/
vertex.setId(vertexID);
vertex.setValue(vertexValue);
/**
* 5、請求添加點
**/
context.addVertexRequest(vertex);
}
}
}
/**
* 匯總GraphLoader::load(LongWritable, Record, MutationContext)生成的鍵值對,類似於
* com.aliyun.odps.mapreduce.Reducer中的reduce。對於唯一的Vertex ID,所有關於這個ID上
* 添加刪除、點邊的行為都會放在VertexChanges中。
*
* 注意:此處並不隻針對load方法中添加的有衝突的點或邊才調用(衝突是指添加多個相同Vertex對象,添加重複邊等),
* 所有在load方法中請求生成的ID都會在此處被調用。
**/
public static class LoadingResolver extends
VertexResolver<LongWritable, LongWritable, LongWritable, LongWritable> {
/**
* 處理關於一個ID的添加或刪除、點或邊的請求。
*
* VertexChanges有四個接口,分別與MutationContext的四個接口對應:
* VertexChanges::getAddedVertexList()與
* MutationContext::addVertexRequest(Vertex)對應,
* 在load方法中,請求添加的ID相同的Vertex對象,會被匯總在返回的List中
* VertexChanges::getAddedEdgeList()與
* MutationContext::addEdgeRequest(WritableComparable, Edge)
* 對應,請求添加的初始點ID相同的Edge對象,會被匯總在返回的List中
* VertexChanges::getRemovedVertexCount()與
* MutationContext::removeVertexRequest(WritableComparable)
* 對應,請求刪除的ID相同的Vertex,匯總的請求刪除的次數作為返回值
* VertexChanges#getRemovedEdgeList()與
* MutationContext#removeEdgeRequest(WritableComparable, WritableComparable)
* 對應,請求刪除的初始點ID相同的Edge對象,會被匯總在返回的List中
*
* 用戶通過處理關於這個ID的變化,通過返回值聲明此ID是否參與計算,如果返回的Vertex不為null,
* 則此ID會參與隨後的計算,如果返回null,則不會參與計算。
*
* @param vertexId
* 請求添加的點的ID,或請求添加的邊的初點ID
* @param vertex
* 已存在的Vertex對象,數據載入階段,始終為null
* @param vertexChanges
* 此ID上的請求添加刪除、點邊的集合
* @param hasMessages
* 此ID是否有輸入消息,數據載入階段,始終為false
**/
@Override
public Vertex<LongWritable, LongWritable, LongWritable, LongWritable> resolve(
LongWritable vertexId,
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> vertex,
VertexChanges<LongWritable, LongWritable, LongWritable, LongWritable> vertexChanges,
boolean hasMessages) throws IOException {
/**
* 1、獲取Vertex對象,作為參與計算的點。
**/
MyVertex computeVertex = null;
if (vertexChanges.getAddedVertexList() == null
|| vertexChanges.getAddedVertexList().isEmpty()) {
computeVertex = new MyVertex();
computeVertex.setId(vertexId);
} else {
/**
* 此處假設存儲點信息的表中,每個Record表示唯一的點。
**/
computeVertex = (MyVertex) vertexChanges.getAddedVertexList().get(0);
}
/**
* 2、將請求給此點添加的邊,添加到Vertex對象中,如果數據有重複的可能,根據算法需要決定是否去重。
**/
if (vertexChanges.getAddedEdgeList() != null) {
for (Edge<LongWritable, LongWritable> edge : vertexChanges
.getAddedEdgeList()) {
computeVertex.addEdge(edge.getDestVertexId(), edge.getValue());
}
}
/**
* 3、將Vertex對象返回,添加到最終的圖中參與計算。
**/
return computeVertex;
}
}
/**
* 確定參與計算的Vertex的行為。
*
**/
public static class MyVertex extends
Vertex<LongWritable, LongWritable, LongWritable, LongWritable> {
/**
* 將vertex的邊,按照輸入表的格式再寫到結果表。輸入表與輸出表的格式和數據都相同。
*
* @param context
* 運行時上下文
* @param messages
* 輸入消息
**/
@Override
public void compute(
ComputeContext<LongWritable, LongWritable, LongWritable, LongWritable> context,
Iterable<LongWritable> messages) throws IOException {
/**
* 將點的ID和值,寫到存儲點的結果表
**/
context.write("vertex", getId(), getValue());
/**
* 將點的邊,寫到存儲邊的結果表
**/
if (hasEdges()) {
for (Edge<LongWritable, LongWritable> edge : getEdges()) {
context.write("edge", getId(), edge.getDestVertexId(),
edge.getValue());
}
}
/**
* 隻迭代一輪
**/
voteToHalt();
}
}
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
if (args.length < 4) {
throw new IOException(
"Usage: VertexInputFormat <vertex input> <edge input> <vertex output> <edge output>");
}
/**
* GraphJob用於對Graph作業進行配置
*/
GraphJob job = new GraphJob();
/**
* 1、指定輸入的圖數據,並指定存儲邊數據所在的表。
*/
job.addInput(TableInfo.builder().tableName(args[0]).build());
job.addInput(TableInfo.builder().tableName(args[1]).build());
job.set(EDGE_TABLE, args[1]);
/**
* 2、指定載入數據的方式,將Record解釋為Edge,此處類似於map,生成的 key為vertex的ID,value為edge。
*/
job.setGraphLoaderClass(VertexInputLoader.class);
/**
* 3、指定載入數據階段,生成參與計算的vertex。此處類似於reduce,將map 生成的edge合並成一個vertex。
*/
job.setLoadingVertexResolverClass(LoadingResolver.class);
/**
* 4、指定參與計算的vertex的行為。每輪迭代執行vertex.compute方法。
*/
job.setVertexClass(MyVertex.class);
/**
* 5、指定圖作業的輸出表,將計算生成的結果寫到結果表中。
*/
job.addOutput(TableInfo.builder().tableName(args[2]).label("vertex").build());
job.addOutput(TableInfo.builder().tableName(args[3]).label("edge").build());
/**
* 6、提交作業執行。
*/
job.run();
}
}
最後更新:2016-11-23 17:16:04
上一篇:
三角形計數__示例程序_圖模型_大數據計算服務-阿裏雲
下一篇:
輸入邊表示例__示例程序_圖模型_大數據計算服務-阿裏雲
常見錯誤說明__附錄_大數據計算服務-阿裏雲
發送短信接口__API使用手冊_短信服務-阿裏雲
接口文檔__Android_安全組件教程_移動安全-阿裏雲
運營商錯誤碼(聯通)__常見問題_短信服務-阿裏雲
設置短信模板__使用手冊_短信服務-阿裏雲
OSS 權限問題及排查__常見錯誤及排除_最佳實踐_對象存儲 OSS-阿裏雲
消息通知__操作指南_批量計算-阿裏雲
設備端快速接入(MQTT)__快速開始_阿裏雲物聯網套件-阿裏雲
查詢API調用流量數據__API管理相關接口_API_API 網關-阿裏雲
使用STS訪問__JavaScript-SDK_SDK 參考_對象存儲 OSS-阿裏雲