0.引言
Apache Flink 是一个分布式的流式计算引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。
我司相关云服务产品DLI(数据湖探索,Data Lake Insight)完全兼容Apache Flink,且提供了一站式的流处理、批处理、交互式分析的Serverless融合处理分析服务。本博客,总结了本人在初入DLI团队,试着在Windows端编译Flink官方Demo,并成功提交运行FlinkDemo的例子。希望对有需要了解Apache Flink的新同学能有所帮助,也可以对DLI的Flink引擎底层原理有一定认识。
1.Flink安装
1.1 预置条件
运行Flink集群,需要有Java运行环境。
可通过在 CMD窗口 中运行 java -version 验证,若未安装,需自行提前预置。
1.2 下载并启动Flink集群
- 下载Flink v1.9(flink-1.9.0-bin-scala_2.12.tgz),直接解压即可;
2. 依次进入 flink-1.9.0\bin 文件夹,双击 start-cluster.bat 文件(此时会弹出两个java空白窗口)
3. 浏览器输入 http://localhost:8081/#/overview ,正常情况下,会显示如下:
2. Demo运行
下载的Flink v1.9压缩包解压后自带了几个官方Flink示例。在 flink-1.9.0\examples 中可以找到对应的jar包,本博客将演示如何自定义WordCountDemo jar,并运行 WordCountDemo。
2.1 自定义WordCountDemo jar
主类WordCount如下(pom.xml和WordCountData.java见附件):
package org.apache.flink.examples.java.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
public class WordCount {
public static void main(String[] args)
throws Exception {
ParameterTool params = ParameterTool.fromArgs(args);
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(params);
DataSet<String> text;
if (params.has("input")) {
text = env.readTextFile(params.get("input"));
} else {
System.out.println("Executing WordCount example with default input data set.");
System.out.println("Use --input to specify file input.");
text = WordCountData.getDefaultTextLineDataSet(env);
}
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(new int[]{0}).sum(1);
if (params.has("output")) {
counts.writeAsCsv(params.get("output"), "\n", " ");
env.execute("WordCount Example");
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
counts.print();
}
}
public static final class Tokenizer
implements FlatMapFunction<String, Tuple2<String, Integer>> {
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
String[] tokens = value.toLowerCase().split("\\W+");
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, Integer.valueOf(1)));
}
}
}
}
}
2.2 Flink任务提交
Flink Dashboard提供了Flink任务的提交功能,按下图所示将对应的jar包上传。
2.3 Word Count
- 算法含义:顾名思义,Word Count会将输入文档中的所有单词计数,这也是大数据处理的hello world示例
- 输入:任意一个文本文件
- 输出:单词和对应的数量
- 主类:
org.apache.flink.examples.java.wordcount.WordCount
- 参数配置:
--input D:\learning\flinkDemo\wordCountInput.txt --output D:\learning\flinkDemo\wordCountRet.txt
- 执行结果: