閱讀350 返回首頁    go 技術社區[雲棲]


《Flink官方文檔》Batch Examples(二)

連通分支

連通分支算法識別會一個更大的圖,這部分圖通過被相同的組件ID鏈接的所有頂點連接。類似PageRank,連通組件是一個迭代算法。在每個步驟中,每個頂點都將其當前組件ID傳給所有鄰居。如果小於自己的組件ID,一個頂點從鄰居接受組件ID。

此實現使用增量迭代:組件ID未變化的頂點不參與下一步驟。因為後來的迭代通常隻處理一些離群頂點,這將產生更好的性能。

// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge());

// assign the initial component IDs (equal to the vertex ID)
DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

// open a delta iteration
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
        verticesWithInitialId.iterateDelta(verticesWithInitialId, maxIterations, 0);

// apply the step logic:
DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
        // join with the edges
        .join(edges).where(0).equalTo(0).with(new NeighborWithComponentIDJoin())
        // select the minimum neighbor component ID
        .groupBy(0).aggregate(Aggregations.MIN, 1)
        // update if the component ID of the candidate is smaller
        .join(iteration.getSolutionSet()).where(0).equalTo(0)
        .flatMap(new ComponentIdFilter());

// close the delta iteration (delta and new workset are identical)
DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);

// emit result
result.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> {

    @Override
    public Tuple2<T, T> map(T vertex) {
        return new Tuple2<T, T>(vertex, vertex);
    }
}

public static final class UndirectEdge
                    implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();

    @Override
    public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) {
        invertedEdge.f0 = edge.f1;
        invertedEdge.f1 = edge.f0;
        out.collect(edge);
        out.collect(invertedEdge);
    }
}

public static final class NeighborWithComponentIDJoin
                implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

    @Override
    public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) {
        return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1);
    }
}

public static final class ComponentIdFilter
                    implements FlatMapFunction<Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>>,
                                            Tuple2<Long, Long>> {

    @Override
    public void flatMap(Tuple2<Tuple2<Long, Long>, Tuple2<Long, Long>> value,
                        Collector<Tuple2<Long, Long>> out) {
        if (value.f0.f1 < value.f1.f1) {
            out.collect(value.f0);
        }
    }
}

scala

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }

// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
  (s, ws) =>

    // apply the step logic: join with the edges
    val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
      (edge._2, vertex._2)
    }

    // select the minimum neighbor
    val minNeighbors = allNeighbors.groupBy(0).min(1)

    // update if the component of the candidate is smaller
    val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
      (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
        if (newVertex._2 < oldVertex._2) out.collect(newVertex)
    }

    // delta and new workset are identical
    (updatedComponents, updatedComponents)
}


verticesWithComponents.writeAsCsv(outputPath, "\n", " ")

該連通分支程序實現了上述例子。它需要運行下列參數:–vertices –edges –output –iterations 。
輸入文件是純文本文件,必須格式化如下:

–Vertices 以IDS表示的頂點,由換行字符分隔。例如“1\n2\n12\n42\n63\n”給出了五個訂單(1)、(2)、(12)、(42)和(63)。

–Edges 邊通過以空格分隔的兩個頂點ID表示。不同邊是由換行符分隔。例如“1 2\n2 12\n1 12\n42 63\n”表示了四個無方向鏈接(1)-(2)、(2)-(12)、(1)-(12)和(42)-(63)。

關係型查詢

關係型查詢示例假定會使用兩張表,一張訂單表,另一張是TPC-H決策支持基準測試表。TPC-H是數據庫行業標準基準測試。如何生成輸入數據請參見下麵的說明。

該示例實現以下sql查詢。
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;

Flink程序中按照如下的方式進行sql查詢

// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
DataSet<Tuple5<Integer, String, String, String, Integer>> orders = getOrdersDataSet(env);
// get lineitem data set: (orderkey, extendedprice)
DataSet<Tuple2<Integer, Double>> lineitems = getLineitemDataSet(env);

// orders filtered by year: (orderkey, custkey)
DataSet<Tuple2<Integer, Integer>> ordersFilteredByYear =
        // filter orders
        orders.filter(
            new FilterFunction<Tuple5<Integer, String, String, String, Integer>>() {
                @Override
                public boolean filter(Tuple5<Integer, String, String, String, Integer> t) {
                    // status filter
                    if(!t.f1.equals(STATUS_FILTER)) {
                        return false;
                    // year filter
                    } else if(Integer.parseInt(t.f2.substring(0, 4)) <= YEAR_FILTER) {
                        return false;
                    // order priority filter
                    } else if(!t.f3.startsWith(OPRIO_FILTER)) {
                        return false;
                    }
                    return true;
                }
            })
        // project fields out that are no longer required
        .project(0,4).types(Integer.class, Integer.class);

// join orders with lineitems: (orderkey, shippriority, extendedprice)
DataSet<Tuple3<Integer, Integer, Double>> lineitemsOfOrders =
        ordersFilteredByYear.joinWithHuge(lineitems)
                            .where(0).equalTo(0)
                            .projectFirst(0,1).projectSecond(1)
                            .types(Integer.class, Integer.class, Double.class);

// extendedprice sums: (orderkey, shippriority, sum(extendedprice))
DataSet<Tuple3<Integer, Integer, Double>> priceSums =
        // group by order and sum extendedprice
        lineitemsOfOrders.groupBy(0,1).aggregate(Aggregations.SUM, 2);

// emit result
priceSums.writeAsCsv(outputPath);

缺少scala例子(譯者注)

關係查詢程序實現了上述查詢。它需要以下參數運行–orders –lineitem –output 。
order和lineitem文件可以使用TPC-H基準測試套件的數據生成工具(DBGEN)生成。采取以下步驟生成需提供給flink程序輸入的任意大小的數據文件。

1、下載並解壓DBGEN

2、複製makefile.suite並更名為Makefile,編輯修改如下:

DATABASE = DB2
MACHINE  = LINUX
WORKLOAD = TPCH
CC       = gcc

1、使用make命令構建DBGEN

2、使用DBGEN生成lineitem和orders表。-s命令傳入1,將會一個生成約1 GB的大小的數據集。

./dbgen -T o -s 1


轉載自 並發編程網 - ifeve.com

最後更新:2017-05-18 20:36:45

  上一篇:go  《KAFKA官方文檔》使用場景
  下一篇:go  注意maven打包的filter