Spark3.2 VS Flink1.15关于window和watermark功能
(PS:本次验证用到的组件分别为:kafka2.0.0、Spark3.2.0、Flink1.52.3
涉及到的代码已经全部提交到实战项目中:
https:// github.com/Anryg/intern et_behavior_project )
上篇文章写了关于 Flink 和 Spark 的一个非常简单的流式处理案例,虽然需求很简单,但也算是完成了一个完整流式数据处理的流程。
整个流程分为:数据源读取、数据ETL处理、数据结果的sink。
经过这么一个完整的处理流程,你就能感受到一个大数据工具如何将其应用到生产,去解决一个实际问题时会存在哪些实实在在的坑。
继上次趟过一些坑之后,这次我领着大家继续,这次咱们分别来看看Spark和Flink流式处理中的 window 和 watermark 功能,同样还是用之前给大家提供的数据集,具体需求场景如下:
从kafka读取数据源,实时统计时间窗口为2分钟的,排名前十的上网用户的数量(客户端IP数量),滑动窗口为30秒(即分析结果每30s更新一次),且数据容忍的迟到时间为10s,然后把结果输出到控制台。
看着是不是也挺简单的,其实就是一个基于时间滑动窗口
worldcount
,但是也别小瞧这个案例,生产上很多时候就是面对这样的一些需求,但是想做好,也没有想象中那么简单。
简单分析一下这个需求:
既然是统计client_ip基于时间的数量,那么就只需要取client_ip以及上网时间(time)这两个字段就可以了,然后基于时间窗口进行聚合就能得到结果。
下面分别从Spark的structured streaming以及Flink的API来实现:
1.Spark的实现
先看Structuredstreaming的实现,参考代码如下:
packagecom.anryg.bigdata.streaming.demo.window_watermark
importjava.sql.Timestamp
importjava.text.SimpleDateFormat
importorg.apache.log4j.{Level,Logger}
importorg.apache.spark.SparkConf
importorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.streaming.OutputMode
*@DESC:用时间窗口和watermark来进行client_ip的worldcount统计
*@Auther:Anryg
*@Date:2022/11/3010:04
objectWorldCountWithWatermark{
defmain(args:Array[String]):Unit={
valconf=newSparkConf().setAppName("WorldCountWithWatermark").setMaster("local")
valspark=SparkSession.builder()
.config(conf)
.getOrCreate()
Logger.getLogger("org.apache").setLevel(Level.WARN)//减少INFO日志的输出
valrawDF=spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers","${kafka_broker}:6667")
.option("subscribe","${topic}")
//.option("group.id","test9999")/**不再用该方式来绑定offset,而是每个程序有个唯一的id,该id跟checkpointLocation绑定,虽然group.id属性在运行中依然保留,但是不再跟offset绑定*/
.option("failOnDataLoss",false)
.option("fetchOffset.numRetries",3)
//.option("maxOffsetsPerTrigger",Integer.MAX_VALUE)/**用于限流,限定每个批次取的数据条数,确定写入HDFS单个文件的条数*/
.option("startingOffsets","latest")
.load()
importspark.implicits._
valdf1=rawDF.selectExpr("CAST(valueASstring)")
.map(row=>{
valline=row.getAs[String]("value")
valfieldArray:Array[String]=line.split("\\|")
fieldArray
.filter(_.length==9)//确定字段数必须为9个
.filter(_(1).endsWith("com"))//防止数量太大,对访问的网站做的一点限制
.map(array=>{
valsdf=newSimpleDateFormat("yyyyMMddhhmmss").parse(array(2))
valtime=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(sdf)
(array(0),Timestamp.valueOf(time))//因为time这个字段要作为watermark字段,它必须是yyyy-MM-ddHH:mm:ss样式的Timestamp类型
.toDF("client_ip","time")//添加schema
importorg.apache.spark.sql.functions._/**引入spark内置函数*/
valdf2=df1.withWatermark("time","10seconds")//一般需要跟window一起配合使用
.groupBy(window($"time","2minutes","30seconds"),$"client_ip")//确定具体字段,以及对应的聚合时间窗口,和滑动窗口
.count()
.orderBy($"count".desc)
.limit(10)
valquery=df2.writeStream
.format("console")//打印到控制台
.option("truncate",false)//将结果的内容完整输出,默认会砍掉内容过长的部分
.option("numRows",30)//一次最多打印多少行,默认20行
.option("checkpointLocation","hdfs://${namenode}/tmp/offset/WorldCountWithWatermark")//确定checkpoint目录
//.outputMode(OutputMode.Update())//不支持排序的结果
.outputMode(OutputMode.Complete())//确定输出模式,默认为Append
.start()
query.awaitTermination()
来看下运行效果,如下:
其实代码还是挺简单的(代码中有比较详细的注释),整个内容完全参考官网,也挺好理解的,个人认为这方面的文档比FLink的官方文档要更加清晰,易懂,不信,你可以去做个对比。
2.Flink DataStream的实现
因为对于Flink来说,实时的API有两套,个DataStream API另一个是Table API。
因为就在前两天,我在用Table API的时候遇到一个大坑,当时因为时间有限,没有趟过去(不过现在已经趟过了),于是这次就想着还是先用DataStreamAPI吧。
来,对应的代码实现如下:
packagecom.anryg.window_and_watermark
importjava.text.SimpleDateFormat
importjava.time.Duration
importjava.util.Locale
importorg.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy}
importorg.apache.flink.api.common.serialization.{SimpleStringEncoder,SimpleStringSchema}
importorg.apache.flink.configuration.MemorySize
importorg.apache.flink.connector.kafka.source.KafkaSource
importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
importorg.apache.flink.core.fs.Path
importorg.apache.flink.streaming.api.CheckpointingMode
importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows,SlidingProcessingTimeWindows,TumblingEventTimeWindows,TumblingProcessingTimeWindows}
importorg.apache.flink.streaming.api.windowing.time
importorg.apache.flink.streaming.api.windowing.time.Time
*@DESC:读取kafka数据,从DataStream到HDFS
*@Auther:Anryg
*@Date:2022/8/1419:08
objectFlinkDSFromKafkaWithWatermark{
privatefinalvalhdfsPrefix="hdfs://${namenode}:8020"
defmain(args:Array[String]):Unit={
//获取流任务的环境变量
valenv=StreamExecutionEnvironment.getExecutionEnvironment
.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能
env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix+"/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark")//设置checkpoint的hdfs目录
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略
valkafkaSource=KafkaSource.builder()//获取kafka数据源
.setBootstrapServers("${kafka_broker}:6667")
.setTopics("${topic}")
.setGroupId("FlinkDSFromKafkaWithWatermark")
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(newSimpleStringSchema())
.build()
importorg.apache.flink.streaming.api.scala._//引入隐私转换函数
valkafkaDS=env.fromSource(kafkaSource,
WatermarkStrategy.noWatermarks()
,"kafka-data")//读取数据源生成DataStream对象
valtargetDS=kafkaDS.map(line=>{//对数据源做简单的ETL处理
line.split("\\|")
}).filter(_.length==9).filter(_(1).endsWith("com"))
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofHours(10))//指定watermark
.withTimestampAssigner(newSerializableTimestampAssigner[Array[String]]{
overridedefextractTimestamp(element:Array[String],recordTimestamp:Long):Long={
valsdf=newSimpleDateFormat("yyyyMMddhhmmss")
sdf.parse(element(2)).getTime//指定的watermark字段必须是Long类型的时间戳
.map(array=>(array(0),1))
.keyBy(kv=>kv._1)//根据client_ip聚合
.window(SlidingProcessingTimeWindows.of(Time.minutes(2),Time.seconds(30)))//指定window,这里的windowassigner必须是基于ProcessTime而不是EventTime,因为数据时间跟当前真实时间相差有点多
.sum(1)
targetDS.print()//打印结果
env.execute("FlinkDSFromKafkaWithWatermark")//启动任务
但是,这个实现好像并不十分优雅,一来这个结果不好排序,二来显示的内容没有上面的spark实现的好看。
输出的截图如下,凑合看一下:
总感觉这个输出结果怪怪的,实际生产我们肯定不能这么用,看来这还得用Table API。
3.Flink Table API的实现
来,下面这部分代码就是我用Table API的实现,完全根据官方文档一步步来的:
packagecom.anryg.window_and_watermark
importjava.sql.Timestamp
importjava.text.SimpleDateFormat
importjava.time.Duration
importorg.apache.flink.api.common.eventtime.{SerializableTimestampAssigner,WatermarkStrategy}
importorg.apache.flink.api.common.serialization.SimpleStringSchema
importorg.apache.flink.connector.kafka.source.KafkaSource
importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
importorg.apache.flink.streaming.api.CheckpointingMode
importorg.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
importorg.apache.flink.streaming.api.scala._
importorg.apache.flink.table.api.bridge.scala.StreamTableEnvironment
*@DESC:读取kafka数据,从DataStreamAPI转为TableAPI,并利用watermark
*@Auther:Anryg
*@Date:2022/8/1419:08
objectFlinkTBFromKafkaWithWatermark{
privatefinalvalhdfsPrefix="hdfs://192.168.211.106:8020"//HDFS地址前缀
defmain(args:Array[String]):Unit={
valenv=StreamExecutionEnvironment.getExecutionEnvironment//获取流环境变量
.enableCheckpointing(10000,CheckpointingMode.EXACTLY_ONCE)//打开checkpoint功能
valtableEnv=StreamTableEnvironment.create(env)//创建Table环境变量
env.getCheckpointConfig.setCheckpointStorage(hdfsPrefix+"/tmp/flink_checkpoint/FlinkDSFromKafkaWithWatermark")//设置checkpoint的hdfs目录
env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)//设置checkpoint记录的保留策略
valkafkaSource=KafkaSource.builder()
.setBootstrapServers("192.168.211.107:6667")
.setTopics("qianxin")
.setGroupId("FlinkTBFromKafkaWithWatermark")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(newSimpleStringSchema())
.build()
valkafkaDS=env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"kafka-data")
valtargetDS=kafkaDS.map(_.split("\\|"))
.filter(_.length==9)
.filter(_(1).endsWith("com"))
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))//给业务字段分配watermark
.withTimestampAssigner(newSerializableTimestampAssigner[Array[String]]{
overridedefextractTimestamp(element:Array[String],recordTimestamp:Long):Long={//实现watermark字段的分配
valsdf=newSimpleDateFormat("yyyyMMddhhmmss")
sdf.parse(element(2)).getTime
.map(array=>(array(0),array(2)))
.map(kv=>{
valdate=kv._2
valsdf=newSimpleDateFormat("yyyyMMddhhmmss").parse(date)
valtime=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss").format(sdf)
(kv._1,Timestamp.valueOf(time))//将时间转为要求的TimeAttributes也就是Timestamp类型
importorg.apache.flink.table.api._//加入隐式转换,否则下面的$无法识别
valtargetTable=tableEnv.fromDataStream(targetDS)
.as("client_ip","time")//添加schema
.window(
Slideover1.minuteevery30.seconds()on$"time"as$"w"//加入window
.groupBy($"client_ip",$"w")
.select(
$"client_ip",
$"w".start(),//时间窗口的开始时间
$"w".end(),//时间窗口的解释时间
$"client_ip".count()as"count"