Spark3.2 VS Flink1.15关于window和watermark功能

Spark3.2 VS Flink1.15关于window和watermark功能

(PS:本次验证用到的组件分别为:kafka2.0.0、Spark3.2.0、Flink1.52.3

涉及到的代码已经全部提交到实战项目中:

github.com/Anryg/intern


上篇文章写了关于 Flink 和 Spark 的一个非常简单的流式处理案例,虽然需求很简单,但也算是完成了一个完整流式数据处理的流程。


整个流程分为:数据源读取、数据ETL处理、数据结果的sink。


经过这么一个完整的处理流程,你就能感受到一个大数据工具如何将其应用到生产,去解决一个实际问题时会存在哪些实实在在的坑。


继上次趟过一些坑之后,这次我领着大家继续,这次咱们分别来看看Spark和Flink流式处理中的 window watermark 功能,同样还是用之前给大家提供的数据集,具体需求场景如下:

从kafka读取数据源,实时统计时间窗口为2分钟的,排名前十的上网用户的数量(客户端IP数量),滑动窗口为30秒(即分析结果每30s更新一次),且数据容忍的迟到时间为10s,然后把结果输出到控制台。

看着是不是也挺简单的,其实就是一个基于时间滑动窗口 worldcount ,但是也别小瞧这个案例,生产上很多时候就是面对这样的一些需求,但是想做好,也没有想象中那么简单。


简单分析一下这个需求:

既然是统计client_ip基于时间的数量,那么就只需要取client_ip以及上网时间(time)这两个字段就可以了,然后基于时间窗口进行聚合就能得到结果。


下面分别从Spark的structured streaming以及Flink的API来实现:



1.Spark的实现


先看Structuredstreaming的实现,参考代码如下:

packagecom.anryg.bigdata.streaming.demo.window_watermark importjava.sql.Timestamp importjava.text.SimpleDateFormat importorg.apache.log4j.{Level,Logger} importorg.apache.spark.SparkConf importorg.apache.spark.sql.SparkSession importorg.apache.spark.sql.streaming.OutputMode *@DESC:用时间窗口和watermark来进行client_ip的worldcount统计 *@Auther:Anryg *@Date:2022/11/3010:04 objectWorldCountWithWatermark{ defmain(args:Array[String]):Unit={ valconf=newSparkConf().setAppName("WorldCountWithWatermark").setMaster("local") valspark=SparkSession.builder() .config(conf) .getOrCreate() Logger.getLogger("org.apache").setLevel(Level.WARN)//减少INFO日志的输出 valrawDF=spark.readStream .format("kafka") .option("kafka.bootstrap.servers","${kafka_broker}:6667") .option("subscribe","${topic}") //.option("group.id","test9999")/**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/ .option("failOnDataLoss",false) .option("fetchOffset.numRetries",3) //.option("maxOffsetsPerTrigger",Integer.MAX_VALUE)/**用于限流,限定每个批次取的数据条数,确定写入HDFS单个文件的条数*/ .option("startingOffsets","latest") .load() importspark.implicits._ valdf1=rawDF.selectExpr("CAST(valueASstring)") .map(row=>{ valline=row.getAs[String]("value") valfieldArray:Array[String]=line.split("\\|") fieldArray .filter(_.length==9)//确定字段数必须为9个 .filter(_(1).endsWith("com"))//防止数量太大,对访问的网站做的一点限制 .map(array=>{ valsdf=newSimpleDateFormat("yyyyMMddhhmmss").parse(array(2)) valtime=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(sdf) (array(0),Timestamp.valueOf(time))//因为time这个字段要作为watermark字段,它必须是yyyy-MM-ddHH:mm:ss样式的Timestamp类型 .toDF("client_ip","time")//添加schema importorg.apache.spark.sql.functions._/**引入spark内置函数*/ valdf2=df1.withWatermark("time","10seconds")//一般需要跟window一起配合使用 .groupBy(window($"time","2minutes","30seconds"),$"client_ip")//确定具体字段,以及对应的聚合时间窗口,和滑动窗口 .count() .orderBy($"count".desc) .limit(10) valquery=df2.writeStream .format("console")//打印到控制台 .option("truncate",false)//将结果的内容完整输出,默认会砍掉内容过长的部分 .option("numRows",30)//一次最多打印多少行,默认20行 .option("checkpointLocation","hdfs://${namenode}/tmp/offset/WorldCountWithWatermark")//确定checkpoint目录 //.outputMode(OutputMode.Update())//不支持排序的结果 .outputMode(OutputMode.Complete())//确定输出模式,默认为Append .start() query.awaitTermination()

来看下运行效果,如下:

其实代码还是挺简单的(代码中有比较详细的注释),整个内容完全参考官网,也挺好理解的,个人认为这方面的文档比FLink的官方文档要更加清晰,易懂,不信,你可以去做个对比。



2.Flink DataStream的实现


因为对于Flink来说,实时的API有两套,个DataStream API另一个是Table API。


因为就在前两天,我在用Table API的时候遇到一个大坑,当时因为时间有限,没有趟过去(不过现在已经趟过了),于是这次就想着还是先用DataStreamAPI吧。


来,对应的代码实现如下:

packagecom.anryg.window_and_watermark importjava.text.SimpleDateFormat importjava.time.Duration importjava.util.Locale importorg.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy} importorg.apache.flink.api.common.serialization.{SimpleStringEncoder,SimpleStringSchema} importorg.apache.flink.configuration.MemorySize importorg.apache.flink.connector.kafka.source.KafkaSource importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer importorg.apache.flink.core.fs.Path importorg.apache.flink.streaming.api.CheckpointingMode importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment importorg.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows,SlidingProcessingTimeWindows,TumblingEventTimeWindows,TumblingProcessingTimeWindows} importorg.apache.flink.streaming.api.windowing.time importorg.apache.flink.streaming.api.windowing.time.Time *@DESC:读取kafka数据,从DataStream到HDFS *@Auther:Anryg *@Date:2022/8/1419:08 objectFlinkDSFromKafkaWithWatermark{ privatefinalvalhdfsPrefix="hdfs://${namenode}:8020" defmain(args:Array[String]):Unit={ //获取流任务的环境变量 valenv=StreamExecutionEnvironment.getExecutionEnvironment .enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能 env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix+"/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark")//设置checkpoint的hdfs目录 env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略 valkafkaSource=KafkaSource.builder()//获取kafka数据源 .setBootstrapServers("${kafka_broker}:6667") .setTopics("${topic}") .setGroupId("FlinkDSFromKafkaWithWatermark") .setStartingOffsets(OffsetsInitializer.latest()) .setValueOnlyDeserializer(newSimpleStringSchema()) .build() importorg.apache.flink.streaming.api.scala._//引入隐私转换函数 valkafkaDS=env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks() ,"kafka-data")//读取数据源生成DataStream对象 valtargetDS=kafkaDS.map(line=>{//对数据源做简单的ETL处理 line.split("\\|") }).filter(_.length==9).filter(_(1).endsWith("com")) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofHours(10))//指定watermark .withTimestampAssigner(newSerializableTimestampAssigner[Array[String]]{ overridedefextractTimestamp(element:Array[String],recordTimestamp:Long):Long={ valsdf=newSimpleDateFormat("yyyyMMddhhmmss") sdf.parse(element(2)).getTime//指定的watermark字段必须是Long类型的时间戳 .map(array=>(array(0),1)) .keyBy(kv=>kv._1)//根据client_ip聚合 .window(SlidingProcessingTimeWindows.of(Time.minutes(2),Time.seconds(30)))//指定window,这里的windowassigner必须是基于ProcessTime而不是EventTime,因为数据时间跟当前真实时间相差有点多 .sum(1) targetDS.print()//打印结果 env.execute("FlinkDSFromKafkaWithWatermark")//启动任务

但是,这个实现好像并不十分优雅,一来这个结果不好排序,二来显示的内容没有上面的spark实现的好看。


输出的截图如下,凑合看一下:


总感觉这个输出结果怪怪的,实际生产我们肯定不能这么用,看来这还得用Table API。



3.Flink Table API的实现


来,下面这部分代码就是我用Table API的实现,完全根据官方文档一步步来的:

packagecom.anryg.window_and_watermark importjava.sql.Timestamp importjava.text.SimpleDateFormat importjava.time.Duration importorg.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy} importorg.apache.flink.api.common.serialization.SimpleStringSchema importorg.apache.flink.connector.kafka.source.KafkaSource importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer importorg.apache.flink.streaming.api.CheckpointingMode importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup importorg.apache.flink.streaming.api.scala._ importorg.apache.flink.table.api.bridge.scala.StreamTableEnvironment *@DESC:读取kafka数据,从DataStreamAPI转为TableAPI,并利用watermark *@Auther:Anryg *@Date:2022/8/1419:08 objectFlinkTBFromKafkaWithWatermark{ privatefinalvalhdfsPrefix="hdfs://192.168.211.106:8020"//HDFS地址前缀 defmain(args:Array[String]):Unit={ valenv=StreamExecutionEnvironment.getExecutionEnvironment//获取流环境变量 .enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能 valtableEnv=StreamTableEnvironment.create(env)//创建Table环境变量 env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix+"/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark")//设置checkpoint的hdfs目录 env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略 valkafkaSource=KafkaSource.builder() .setBootstrapServers("192.168.211.107:6667") .setTopics("qianxin") .setGroupId("FlinkTBFromKafkaWithWatermark") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(newSimpleStringSchema()) .build() valkafkaDS=env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data") valtargetDS=kafkaDS.map(_.split("\\|")) .filter(_.length==9) .filter(_(1).endsWith("com")) .assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))//给业务字段分配watermark .withTimestampAssigner(newSerializableTimestampAssigner[Array[String]]{ overridedefextractTimestamp(element:Array[String],recordTimestamp:Long):Long={//实现watermark字段的分配 valsdf=newSimpleDateFormat("yyyyMMddhhmmss") sdf.parse(element(2)).getTime .map(array=>(array(0),array(2))) .map(kv=>{ valdate=kv._2 valsdf=newSimpleDateFormat("yyyyMMddhhmmss").parse(date) valtime=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(sdf) (kv._1,Timestamp.valueOf(time))//将时间转为要求的TimeAttributes也就是Timestamp类型 importorg.apache.flink.table.api._//加入隐式转换,否则下面的$无法识别 valtargetTable=tableEnv.fromDataStream(targetDS) .as("client_ip","time")//添加schema .window( Slideover1.minuteevery30.seconds()on$"time"as$"w"//加入window .groupBy($"client_ip",$"w") .select( $"client_ip", $"w".start(),//时间窗口的开始时间 $"w".end(),//时间窗口的解释时间 $"client_ip".count()as"count"