Let's look at the ways the DataStream API lets you define data sources (see the DataStream API overview page in the Apache Flink documentation). The official docs group them into four categories: collection-based, file-based, socket-based, and custom.

Collection-based

  • fromCollection(Collection): creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.
  • fromCollection(Iterator, Class): creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
  • fromElements(T ...): creates a data stream from the given sequence of objects. All objects must be of the same type.
  • fromParallelCollection(SplittableIterator, Class): creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  • generateSequence(from, to): generates the sequence of numbers in the given interval, in parallel. It is deprecated; use fromSequence instead.

    package com.learn.flink.source;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Arrays;

    /**
     * Streaming - source - collection-based
     * DataStream - source - collection
     */
    public class SourceDemo_Collection {
        public static void main(String[] args) throws Exception {
            // 0: env
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 1: source
            // fromCollection(Collection)
            DataStream<String> ds1 = env.fromCollection(Arrays.asList("hello world", "hello dog"));
            // fromElements(T ...)
            DataStream<String> ds2 = env.fromElements("hello world", "hello dog");
            // fromSequence(from, to); the replacement for the deprecated generateSequence
            DataStream<Long> ds3 = env.fromSequence(1, 100);
            // 2: transformation ...
            // 3: sink
            ds1.print();
            ds2.print();
            ds3.print();
            // 4: execute
            env.execute();
        }
    }
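
The example above does not exercise fromCollection(Iterator, Class) or fromParallelCollection. Here is a minimal sketch of the parallel variant (the class name SourceDemo_ParallelCollection is illustrative), using Flink's built-in NumberSequenceIterator, which is a SplittableIterator<Long>:

    package com.learn.flink.source;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.NumberSequenceIterator;

    /**
     * Streaming - source - parallel collection (sketch)
     */
    public class SourceDemo_ParallelCollection {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // NumberSequenceIterator is a SplittableIterator, so the source can be split
            // across parallel subtasks; the Class argument tells Flink the element type
            DataStream<Long> ds = env.fromParallelCollection(new NumberSequenceIterator(1L, 100L), Long.class);
            ds.print();
            env.execute();
        }
    }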

File-based

  • readTextFile(path): reads text files line by line, i.e. files that conform to the TextInputFormat specification, and returns them as strings.
  • readFile(fileInputFormat, path): reads files (once) following the given file input format.
  • readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo): the method the previous two call internally. It reads files in the path according to the given fileInputFormat. Depending on the provided watchType, the source may periodically monitor the path for new data every interval ms (FileProcessingMode.PROCESS_CONTINUOUSLY), or process the data currently in the path once and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed. A sketch of the monitoring variant follows the readTextFile example below.
  • Under the hood, Flink splits the file-reading process into two sub-tasks: directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel; the parallelism of the latter equals the job parallelism. The single monitoring task scans the directory (periodically or only once, depending on watchType), finds the files to be processed, divides them into splits, and assigns those splits to the downstream readers. The readers are the ones that read the actual data: each split is read by exactly one reader, while a reader can read multiple splits, one after another.

    package com.learn.flink.source;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    /**
     * Streaming - source - local or HDFS files / directories / compressed files
     * DataStream - source - file
     */
    public class SourceDemo_File {
        public static void main(String[] args) throws Exception {
            // 0: env
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 1: source
            DataStream<String> ds1 = env.readTextFile("data/words1");   // single file
            DataStream<String> ds2 = env.readTextFile("data/dir");      // directory
            // Flink decompresses files transparently based on the extension (.gz, .bz2, .xz, ...);
            // archive formats such as .rar are not supported
            DataStream<String> ds3 = env.readTextFile("data/words1.gz");
            // 2: transformation
            // 3: sink
            ds1.print();
            ds2.print();
            ds3.print();
            // 4: execute
            env.execute();
        }
    }
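
The readFile variant with directory monitoring, described in the bullets above, is not covered by the previous example. Here is a minimal sketch (the class name is illustrative and the path is assumed to exist) that re-scans data/dir every second:

    package com.learn.flink.source;

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    /**
     * Streaming - source - readFile with directory monitoring (sketch)
     */
    public class SourceDemo_ReadFile {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final TextInputFormat format = new TextInputFormat(new Path("data/dir"));
            // PROCESS_CONTINUOUSLY: re-scan the path every 1000 ms for new data;
            // PROCESS_ONCE would read the current contents once and then terminate
            DataStream<String> ds = env.readFile(format, "data/dir",
                    FileProcessingMode.PROCESS_CONTINUOUSLY, 1000L);
            ds.print();
            env.execute();
        }
    }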

Socket-based

  • socketTextStream: reads from a socket. Elements can be separated by a delimiter.

    package com.learn.flink.source;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.Arrays;

    /**
     * Streaming - source - socket
     * DataStream - source - socket
     */
    public class SourceDemo_Socket {
        public static void main(String[] args) throws Exception {
            // 0: env
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 1: source; feed it with e.g. `nc -lk 999` on node01
            DataStream<String> ds = env.socketTextStream("node01", 999);
            // 2: transformation
            // split each line into words; .returns(...) is needed because the
            // lambda's output type is erased at runtime
            DataStream<String> words = ds.flatMap((String value, Collector<String> out) -> {
                Arrays.stream(value.split(" ")).forEach(out::collect);
            }).returns(Types.STRING);
            // map each word to a count of 1
            DataStream<Tuple2<String, Integer>> wordAndOne = words.map(value -> Tuple2.of(value, 1))
                    .returns(Types.TUPLE(Types.STRING, Types.INT));
            // group by word
            final KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(value -> value.f0);
            // aggregate
            final SingleOutputStreamOperator<Tuple2<String, Integer>> sum = grouped.sum(1);
            // 3: sink
            sum.print();
            // 4: execute
            env.execute();
        }
    }

Custom sources

  • addSource: attaches a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(...)); see the connectors documentation for details. Beyond the bundled connectors, we can also implement our own sources: Flink provides source interfaces, and implementing one of them gives us a custom source. They fall into the following categories:
  • SourceFunction: non-parallel source; the parallelism can only be 1
  • RichSourceFunction: rich (with lifecycle methods and runtime context) non-parallel source; the parallelism can only be 1
  • ParallelSourceFunction: parallel source; parallelism >= 1
  • RichParallelSourceFunction: rich parallel source; parallelism >= 1. The Kafka source covered later implements this interface.
  • Given the categories above, we generally just use RichParallelSourceFunction, which subsumes the capabilities of the other three. A minimal non-parallel sketch follows for contrast, then the full RichParallelSourceFunction example.
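
For contrast, here is a minimal sketch of the plain, non-parallel case (class names are illustrative): a SourceFunction emits elements from run() until cancel() flips a flag, and Flink rejects any attempt to set such a source's parallelism above 1:

    package com.learn.flink.source;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    /**
     * Streaming - source - plain non-parallel SourceFunction (sketch)
     */
    public class SourceDemo_NonParallel {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // the parallelism of this source is fixed at 1;
            // .setParallelism(2) here would fail at job construction time
            env.addSource(new CounterSource()).print();
            env.execute();
        }

        public static class CounterSource implements SourceFunction<Long> {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean run = true;
            private long counter = 0L;

            @Override
            public void run(SourceContext<Long> ctx) throws Exception {
                while (run) {
                    ctx.collect(counter++);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                run = false;
            }
        }
    }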

    package com.learn.flink.source;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    import java.util.Random;
    import java.util.UUID;

    /**
     * Streaming - source - custom source
     * DataStream - source - custom
     */
    public class SourceDemo_Custom {
        public static void main(String[] args) throws Exception {
            // 0: env
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 1: source
            final DataStreamSource<Order> orderDS = env.addSource(new MyOrderSource()).setParallelism(1);
            // 2: transformation
            // 3: sink
            orderDS.print();
            // 4: execute
            env.execute();
        }

        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Order {
            private String id;
            private Integer userId;
            private Integer money;
            private Long createTime;
        }

        /**
         * Custom source extending RichParallelSourceFunction:
         * a rich parallel source (parallelism can be >= 1)
         */
        public static class MyOrderSource extends RichParallelSourceFunction<Order> {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean run = true;

            @Override
            public void run(SourceContext<Order> sourceContext) throws Exception {
                final Random random = new Random();
                while (run) {
                    final String oid = UUID.randomUUID().toString();
                    final int uid = random.nextInt(3);
                    final int money = random.nextInt(101);
                    final long createTime = System.currentTimeMillis();
                    sourceContext.collect(new Order(oid, uid, money, createTime));
                    // emit one order per second
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                // stop producing data once the job is cancelled
                run = false;
            }
        }
    }

A custom source that reads data from MySQL:

    package com.learn.flink.source;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    /**
     * Streaming - source - custom MySQL source
     * DataStream - source - custom
     */
    public class SourceDemo_Custom_Mysql {
        public static void main(String[] args) throws Exception {
            // 0: env
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 1: source
            final DataStreamSource<Student> studentDS = env.addSource(new MySqlSource()).setParallelism(1);
            // 2: transformation
            // 3: sink
            studentDS.print();
            // 4: execute
            env.execute();
        }

        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class Student {
            private String id;
            private String name;
            private Integer age;
        }

        /**
         * Custom source extending RichParallelSourceFunction:
         * a rich parallel source (parallelism can be >= 1)
         */
        public static class MySqlSource extends RichParallelSourceFunction<Student> {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean run = true;
            private Connection conn = null;
            private PreparedStatement ps = null;
            private ResultSet rs = null;

            /**
             * open() runs exactly once per subtask; the right place to acquire resources
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection("jdbc:mysql://node01:3306/bigdata", "root", "root");
                String sql = "select id, name, age from t_student";
                ps = conn.prepareStatement(sql);
            }

            @Override
            public void run(SourceContext<Student> sourceContext) throws Exception {
                while (run) {
                    // re-run the query every 5 seconds and emit every row
                    rs = ps.executeQuery();
                    while (rs.next()) {
                        final String id = rs.getString("id");
                        final String name = rs.getString("name");
                        final int age = rs.getInt("age");
                        sourceContext.collect(new Student(id, name, age));
                    }
                    Thread.sleep(5000);
                }
            }

            @Override
            public void cancel() {
                run = false;
            }

            /**
             * close() is the right place to release resources,
             * in reverse order of acquisition
             * @throws Exception
             */
            @Override
            public void close() throws Exception {
                if (rs != null) rs.close();
                if (ps != null) ps.close();
                if (conn != null) conn.close();
            }
        }
    }
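
As an alternative to hand-rolling the JDBC polling loop above, Flink ships a JDBC input format for one-shot reads. A minimal sketch, assuming the flink-connector-jdbc_2.11 dependency (same ${flink.version}) is added to the pom below; the class name is illustrative:

    package com.learn.flink.source;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.connector.jdbc.JdbcInputFormat;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.types.Row;

    /**
     * Streaming - source - JDBC input format (sketch)
     */
    public class SourceDemo_Jdbc {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            final JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                    .setDrivername("com.mysql.cj.jdbc.Driver")
                    .setDBUrl("jdbc:mysql://node01:3306/bigdata")
                    .setUsername("root")
                    .setPassword("root")
                    .setQuery("select id, name, age from t_student")
                    // column types of the result rows
                    .setRowTypeInfo(new RowTypeInfo(
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.STRING_TYPE_INFO,
                            BasicTypeInfo.INT_TYPE_INFO))
                    .finish();
            // reads the table once, emits each row, then the source finishes
            final DataStreamSource<Row> rows = env.createInput(inputFormat);
            rows.print();
            env.execute();
        }
    }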

The pom file used by all of the examples above:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.learn</groupId>
        <artifactId>flink-demo1</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <java.version>11</java.version>
            <flink.version>1.13.2</flink.version>
            <mysql.version>8.0.18</mysql.version>
            <lombok.version>1.18.20</lombok.version>
            <slf4j.version>1.7.25</slf4j.version>
        </properties>
        <!-- 指定仓库地址 -->
        <repositories>
            <repository>
                <id>aliyun</id>
                <name>maven-aliyun</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>maven-public</id>
                <name>maven-public</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            </pluginRepository>
        </pluginRepositories>
        <!-- 配置依赖 -->
        <dependencies>
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <!-- mysql -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>${mysql.version}</version>
            </dependency>
            <!-- slf4j -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>${slf4j.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
                <plugin>
                    <artifactId>maven-release-plugin</artifactId>
                    <version>2.5.3</version>
                </plugin>
                <plugin>
                    <artifactId>maven-source-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>jar-no-fork</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>