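I. Introduction to Flink
Flink is an open-source computing platform for distributed stream processing and batch processing. It supports both streaming and batch applications.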
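The difference between the two modes can be illustrated without Flink. A batch job sees the whole bounded input and reports one final result, while a streaming job emits an updated result as each record arrives. The sketch below is plain Java rather than Flink API, and the class and method names (BatchVsStreamSketch, batchCount, streamCount) are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (not Flink API); all names are hypothetical.
class BatchVsStreamSketch {

    // Batch style: consume the entire bounded input, then return one final count per word.
    static Map<String, Long> batchCount(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        for (String word : words) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    // Streaming style: record an updated running count right after each word arrives.
    static List<String> streamCount(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            long updated = counts.merge(word, 1L, Long::sum);
            emitted.add(word + "=" + updated);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "word", "hello");
        System.out.println(batchCount(words));
        System.out.println(streamCount(words)); // prints [hello=1, word=1, hello=2]
    }
}
```

The batch variant yields a single count per word, whereas the streaming variant produces one update per input record, so the same word can appear several times with an increasing count.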
II. Adding Dependencies
<!-- Flink dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
Note: the example in section III also imports Hutool (cn.hutool.core.io.FileUtil) and Lombok (@Slf4j), so those libraries must be on the classpath as well.
III. Writing the Example
Requirement
Count how many times each word appears in a data file.
Prepare the data
words.txt
hello word
hello qiang
xiao qiang
Write the code
WordCountConsoleDemo.java
package com.qiang.flink.demo;

import cn.hutool.core.io.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * @author: 小强崽
 * @create: 2022/8/30 22:26
 * @description: Counts how many times each word appears in a data file
 **/
@Slf4j
public class WordCountConsoleDemo {

    public static void main(String[] args) throws Exception {
        // 1. Create the (batch) execution environment
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 2. Read the data from a text file, one element per line
        String absolutePath = FileUtil.getAbsolutePath("data/words.txt");
        log.info("Reading data from text file: {}", absolutePath);
        DataSource<String> stringDataSource = executionEnvironment.readTextFile(absolutePath);

        // 3. Split each line into words and map every word to a (word, 1) tuple.
        //    The lambda loses its generic type information to erasure,
        //    so the result type is declared explicitly with returns(...).
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = stringDataSource
                .flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. Group by the word (tuple field 0)
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5. Sum the counts (tuple field 1) within each group
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        // 6. Print the result; print() also triggers execution of the job
        sum.print();
    }
}
Output
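For the three sample lines in words.txt, the expected counts can be cross-checked without a Flink runtime. The following is a minimal plain-Java sketch of the same split, group, and sum steps. It is not Flink API, and the names WordCountSketch and countWords are hypothetical. It sorts the words alphabetically for readability, whereas the order in which the Flink job prints its tuples may differ.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java cross-check (not Flink API); class and method names are hypothetical.
class WordCountSketch {

    // Split each line on single spaces, then count occurrences per word.
    // A TreeMap keeps the words in alphabetical order.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Same content as the sample words.txt
        List<String> lines = Arrays.asList("hello word", "hello qiang", "xiao qiang");
        countWords(lines).forEach((word, count) ->
                System.out.println("(" + word + "," + count + ")"));
        // prints:
        // (hello,2)
        // (qiang,2)
        // (word,1)
        // (xiao,1)
    }
}
```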
IV. Environment Setup
To be continued…