

Flink Official Documentation: Batch Examples (Part 1)

Batch Examples

The following programs demonstrate different Flink applications, from simple word counting to graph algorithms. The code samples illustrate the use of Flink's DataSet API.

The full source code of the following and many more examples can be found in the flink-examples-batch and flink-examples-streaming modules of the Flink source code.

Running an Example

To run a Flink example, we assume you have a running Flink instance available. The “Quickstart” and “Setup” sections in the navigation describe various ways of starting Flink.

The easiest way is to run the ./bin/start-local.sh script, which starts a local JobManager.

Each binary distribution of Flink contains an examples directory with JAR files for each of the examples on this page.

To run the WordCount example, issue the following command:

./bin/flink run ./examples/batch/WordCount.jar
The other examples can be started in a similar way.

If an example is run without arguments, default parameters are used. To run WordCount with real data, you have to pass the path to the data:

./bin/flink run ./examples/batch/WordCount.jar --input /path/to/some/text/data --output /path/to/result
Note that non-local file systems require a schema prefix, such as hdfs://.
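
For example, to read the input from HDFS and write the result back (the paths are placeholders):

./bin/flink run ./examples/batch/WordCount.jar --input hdfs:///path/to/some/text/data --output hdfs:///path/to/result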

Word Count

WordCount is the “Hello World” of big data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: first, the text is split into individual words; second, the words are grouped and counted.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> text = env.readTextFile("/path/to/file");

DataSet<Tuple2<String, Integer>> counts =
        // split up the lines in pairs (2-tuples) containing: (word,1)
        text.flatMap(new Tokenizer())
        // group by the tuple field "0" and sum up tuple field "1"
        .groupBy(0)
        .sum(1);

counts.writeAsCsv(outputPath, "\n", " ");

// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                out.collect(new Tuple2<String, Integer>(token, 1));
            }   
        }
    }
}
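
Note that the example ends with a data sink (writeAsCsv); in a standalone DataSet program, the job is only triggered once execute() is called on the environment. A minimal sketch (the job name is illustrative):

env.execute("WordCount Example");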

The WordCount example implements the algorithm described above. It requires two input parameters: --input <path> --output <path>. Any text file can serve as test data.

The same program in the Scala API:

val env = ExecutionEnvironment.getExecutionEnvironment

// get input data
val text = env.readTextFile("/path/to/file")

val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
  .map { (_, 1) }
  .groupBy(0)
  .sum(1)

counts.writeAsCsv(outputPath, "\n", " ")

Page Rank

The PageRank algorithm computes the importance of pages in a graph, where links pointing from one page to another define the graph. It is an iterative algorithm, meaning that the same computation is applied repeatedly. In each iteration, every page distributes its current rank evenly among its neighbors and computes its new rank from the weighted ranks it receives from its neighbors. The PageRank algorithm became well known through the Google search engine, which uses it to rank the results of web search queries.
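
Spelled out, with damping factor d and N pages in total, one iteration recomputes the rank of every page p as

R(p) = (1 - d) / N + d * Σ_{q → p} R(q) / out(q)

where the sum runs over all pages q that link to p and out(q) is the number of outgoing links of q. This is the standard PageRank formulation; it corresponds to the rank distribution and the Dampener function in the code below.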

In this example, PageRank is implemented as a bulk iteration with a fixed number of iterations.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
                           .types(Long.class, Double.class);

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
        // join pages with outgoing edges and distribute rank
        .join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
        // collect and sum ranks
        .groupBy(0).sum(1)
        // apply dampening factor
        .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
        newRanks,
        newRanks.join(iteration).where(0).equalTo(0)
        // termination condition
        .filter(new EpsilonFilter()));

finalPageRanks.writeAsCsv(outputPath, "\n", " ");

// User-defined functions

public static final class JoinVertexWithEdgesMatch
                    implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
                                            Tuple2<Long, Double>> {

    @Override
    public void join(Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
                        Collector<Tuple2<Long, Double>> out) {
        Long[] neighbors = adj.f1;
        double rank = page.f1;
        double rankToDistribute = rank / ((double) neighbors.length);

        for (int i = 0; i < neighbors.length; i++) {
            out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
        }
    }
}

public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
    private final double dampening, randomJump;

    public Dampener(double dampening, double numVertices) {
        this.dampening = dampening;
        this.randomJump = (1 - dampening) / numVertices;
    }

    @Override
    public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
        value.f1 = (value.f1 * dampening) + randomJump;
        return value;
    }
}

public static final class EpsilonFilter
                implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

    @Override
    public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
        return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
    }
}
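
The listing above references the constants DAMPENING_FACTOR, numPages, and EPSILON, and calls a helper getLinksDataSet() that is not shown. A minimal sketch of what these could look like, assuming a static linksInputPath field and typical constant values (the concrete values and the reduce-based list construction are illustrative assumptions, not taken from the original):

// assumed values; the original excerpt does not define them
private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;

// Builds the adjacency lists: reads "source target" pairs and merges all
// targets of the same source page into one array.
// Uses org.apache.flink.api.common.functions.{MapFunction, ReduceFunction}.
private static DataSet<Tuple2<Long, Long[]>> getLinksDataSet(ExecutionEnvironment env) {
    return env.readCsvFile(linksInputPath)
        .fieldDelimiter(" ")
        .types(Long.class, Long.class)
        // wrap each target in a one-element array
        .map(new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>>() {
            @Override
            public Tuple2<Long, Long[]> map(Tuple2<Long, Long> link) {
                return new Tuple2<Long, Long[]>(link.f0, new Long[] { link.f1 });
            }
        })
        // concatenate the arrays of equal source pages
        .groupBy(0)
        .reduce(new ReduceFunction<Tuple2<Long, Long[]>>() {
            @Override
            public Tuple2<Long, Long[]> reduce(Tuple2<Long, Long[]> l1, Tuple2<Long, Long[]> l2) {
                Long[] merged = new Long[l1.f1.length + l2.f1.length];
                System.arraycopy(l1.f1, 0, merged, 0, l1.f1.length);
                System.arraycopy(l2.f1, 0, merged, l1.f1.length, l2.f1.length);
                return new Tuple2<Long, Long[]>(l1.f0, merged);
            }
        });
}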

The PageRank program implements the example above. It requires the following parameters to run: --pages <path> --links <path> --output <path> --numPages <n> --iterations <n>.
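
For example (assuming the example JAR ships as ./examples/batch/PageRank.jar, as in the binary distribution; the paths and values are placeholders):

./bin/flink run ./examples/batch/PageRank.jar --pages /path/to/pages --links /path/to/links --output /path/to/result --numPages 5 --iterations 10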

The Scala version of the PageRank program:

// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)

// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)

// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / numPages))

// build adjacency list from link input
val adjacencyLists = links
  // initialize lists
  .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
  // concatenate lists
  .groupBy("sourceId").reduce {
  (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
  }

// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
  currentRanks =>
    val newRanks = currentRanks
      // distribute ranks to target pages
      .join(adjacencyLists).where("pageId").equalTo("sourceId") {
        (page, adjacent, out: Collector[Page]) =>
        for (targetId <- adjacent.targetIds) {
          out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
        }
      }
      // collect ranks and sum them up
      .groupBy("pageId").aggregate(SUM, "rank")
      // apply dampening factor
      .map { p =>
        Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
      }

    // terminate if no rank update was significant
    val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
      (current, next, out: Collector[Int]) =>
        // check for significant update
        if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
    }

    (newRanks, termination)
}

val result = finalRanks

// emit result
result.writeAsCsv(outputPath, "\n", " ")

Input files are plain text files and must be formatted as follows:

– Pages are represented by a (long) ID, separated by new-line characters. For example, "1\n2\n12\n42\n63\n" defines five pages with IDs 1, 2, 12, 42, and 63.

– Links are represented as pairs of page IDs separated by a space character; individual links are separated by new-line characters. For example, "1 2\n2 12\n1 12\n42 63\n" defines four (directed) links: (1)->(2), (2)->(12), (1)->(12), and (42)->(63).

This simple implementation requires that each page has at least one incoming and one outgoing link. A page can link to itself.
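
For a quick test, the sample data from above can be written to files like this:

printf '1\n2\n12\n42\n63\n' > pages.txt
printf '1 2\n2 12\n1 12\n42 63\n' > links.txt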
