相关文章推荐
干练的茶壶  ·  Fortran程序: ...·  5 月前    · 
豪气的板栗  ·  Python。在json.load()与js ...·  1 年前    · 
跑龙套的牛肉面  ·  Reporting ...·  1 年前    · 
孤独的脆皮肠  ·  COGNOS ...·  1 年前    · 
近视的橙子  ·  (转载)在Eclipse ...·  1 年前    · 

需要读取HDFS上变化的日志文件,对每一行进行处理,就是类似于Linux中tail -f实现的功能。

看了看好像Spark和Flink都没有类似的支持,于是就用Flink自定义了Source实现了这个功能。

维持一个当前读取位置的偏移量,然后每隔几秒去看下文件的大小是否大于当前偏移量。如果最新文件大小大于当前偏移量就读取数据,并将当前偏移量设置为最新的文件大小;反之,不做任何操作。

以下的代码,还没有把当前读取位置存储到状态中,如果重启会重头开始读。

自定义Source
package com.upupfeng.source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
 * 自定义Source实现对HDFS上的文件进行"tail -f"的类似操作
 * @author mawf
public class TailHdfsFileSource extends RichSourceFunction<String> {
    // 当前读取到的偏移量
    private volatile Long currentPos = 0L;
    // 运行flag
    private volatile Boolean running;
    // Flink HDFS FileSystem的配置
    private Configuration configuration;
    // 要监听的文件的目录
    private String path;
    // 每次轮询的间隔。秒
    private Integer duration;
    public TailHdfsFileSource(Configuration configuration, String path, Integer duration) {
        this.configuration = configuration;
        this.path = path;
        this.duration = duration;
        init();
    // 初始化
    private void init() {
        running = true;
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        FileSystem.initialize(configuration, null);
        FileSystem fileSystem = FileSystem.get(FileSystem.getDefaultFsUri());
        while (running) {
            Long latestLength = getLatestLength(fileSystem);
            if (latestLength > currentPos) {
                collectRecords(ctx, fileSystem, latestLength);
            Thread.sleep(duration * 1000);
    // 收集记录
    public void collectRecords(SourceContext<String> ctx, FileSystem fs, Long latestLength) throws IOException {
        FSDataInputStream dataInputStream = fs.open(new Path(path));
        // 移动InputStream的偏移量
        dataInputStream.seek(currentPos);
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream));
        String line;
        while ((line = bufferedReader.readLine()) != null) {
            ctx.collect(line);
        // 更新偏移量
        currentPos = latestLength;
    // 获取最新的文件大小
    public Long getLatestLength(FileSystem fs) throws IOException {
        FileStatus fileStatus = fs.getFileStatus(new Path(path));
        return fileStatus.getLen();
    @Override
    public void cancel() {
        running = false;
使用自定义的Source
package com.upupfeng.demo;
import com.upupfeng.source.TailHdfsFileSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 * @author mawf
public class TailHdfsFileDemo {
    public static void main(String[] args) throws Exception {
        System.setProperty("user.name", "root");
        // 创建配置对象
        Configuration configuration = new Configuration();
        configuration.setString("fs.default-scheme", "hdfs://hadoop1:8020");
        String path = "/user/mwf/a.log";
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        TailHdfsFileSource tailHdfsFileSource = new TailHdfsFileSource(configuration, path, 5);
        env.addSource(tailHdfsFileSource)
                .setParallelism(1)
                .print();
        env.execute();
                    背景需要读取HDFS上变化的日志文件,对每一行进行处理,就是类似于Linux中tail -f实现的功能。看了看好像Spark和Flink都没有类似的支持,于是就用Flink自定义了Source实现了这个功能。实现思路维持一个当前读取位置的偏移量,然后每隔几秒去看下文件的大小是否大于当前偏移量。如果最新文件大小大于当前偏移量就读取数据,并将当前偏移量设置为最新的文件大小;反之,不做任何操作。以下的代码,还没有把当前读取位置存储到状态中,如果重启会重头开始读。实现代码自定义Sourcepacka
				
本博客基于1.13.6版本Flink实现批处理提交到yarn执行,并实现读取HDFS文件实现批处理,有完整的Flink代码,已完成编译,开箱简单修改即可用,避免了大家编程、编译、提交yarn上的一些错误。 本工程提供 1、项目源码及详细注释,简单修改即可用在实际生产代码 2、成功编译截图 3、linux提交命令 4、提交到yarn上截图 5、自己编译过程中可能出现的问题 6、执行结果
<groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.1.3</version> </dependency> <dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-filesystem_2.11</artifactId> <versio...
目录1.引入依赖2.创建hdfs文件3.从hdfs读取数据 1.引入依赖 <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-hadoop-compatibility --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.12&
文章目录01 引言02 Source2.1 基于集合的Source2.2 基于文件Source2.3 基于Socket的Source2.4 自定义Source2.4.1 案例 - 随机生成数据2.4.2 案例 - MySQL03 Source 01 引言 在前面的博客,我们已经对Flink的原理有了一定的了解了,有兴趣的同学可以参阅下: 《Flink教程(01)- Flink知识图谱》 《Flink教程(02)- Flink入门》 《Flink教程(03)- Flink环境搭建》 《Flink教程(04
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hadoop-compatibility_2.11</artifactId> import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.functions.source.SourceFunction; import jav
Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源(Source)。 从java的集合中读取数据 一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的。 //1.创建流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //2.创建集合 List<WaterSe.
Flink是一个高性能流式处理引擎,可以读取各种各样的数据源,包括自定义的源。自定义源是使用Flink的一种方式,主要是为了读取一些非标准的数据源或者改善性能表现。 自定义source是一个接口,需要实现org.apache.flink.streaming.api.functions.source.SourceFunction接口。该接口只有两个方法,一个是run(),另一个是cancel()。在run()中实现数据读取的逻辑,cancel()用于取消读取。自定义source主要包括数据什么时候开始读取,如何读取数据及什么时候读取结束等。 实现自定义source需要在程序入口处调用StreamExecutionEnvironment对象中的addSource()方法,将自定义source添加到批处理中。示例如下: ```java DataStreamSource<String> dataSource = env.addSource(new MySource()); 其中,MySource自定义的数据源。 在自定义source中,可以采用文件缓存方式来提升读取性能。通过FileChannel打开文件,使用ByteBuffer读取文件,然后将ByteBuffer通过Flink的DataStream传递给后续算子处理。这种方式可以大大提升文件读取的性能,减少文件IO的次数。示例如下: ```java try { FileInputStream inputStream = new FileInputStream(filePath); FileChannel inChannel = inputStream.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024 * 5); while (inChannel.read(buffer) != -1) { buffer.flip(); sourceContext.collect(buffer); buffer.clear(); } catch (IOException e) { e.printStackTrace(); 自定义source实现需要根据具体的数据源进行,但总体来说,实现自定义源并不复杂,只需要理解Flink数据处理的机制,并编写封装好的代码即可。