这篇博客文章介绍了Apache Flink的内置监控和指标系统,使开发人员能够有效地监控他们的Flink程序。通常情况下,对于刚开始使用流处理和Apache Flink的DevOps团队,选择相关指标来监控Flink程序可能是比较困难的。在参与部署过许多大规模Flink环境后,我想在这里与社区分享我的经验和一些最佳实践。

对于在Apache Flink上运行的关键业务程序,其性能监控成为生产环境中越来越重要的部分。它可确保快速发现并处理业务程序的性能下降、异常停止等问题。

监控与可观察性密切相关,可观察性是故障排除和性能调优的先决条件。如今,随着现代企业应用程序的复杂性和交付速度的提高,工程团队必须在任何给定的时间点理解并全面了解其应用程序的状态。

1.Flink的度量系统

监控Flink程序的基础是其 度量系统 ,它由两部分组成:度量指标和度量报告。

1.1 度量指标

Flink提供了一整套内置指标,例如:

  • 使用的JVM堆/非堆/直接内存(每个TaskManager/JobManager)
  • 程序重启次数(每个程序)
  • 每秒记录数(每个算子)
  • 这些指标具有不同的衡量范围,不仅包含Flink本身,还有更普遍的JVM,操作系统指标。

    作为用户,可以为自己的功能添加特定应用程序的指标。通常这些包括无效记录数计数器或在State中临时缓冲的记录数计数器。除了计数器,Flink还提供其他指标类型,如仪表和直方图。有关如何使用Flink的指标系统注册自己的指标的说明,请查看 Flink的文档 。在这篇博客文章中,我们将重点介绍如何充分利用Flink的内置指标。

    1.2 度量报告

    通过Flink的REST API可以查询所有指标,同时用户可以配置度量报告系统,将指标发送到外部系统。Apache Flink为度量报告提供了开箱即用的最常见的监控工具,包括JMX,Prometheus,Datadog,Graphite和InfluxDB。有关如何配置报告的信息,请查看Flink的MetricsReporter文档。
    在本博文的其余部分,我们将介绍一些最重要的指标来监控你的Apache Flink应用程序。

    2.监测指标

    要监控的第一件事是你的程序是否实际处于运行状态。此外,它还可以监控重启次数和自上一次重启的时间。

    一般来说,成功的检查点是检测应用程序整体状况的关键因素。对于每个检查点,检查点挡板需要流经Flink程序的整个拓扑,拓扑中正常的业务事件和挡板按顺序处理不能互相超越。因此,成功的检查点可以表示没有通道被完全阻塞。

    uptime 程序没有中断的运行时长。 fullRestarts 自提交此程序以来完全重新启动的总次数。 numberOfCompletedCheckpoints 成功完成检查点的数量。 numberOfFailedCheckpoints 失败检查点的数量。

    示例仪表板面板

    正常运行时间(35分钟),重启时间(3毫秒)和完全重启次数(7)


    图片2.png

    完成检查点(18336),失败(14)

    可能的警报

  • ΔfullRestarts > threshold
  • ΔnumberOfFailedCheckpoints > threshold
  • 3.监控进度和吞吐量

    现在可以知道你的程序是否正在运行,它的检查点是否正常完成,但还不能知道程序是否在不断消费,能否跟得上上游系统数据的产生速度。

    3.1 吞吐量

    Flink提供多个指标来衡量我们的应用程序的吞吐量。一个程序可以包含多个链式任务 Flink计算进出的记录和字节数。在这些度量中,每个操作员的传出记录的速率通常是最直观和最容易推理的。对于每一个算子或任务(一个任务可以包含多个串起来的子任务),Flink记录了进入和写出的记录数和字节数大小。在这些度量指标中,每个算子的写出速率通常是最直观,最容易理解的。

    numRecordsOutPerSecond 此任务每秒发送的记录数。 numRecordsOutPerSecond 此算子每秒发送的记录数。

    示例仪表板面板

    每个算子每秒平均记录数

    可能的警报

    • recordsOutPerSecond= 0(对于非Sink算子)

    注意: Source算子始终没有传入记录,Sink算子始终没有传出记录,因为度量标准仅计算Flink内部通信。不过 JIRA Ticket 可以改变这个情况。

    3.2 进展
    对于使用事件时间语义的应用程序,随时间推移水位就显得非常重要。时间水位t告诉整个程序,它不应再期望接收时间戳早于t的事件,并且触发程序调度时间戳< t的所有操作  。例如,一旦水位通过30,程序将关闭并计算在t = 30 结束的时间窗口内的事件。

    因此,在应用程序中对事件时间敏感的算子需要监控水位,例如过程函数和窗口。如果当前处理时间和水位之间的差异非常高,那么它通常意味着两个问题。第一,它可能意味着你正在处理旧事件,例如在停机后追数据或程序处理速度没办法赶上上游数据产生速度,上游事件堆积。第二,这可能意味着单个上游子任务长时间没有发送水位(例如,因为它没有接收任何基于水位的事件),这也阻止了下游算子的水位处理。这个 JIRA Ticket 为后者提供了进一步的信息和解决方案。

    currentOutputWatermark 此算子发出的最后一个水印。

    示例仪表板面板

    拓扑中单个算子的每个子任务的事件时间延迟。在这种情况下,对于每个子任务,时间水位落后几秒钟。

    可能的警报

    • currentProcessingTime - currentOutputWatermark > threshold

    3.3 “紧跟”
    当从消息队列中进行消费时,通常可以直接监视应用程序是否跟得上消息产生速度。通过使用特定连接器的度量指标,你可以监视当前消费者组的消息与最新消息的差距。Flink可以基于大多数Source连接器转发其基础指标。

    records-lag-max 适用于FlinkKafkaConsumer。此窗口中任何分区的记录数最大延迟。随着时间的推移,越来越大的延时表明消费者没有跟上生产者的速度。 millisBehindLatest 适用于FlinkKinesisConsumer。消费者距离最新消息的毫秒数。对于任何消费者和Kinesis分片,这表示它与当前时间之间的差距。

    可能的警报

  • records-lag-max > threshold
  • millisBehindLatest > threshold
  • 4.监控时延

    一般而言,时延是指事件创建与基于此事件的结果变得可见之间的时间延迟。创建事件后,它通常存储在持久性消息队列中,然后由Apache Flink处理,将结果写入数据库或调用下游系统。在这样的数据处理管道中,可以在每个阶段引入时延。原因有多种,包括:

  • 在事件持久存储在消息队列中之前,可能需要不同的时间。
  • 在高负载期间或恢复期间,事件可能会在消息队列中花费一些时间,直到Flink处理它们(请参阅上一节)。
  • 出于功能原因,流式拓扑中的一些函数需要缓冲事件一段时间(例如,在时间窗口中)。
  • Flink拓扑(框架或用户代码)中的每个计算以及每个网络shuffle都需要时间并增加时延。
  • 如果应用程序通过事务型Sink节点写出,则Sink节点将仅在Flink的成功检查点上提交和下发事务,每个检查点之间的间隔时间也将增加时延。
  • 实际上,事实证明,在多个阶段(事件创建,存储,进入Flink,写出Flink,若数据量过大可以只采样部分数据)为事件添加时间戳是非常有价值的。这些时间戳之间的差异可以作为Flink拓扑中的用户自定义度量标准展示,以获得每个阶段的延迟分布情况。

    在本节的剩下部分,我们只考虑在Flink拓扑中的时延,但并不包括事务型sink节点或由于函数原因缓存事件的节点。

    为此,Flink提供了一项称为 时延跟踪 的功能。启用后,Flink将在所有来源定期插入延迟标记事件。对于每个子任务,将报告从每个源到此算子的延迟分布。可以通过根据需要设置metrics.latency.granularity来进一步控制这些直方图的粒度。

    由于可能存在大量的直方图(特别是对于 metrics.latency.granularity:子任务 ),启用延迟跟踪会显着影响群集的性能。建议仅在调试期间使其能够找到延迟源。

    latency 从源算子到此算子的时延。 restartingTime 重新启动程序所花费的时间,或当前重新启动的持续时间。

    示例仪表板面板

    Source和单个Sink子任务之间的时延分布。

    4.JVM指标

    到目前为止,我们只关注了Flink特定的指标。只要你的应用程序时延和吞吐量符合你的期望并且检查点持续正常,整个程序应该是没有问题的。但另一方面,如果程序性能开始下降,你首要考虑的指标就是TaskManager&JobManager JVM的内存消耗和CPU负载。

    4.1 内存

    Flink报告了JobManagers和TaskManagers的Heap,NonHeap,Direct和Mapped内存的使用情况。

  • 堆内存 - 与大多数JVM应用程序一样 - 是最易于观察的重要指标。尤其是在使用Flink的文件系统statebackend时,因为它将所有状态对象保留在JVM堆上。如果堆上的对象大小显着增加,这通常可归因于应用程序状态的大小(检查堆栈状态的估计大小的 检查点指标 )。增长状态的可能原因是特定于应用程序的。通常,越来越多的主键,不同输入流之间的事件时间偏差过大或者仅仅缺少状态清理都可能导致整个堆呈增长状态。
  • NonHeap内存由元空间控制,默认情况下其大小不受限制,并保存类元数据和静态内容。有一个 JIRA Ticket 默认将大小限制为250兆字节。
  • 直接内存的最大驱动因素是Flink的网络缓冲区数量,可以 配置
  • 映射内存通常接近零,因为Flink不使用内存映射文件。
  • 在容器化环境中,你还应监视JobManger和TaskManager容器的总体内存消耗,以确保它们不超过其资源限制。当使用RocksDB状态后端时,要注意RocksDB会从堆下分配大量内存。如果想了解更多关于RocksDB使用内存量的多少,可以查看 Stefan Richter 撰写的 这篇博客文章

    Status.JVM.Memory.NonHeap.Committed 程序/任务管理器 保证JVM可用的非堆内存量(以字节为单位)。 Status.JVM.Memory.Heap.Used 程序/任务管理器 当前使用的堆内存量(以字节为单位)。 Status.JVM.Memory.Heap.Committed Job-/TaskManager 保证可供JVM使用的堆内存量(以字节为单位)。 Status.JVM.Memory.Direct.MemoryUsed Job-/TaskManager JVM用于直接缓冲池的内存量(以字节为单位)。 Status.JVM.Memory.Mapped.MemoryUsed Job-/TaskManager 执行G1 Young Generation垃圾收集所花费的总时间。 Status.JVM.GarbageCollector.G1 Young Generation.Time Job-/TaskManager JVM用于直接缓冲池的内存量(以字节为单位)。 Status.JVM.GarbageCollector.G1 Old Generation.Time Job-/TaskManager 执行G1 Old Generation垃圾收集所花费的总时间。

    示例仪表板面板

    TaskManager内存消耗和垃圾收集时间。

    JobManager内存消耗和垃圾收集时间。

    可能的警报

    • container memory limit < container memory + safety margin

    4.2 CPU

    除了内存,你还应该监视TaskManagers的CPU负载。如果你的TaskManagers经常处于非常高的负载下,你可以通过减少每个TaskManager的任务槽数来提高整体性能(如果是Standalone部署),为TaskManager提供更多资源(如果是容器化部署),或提供更多的TaskManagers。通常,在正常业务期间已经在非常高的负载状态下运行的系统,在从停机时间恢复之后将需要更多的时间来追数据。在此期间,你将看到比平时更高的延迟(事件时间偏差)。

    CPU负载的突然增加也可能是因为过多垃圾收集压力,这在JVM内存指标中可见。

    如果一个或几个TaskManagers一直处于非常高的负载下,由于长检查点对齐时间和事件时间偏差增加,这可能会降低整个拓扑结构的速度。常见的原因是数据分区键的数据倾斜,这可以通过在shuffle之前预先聚合或者将分区键更改为均匀分布的主键上来减轻数据倾斜。

    Status.JVM.CPU.Load Job-/TaskManager JVM最近的CPU使用情况。

    示例仪表板面板

    TaskManager和JobManager CPU加载。

    5.系统资源

    除了上面的JVM指标之外,还可以使用Flink的指标系统来收集有关系统资源,即整个计算机的内存,CPU和网络相关指标,而不仅仅是Flink程序。默认情况下禁用系统资源监视,并且需要对类路径有其他依赖性。可以查看 Flink系统资源指标文档 以获取相关指导和详细信息。Flink中的系统资源监视在没有现有主机监视功能的设置中非常有用。

    这篇文章主要阐明Flink的指标和监控系统。当你是第一次考虑如何成功实现监控Flink应用程序时,可以通过这方面的知识学习作为起点。我更推荐在开发阶段的早期就开始监控你的Flink应用程序。这样能够随着时间的推移改进仪表板和警报,更重要的是,你在整个开发阶段观察应用程序更改对性能的影响。通过这样做你可以提出有关应用程序运行时行为的正确问题,在早期就可以了解到有关Flink内部的更多信息。

    最后想说的是,这篇文章仅涉及Apache Flink的整体指标和监控功能。本人建议可以浏览 Flink的指标文档 ,以获取Flink指标系统的完整参考。

    本文由阿里云开发者社区组织翻译。

    文章原标题《Monitoring Apache Flink Applications 101》

    作者:AJ Christensen

    译者:么凹

    校对:校对者:杨阳(时溪)

    文章为简译,更为详细的内容,请查看 原文

    《Apache Flink 案例集(2022版)》——1.数据集成——37手游-基于 Flink CDC + Hudi 湖仓一体方案实践
    《Apache Flink 案例集(2022版)》——1.数据集成——37手游-基于 Flink CDC + Hudi 湖仓一体方案实践
    《Apache Flink 案例集(2022版)》——1.数据集成——XTransfer-基Flink MongoDB CDC 在 XTransfer 的生产实践(上)
    《Apache Flink 案例集(2022版)》——1.数据集成——XTransfer-基Flink MongoDB CDC 在 XTransfer 的生产实践(上)