Let's learn the several ways of defining data sources in the DataStream API (see the DataStream API "Overview" page in the official Apache Flink documentation).
The official documentation gives four categories:
Collection-based
fromCollection(Collection): creates a data stream from a java.util.Collection. All elements in the collection must be of the same type.
fromCollection(Iterator, Class): creates a data stream from an iterator. The class argument specifies the data type of the elements returned by the iterator.
fromElements(T ...): creates a data stream from the given sequence of objects. All objects must be of the same type.
fromParallelCollection(SplittableIterator, Class): creates a data stream from an iterator, in parallel. The class argument specifies the data type of the elements returned by the iterator.
generateSequence(from, to): generates the sequence of numbers in the given interval, in parallel. Deprecated; use fromSequence instead.
package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Streaming data - source - collection-based
 * DataStream - source - collection
 */
public class SourceDemo_Collection {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> ds1 = env.fromCollection(Arrays.asList("hello world", "hello dog"));
        DataStream<String> ds2 = env.fromElements("hello world", "hello dog");
        DataStream<Long> ds3 = env.fromSequence(1, 100);

        ds1.print();
        ds2.print();
        ds3.print();

        env.execute();
    }
}
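The demo covers fromCollection, fromElements, and fromSequence. As a supplementary sketch (not part of the original demo), fromParallelCollection takes a SplittableIterator so the source itself can run in parallel; Flink ships NumberSequenceIterator (org.apache.flink.util.NumberSequenceIterator) as a ready-made implementation:

    // splits the range 1..10 across the parallel instances of the source
    DataStream<Long> ds4 = env.fromParallelCollection(
            new NumberSequenceIterator(1L, 10L), Long.class);
    ds4.print();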
File-based
readTextFile(path): reads a text file, i.e. a file that respects the TextInputFormat specification, line by line, and returns each line as a String.
readFile(fileInputFormat, path): reads (once) a file according to the specified file input format.
readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo): this is the method the two above call internally. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, the source may periodically monitor the path for new data (every interval ms, FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). With the pathFilter, the user can further exclude files from being processed.
Under the hood, Flink splits the file-reading process into two sub-tasks: directory monitoring and data reading, each implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel, with a parallelism equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once, depending on the watchType), find the files to be processed, divide them into splits, and assign those splits to the downstream readers. The readers are the ones that read the actual data. Each split is read by exactly one reader, while a reader can read multiple splits, one by one.
package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Streaming data - source - local or HDFS files / directories / compressed files
 * DataStream - source - file
 */
public class SourceDemo_File {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> ds1 = env.readTextFile("data/words1"); // single file
        DataStream<String> ds2 = env.readTextFile("data/dir");    // directory
        // Flink decompresses input files transparently based on the file
        // extension; supported codecs include .gz, .bz2, .xz and .deflate
        // (.rar is not supported)
        DataStream<String> ds3 = env.readTextFile("data/words1.gz");

        ds1.print();
        ds2.print();
        ds3.print();

        env.execute();
    }
}
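The readTextFile calls above process their paths once and finish. A minimal sketch of the monitoring variant described earlier, assuming the same data/dir directory (imports: org.apache.flink.api.java.io.TextInputFormat, org.apache.flink.core.fs.Path, org.apache.flink.streaming.api.functions.source.FileProcessingMode):

    // re-scan data/dir every 5000 ms and emit newly appearing data; note
    // that a modified file is re-processed in its entirety in this mode
    TextInputFormat format = new TextInputFormat(new Path("data/dir"));
    DataStream<String> monitored = env.readFile(
            format, "data/dir", FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L);
    monitored.print();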
Socket-based
socketTextStream: reads from a socket. Elements can be separated by a delimiter.
package com.learn.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Streaming data - source - socket
 * DataStream - source - socket
 */
public class SourceDemo_Socket {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<String> ds = env.socketTextStream("node01", 999);

        // split each line into words
        DataStream<String> words = ds.flatMap((String value, Collector<String> out) -> {
            Arrays.stream(value.split(" ")).forEach(out::collect);
        }).returns(Types.STRING);

        // map each word to (word, 1)
        DataStream<Tuple2<String, Integer>> wordAndOne = words.map(value -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // group by word and sum the counts
        final KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(value -> value.f0);
        final SingleOutputStreamOperator<Tuple2<String, Integer>> sum = grouped.sum(1);

        sum.print();
        env.execute();
    }
}
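To drive this demo, start a plain TCP server on node01 with netcat (nc -lk 999) and type lines of words; the job prints running counts per word. socketTextStream also has an overload with an explicit delimiter, for records that are not newline-separated:

    // "\n" is the default record delimiter; any string can be used instead
    DataStream<String> ds = env.socketTextStream("node01", 999, "\n");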
Custom sources
addSource: attaches a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(...)). See the connectors documentation for more details.
When working with other data sources, we can also define our own. Flink provides source interfaces; implementing one of them gives us a custom source. They break down as follows:
SourceFunction: non-parallel source; the parallelism can only be 1.
RichSourceFunction: "rich" non-parallel source with lifecycle methods (open/close) and access to the runtime context; the parallelism can still only be 1.
ParallelSourceFunction: parallel source; the parallelism can be >= 1.
RichParallelSourceFunction: "rich" parallel source; the parallelism can be >= 1. The Kafka source we will study later uses this interface.
Of these categories, we generally use RichParallelSourceFunction, since it covers the capabilities of the other three. A quick sketch of the parallelism restriction follows, and then a full demo.
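A minimal sketch (not part of the original examples) of what "non-parallel" means in practice, assuming a throwaway SourceFunction (org.apache.flink.streaming.api.functions.source.SourceFunction) that emits nothing:

    DataStreamSource<Long> src = env.addSource(new SourceFunction<Long>() {
        @Override
        public void run(SourceContext<Long> ctx) { }
        @Override
        public void cancel() { }
    });
    // Requesting parallelism > 1 on a non-parallel source fails right here:
    // Flink throws an IllegalArgumentException saying the parallelism of a
    // non-parallel operator must be 1.
    src.setParallelism(2);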
package com.learn.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.util.Random;
import java.util.UUID;

/**
 * Streaming data - source - custom source
 * DataStream - source - custom
 */
public class SourceDemo_Custom {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        final DataStreamSource<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(1);

        orderDS.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order {
        private String id;
        private Integer userId;
        private Integer money;
        private Long createTime;
    }

    /**
     * Custom source extending RichParallelSourceFunction:
     * a "rich" parallel source (parallelism can be >= 1).
     */
    public static class MyOrderSource extends RichParallelSourceFunction<Order> {
        // volatile, because cancel() is called from a different thread than run()
        private volatile boolean run = true;

        @Override
        public void run(SourceContext<Order> sourceContext) throws Exception {
            final Random random = new Random();
            while (run) {
                // emit one random order per second
                final String oid = UUID.randomUUID().toString();
                final int uid = random.nextInt(3);
                final int money = random.nextInt(101);
                final long createTime = System.currentTimeMillis();
                sourceContext.collect(new Order(oid, uid, money, createTime));
                Thread.sleep(1000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }
    }
}
A custom source that reads data from MySQL:
package com.learn.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Streaming data - source - custom MySQL source
 * DataStream - source - custom
 */
public class SourceDemo_Custom_Mysql {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        final DataStreamSource<Student> studentDS = env.addSource(new MySqlSource()).setParallelism(1);

        studentDS.print();
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Student {
        private String id;
        private String name;
        private Integer age;
    }

    /**
     * Custom source extending RichParallelSourceFunction:
     * a "rich" parallel source (parallelism can be >= 1).
     */
    public static class MySqlSource extends RichParallelSourceFunction<Student> {
        private volatile boolean run = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;

        /**
         * open() runs once per parallel instance - a good place to acquire resources.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node01:3306/bigdata", "root", "root");
            String sql = "select id, name, age from t_student";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Student> sourceContext) throws Exception {
            while (run) {
                // assign to the field (not a local variable) so close() can release it
                rs = ps.executeQuery();
                while (rs.next()) {
                    final String id = rs.getString("id");
                    final String name = rs.getString("name");
                    final int age = rs.getInt("age");
                    sourceContext.collect(new Student(id, name, age));
                }
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            run = false;
        }

        /**
         * close() is a good place to release resources - in reverse order of acquisition.
         */
        @Override
        public void close() throws Exception {
            if (rs != null) rs.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
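This demo assumes a bigdata database on node01 containing a t_student(id, name, age) table and root/root credentials; adjust the JDBC URL for your environment. Also note that every parallel instance of the source would run the same query and emit duplicate rows, which is why the job pins the source's parallelism to 1 via setParallelism(1).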
The pom.xml used by the examples above:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.learn</groupId>
<artifactId>flink-demo1</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<java.version>11</java.version>
<flink.version>1.13.2</flink.version>
<mysql.version>8.0.18</mysql.version>
<lombok.version>1.18.20</lombok.version>
<slf4j.version>1.7.25</slf4j.version>
</properties>
<repositories>
<repository>
<id>aliyun</id>
<name>maven-aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>maven-public</id>
<name>maven-public</name>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</pluginRepository>
</pluginRepositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-release-plugin</artifactId>
<version>2.5.3</version>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>