Windows环境运行FlinkDemo

0.引言

Apache Flink 是一个分布式的流式计算引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。

我司相关云服务产品DLI(数据湖探索,Data Lake Insight)完全兼容Apache Flink,且提供了一站式的流处理、批处理、交互式分析的Serverless融合处理分析服务。本博客,总结了本人在初入DLI团队,试着在Windows端编译Flink官方Demo,并成功提交运行FlinkDemo的例子。希望对有需要了解Apache Flink的新同学能有所帮助,也可以对DLIFlink引擎底层原理有一定认识。

1.Flink安装

1.1 预置条件

运行Flink集群,需要有Java运行环境。

可通过在 CMD窗口 中运行 java -version 验证,若未安装,需自行提前预置。

1.2 下载并启动Flink集群

  1. 下载Flink v1.9flink-1.9.0-bin-scala_2.12.tgz),直接解压即可;

     2. 依次进入 flink-1.9.0\bin 文件夹,双击 start-cluster.bat 文件(此时会弹出两个java空白窗口)

1.png

     3. 浏览器输入 http://localhost:8081/#/overview ,正常情况下,会显示如下:

2.png

2. Demo运行

下载的Flink v1.9压缩包解压后自带了几个官方Flink示例。在 flink-1.9.0\examples 中可以找到对应的jar包,本博客将演示如何自定义WordCountDemo jar,并运行 WordCountDemo

2.1 自定义WordCountDemo jar

主类WordCount如下(pom.xml和WordCountData.java见附件):

package org.apache.flink.examples.java.wordcount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args)
            throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		
        env.getConfig().setGlobalJobParameters(params);
        DataSet<String> text;
        if (params.has("input")) {
            text = env.readTextFile(params.get("input"));
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(new int[]{0}).sum(1);
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }
    }

    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2(token, Integer.valueOf(1)));
                }
            }
        }
    }
}

2.2 Flink任务提交

Flink Dashboard提供了Flink任务的提交功能,按下图所示将对应的jar包上传。

3.png

2.3 Word Count

  • 算法含义:顾名思义,Word Count会将输入文档中的所有单词计数,这也是大数据处理的hello world示例
  • 输入:任意一个文本文件
  • 输出:单词和对应的数量
  • 主类:
org.apache.flink.examples.java.wordcount.WordCount
  • 参数配置:
--input D:\learning\flinkDemo\wordCountInput.txt --output D:\learning\flinkDemo\wordCountRet.txt
  • 执行结果:

4.png


(完)