1. Introduction to Flink

Apache Flink is an open-source platform for distributed stream processing and batch processing: the same engine supports both streaming and batch applications.

2. Adding the Dependencies

<!-- Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
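The `_2.12` suffix on `flink-streaming-java` and `flink-clients` is the Scala version those artifacts were built against (newer Flink releases drop the suffix for the Java APIs). To avoid repeating the version three times, the version can be pulled into a Maven property; this is a common Maven convention, sketched here, not something the Flink artifacts require:

```xml
<properties>
    <!-- single place to bump the Flink version -->
    <flink.version>1.13.0</flink.version>
</properties>

<!-- then each dependency references the property -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
```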

3. A Worked Example

Requirement

Count the number of occurrences of each word in a data file.

Prepare the data

words.txt

hello word
hello qiang
xiao qiang

Write the example

WordCountConsoleDemo.java

package com.qiang.flink.demo;

import cn.hutool.core.io.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


/**
 * @author: 小强崽
 * @create: 2022/8/30 22:26
 * @description: Count the number of occurrences of each word in a data file
 **/
@Slf4j
public class WordCountConsoleDemo {

    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from the text file
        String absolutePath = FileUtil.getAbsolutePath("data/words.txt");
        log.info("Reading data from file: {}", absolutePath);
        DataSource<String> stringDataSource = executionEnvironment.readTextFile(absolutePath);

        // 3. Split each line into words and emit a (word, 1) tuple per word.
        // The .returns(...) call is needed because Java erases the lambda's
        // generic types, so Flink cannot infer the tuple's type on its own.
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = stringDataSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. Print the result
        sum.print();
    }
}
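For intuition, the flatMap → groupBy → sum pipeline above does the same work as this plain-Java aggregation. This is only a sketch of the logic (the class and method names are made up for illustration); Flink's value is that it runs the same pipeline distributed and, with the DataStream API, over unbounded input:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the Flink word-count pipeline above. */
public class WordCountSketch {

    static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // flatMap step: one (word, 1) pair per word in the line
            for (String word : line.split(" ")) {
                // groupBy(0).sum(1) step: accumulate the count per word
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello word", "hello qiang", "xiao qiang");
        // Print one (word, count) tuple per distinct word, like DataSet.print()
        countWords(lines).forEach((w, c) -> System.out.println("(" + w + "," + c + ")"));
    }
}
```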

Output

(console screenshot: one (word, count) tuple per distinct word, e.g. (hello,2), (word,1), (xiao,1), (qiang,2))

4. Environment Setup

Work in progress…

5. System Architecture

6. The DataStream API

7. Task Scheduling

8. Reading Socket Text

9. Reading from Kafka

10. Reading from MySQL

11. Writing to MySQL

12. Simple Aggregations

13. Reduce Aggregations

14. Timeout Events