《Flink官方文档》Batch Examples(一)
批处理示例
下面的程序展示了从简单的单词词频统计到图算法等不同的Flink应用。代码展示了Flink数据集API的使用方法。
下面案例和更多案例的完整源码可以参见Flink源码中的flink-examples-batch和 flink-examples-streaming模块。
运行实例
为了运行Flink的例子,我们假设你拥有已经启动的Flink实例。在导航栏中的“Quickstart” 和 “Setup”介绍了启动Flink的几种不同方法。
最简单的方法是运行脚本./bin/start-local.sh,执行后一个启动本地JobManager。
每个编译好的Flink源码包含了一个实例目录,其中包括了此页面每个例子的jar包。
执行如下命令,来运行WordCount例子
./bin/flink run ./examples/batch/WordCount.jar
其他的例子都可以用类似的方式执行
如果运行例子的时候没有带参数,默认使用缺省参数。如果希望使用真实数据来运行WordCount,需要将数据的路径传递进去
./bin/flink run ./examples/batch/WordCount.jar –input /path/to/some/text/data –output /path/to/result
注意非本地文件系统需要标明数据库前缀,比如HDFS://
词频统计
单词词频统计是大数据处理系统“hello world”程序。它计算了文本中的词频。算法分成两步,第一部分,将文本分隔成不同单词。第二步,讲这些单词分组并计数。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("/path/to/file");
DataSet<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.groupBy(0)
.sum(1);
counts.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
词频统计例子实现了上述描述的算法,需要两个输入参数。–input –output 。测试数据可以替换为任何文本。
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = env.readTextFile("/path/to/file")
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
Page Rank
PageRank算法计算了图中页面的重要性,一个页面到另一页面的点形成了链接,这些链接定义成图。它是迭代式的算法,意味着相同的计算会被重复执行。在每次迭代中,每个页面对它的邻居贡献出相同的评分,并接受来自它的邻居的加权评分作为新的评分。PageRank算法因google搜索引擎众所周知,它被用来计算网页搜索查询结果的评分。
这个例子中,PageRank通过一批迭代和固定次数的迭代来完成。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read the pages and initial ranks by parsing a CSV file
DataSet<Tuple2<Long, Double>> pagesWithRanks = env.readCsvFile(pagesInputPath)
.types(Long.class, Double.class)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
DataSet<Tuple2<Long, Long[]>> pageLinkLists = getLinksDataSet(env);
// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);
DataSet<Tuple2<Long, Double>> newRanks = iteration
// join pages with outgoing edges and distribute rank
.join(pageLinkLists).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
// collect and sum ranks
.groupBy(0).sum(1)
// apply dampening factor
.map(new Dampener(DAMPENING_FACTOR, numPages));
DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
newRanks,
newRanks.join(iteration).where(0).equalTo(0)
// termination condition
.filter(new EpsilonFilter()));
finalPageRanks.writeAsCsv(outputPath, "\n", " ");
// User-defined functions
public static final class JoinVertexWithEdgesMatch
implements FlatJoinFunction<Tuple2<Long, Double>, Tuple2<Long, Long[]>,
Tuple2<Long, Double>> {
@Override
public void join(<Tuple2<Long, Double> page, Tuple2<Long, Long[]> adj,
Collector<Tuple2<Long, Double>> out) {
Long[] neighbors = adj.f1;
double rank = page.f1;
double rankToDistribute = rank / ((double) neigbors.length);
for (int i = 0; i < neighbors.length; i++) {
out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
}
}
}
public static final class Dampener implements MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
private final double dampening, randomJump;
public Dampener(double dampening, double numVertices) {
this.dampening = dampening;
this.randomJump = (1 - dampening) / numVertices;
}
@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
value.f1 = (value.f1 * dampening) + randomJump;
return value;
}
}
public static final class EpsilonFilter
implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}
}
pagerank 程序实现了上面的例子。需要下面的运行参数–pages –links –output –numPages –iterations 。
scala
// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read the pages and initial ranks by parsing a CSV file
val pages = env.readCsvFile[Page](pagesInputPath)
// the links are encoded as an adjacency list: (page-id, Array(neighbor-ids))
val links = env.readCsvFile[Link](linksInputPath)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
// build adjacency list from link input
val adjacencyLists = links
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
result.writeAsCsv(outputPath, "\n", " ")
输入文件必须是普通文本文件而且文件必须是遵循下列格式:
–Pages 用long型的ID表示,并以换行符分隔,如”1\n2\n12\n42\n63\n”体现了5个页面,id分别是1, 2, 12, 42, and 63。
–Links表示了多对pageId的组合,每对之间通过空格分隔,不同links用换行符分隔。”1 2\n2 12\n1 12\n42 63\n”表示了(1)->(2), (2)->(12), (1)->(12), and (42)->(63)四个有向链接。
为了这个简单实现至少需要每个页面至少有一个入链接和一个出链接。一个页面可以链接到他自己。
最后更新:2017-05-18 20:36:43