Kafka剖析系列之Consumer解析
![作者头像](https://ask.qcloudimg.com/avatar/1263954/r1y1ckb9xa.png)
High Level Consumer
很多时候,客户程序只是希望从Kafka读取数据,不太关心消息offset的处理。同时也希望提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被所有Consumer消费(广播)。因此,Kafka High Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。
Consumer Group
High Level Consumer将从某个Partition读取的最后一条消息的offset存于ZooKeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每一个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。ZooKeeper中Consumer相关节点如下图所示:
![](https://ask.qcloudimg.com/http-save/yehe-1263954/rto631kru2.jpeg)
很多传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。(微信公众号:IT技术精选文摘, 微信号:ITHK01, 欢迎订阅)
![](https://ask.qcloudimg.com/http-save/yehe-1263954/hip54z3qto.jpeg)
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用 Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer在不同的 Consumer Group即可。下图展示了Kafka在LinkedIn的一种简化部署模型。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/cyyl83hpwd.jpeg)
为了更清晰展示Kafka Consumer Group的特性,笔者进行了一项测试。创建一个Topic (名为topic1),再创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,然后通过 Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时 group2中的3个Consumer分别收到了Key为1,2,3的消息,如下图所示。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/ep48c3whff.jpeg)
注:上图中每个黑色区域代表一个Consumer实例,每个实例只创建一个MessageStream。实际上,本实验将Consumer应用程序打成jar包,并在4个不同的命令行终端中传入不同的参数运行。
High Level Consumer Rebalance
注:本节所讲述Rebalance相关内容均基于Kafka High Level Consumer。
Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每一个Consumer实例只会消费某一个或多个特定 Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以 Partition为单位分配的,而非以每一条消息作为分配单元。这样设计的劣势是无法保证同一个Consumer Group里的Consumer均匀消费数据,优势是每个Consumer不用都跟大量的Broker通信,减少通信开销,同时也降低了分配难度,实现也更简单。另外,因为同一个Partition里的数据是有序的,这种设计可以保证每个Partition里的数据可以被有序消费。
如果某Consumer Group中Consumer(每个Consumer只创建1个MessageStream)数量少于Partition数量,则至少有一个 Consumer会消费多个Partition的数据,如果Consumer的数量与Partition数量相同,则正好一个Consumer消费一个 Partition的数据。而如果Consumer的数量多于Partition的数量时,会有部分Consumer无法消费该Topic下任何一条消息。
如下例所示,如果topic1有0,1,2共三个Partition,当group1只有一个Consumer(名为consumer1)时,该 Consumer可消费这3个Partition的所有数据。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/o9ujoo3nkp.jpeg)
增加一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2个Partition的数据(Partition 0和Partition 1),另外一个Consumer(consumer2)可消费另外一个Partition(Partition 2)的数据。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/iqkaontvz6.jpeg)
再增加一个Consumer(consumer3)后,每个Consumer可消费一个Partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/t3d4doo8gf.jpeg)
再增加一个Consumer(consumer4)后,其中3个Consumer可分别消费一个Partition的数据,另外一个Consumer(consumer4)不能消费topic1的任何数据。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/mh6nzgrrae.jpeg)
此时关闭consumer1,其余3个Consumer可分别消费一个Partition的数据。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/uu4mpj8nj0.jpeg)
接着关闭consumer2,consumer3可消费2个Partition,consumer4可消费1个Partition。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/26y6n8z27a.jpeg)
再关闭consumer3,仅存的consumer4可同时消费topic1的3个Partition。
![](https://ask.qcloudimg.com/http-save/yehe-1263954/taz36lbbdc.jpeg)
Consumer Rebalance的算法如下:
- 将目标Topic下的所有Partirtion排序,存于PT
- 对某Consumer Group下所有Consumer排序,存于CG,第i个Consumer记为Ci
- N=size(PT)/size(CG),向上取整
- 解除Ci对原来分配的Partition的消费权(i从0开始)
- 将第i∗N到(i+1)∗N−1个Partition分配给Ci
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每一个Consumer通过在Zookeeper上注册Watch完成的。每个Consumer被创建时会触发 Consumer Group的Rebalance,具体启动流程如下:
-
High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为
/consumers/[consumer group]/ids/[consumer id]
-
在
/consumers/[consumer group]/ids
上注册Watch -
在
/brokers/ids
上注册Watch -
如果Consumer通过Topic Filter创建消息流,则它会同时在
/brokers/topics
上也创建Watch - 强制自己在其Consumer Group内启动Rebalance流程
在这种策略下,每一个Consumer或者Broker的增加或者减少都会触发 Consumer Rebalance。因为每个Consumer只负责调整自己所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它所有其它Consumer也应该同时触发Rebalance。(微信公众号:IT技术精选文摘, 微信号:ITHK01, 欢迎订阅)
该方式有如下缺陷:
- Herd effect 任何Broker或者Consumer的增减都会触发所有的Consumer的Rebalance
- Split Brain 每个Consumer分别单独通过Zookeeper判断哪些Broker和Consumer 宕机了,那么不同Consumer在同一时刻从Zookeeper“看”到的View就可能不一样,这是由Zookeeper的特性决定的,这就会造成不正确的Reblance尝试。
- 调整结果不可控 所有的Consumer都并不知道其它Consumer的Rebalance是否成功,这可能会导致Kafka工作在一个不正确的状态。
根据Kafka社区wiki,Kafka作者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)。大体思想是为所有Consumer Group的子集选举出一个Broker作为Coordinator,由它Watch Zookeeper,从而判断是否有Partition或者Consumer的增减,然后生成Rebalance命令,并检查是否这些Rebalance 在所有相关的Consumer中被执行成功,如果不成功则重试,若成功则认为此次Rebalance成功(这个过程跟Replication Controller非常类似)。具体方案将在后文中详细阐述。
Low Level Consumer
使用Low Level Consumer (Simple Consumer)的主要原因是,用户希望比Consumer Group更好的控制数据的消费。比如:
- 同一条消息读多次
- 只读取某个Topic的部分Partition
- 管理事务,从而确保每条消息被处理一次,且仅被处理一次
与Consumer Group相比,Low Level Consumer要求用户做大量的额外工作。
- 必须在应用程序中跟踪offset,从而确定下一条应该消费哪条消息
- 应用程序需要通过程序获知每个Partition的Leader是谁
- 必须处理Leader的变化
使用Low Level Consumer的一般流程如下
- 查找到一个“活着”的Broker,并且找出每个Partition的Leader
- 找出每个Partition的Follower
- 定义好请求,该请求应该能描述应用程序需要哪些数据
- Fetch数据
- 识别Leader的变化,并对之作出必要的响应
Consumer重新设计
根据社区社区wiki,Kafka在0.9.*版本中,重新设计Consumer可能是最重要的Feature之一。本节会根据社区wiki介绍Kafka 0.9.*中对Consumer可能的设计方向及思路。
设计方向
简化消费者客户端
部分用户希望开发和使用non-java的客户端。现阶段使用non-java发SimpleConsumer比较方便,但想开发High Level Consumer并不容易。因为High Level Consumer需要实现一些复杂但必不可少的失败探测和Rebalance。如果能将消费者客户端更精简,使依赖最小化,将会极大的方便non- java用户实现自己的Consumer。
中心Coordinator
如上文所述,当前版本的High Level Consumer存在Herd Effect和Split Brain的问题。如果将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题即可解决。同时还可大大减少 Zookeeper的负载,有利于Kafka Broker的Scale Out。
允许手工管理offset
一些系统希望以特定的时间间隔在自定义的 数据库 中管理Offset。这就要求Consumer能获取到每条消息的metadata,例如 Topic,Partition,Offset,同时还需要在Consumer启动时得到每个Partition的Offset。实现这些,需要提供新的 Consumer API。同时有个问题不得不考虑,即是否允许Consumer手工管理部分Topic的Offset,而让Kafka自动通过Zookeeper管理其它 Topic的Offset。一个可能的选项是让每个Consumer只能选取1种Offset管理机制,这可极大的简化Consumer API的设计和实现。
Rebalance后触发用户指定的回调
一些应用可能会在内存中为每个Partition维护一些状态,Rebalance时,它们可能需要将该状态持久化。因此该需求希望支持用户实现并指定一些可插拔的并在Rebalance时触发的回调。如果用户使用手动的Offset管理,那该需求可方便得由用户实现,而如果用户希望使用Kafka提供的自动Offset管理,则需要Kafka提供该回调机制。
非阻塞式Consumer API
该需求源于那些实现高层流处理操作,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操作。
如何通过中心Coordinator实现Rebalance
成功Rebalance的结果是,被订阅的所有Topic的每一个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每一个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer Group的成员变化或者所订阅的Topic的Partititon变化时协调Rebalance操作。
Consumer
1) Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并通过 ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest 和ConsumerMetadataResponse的结构如下
ConsumerMetadataRequest
GroupId => String
ConsumerMetadataResponse
ErrorCode => int16
Coordinator => Broker
}
2)Consumer连接到Coordinator并发送 HeartbeatRequest,如果返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含 IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操作,此时Consumer停止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中获得它应该拥有的所有 Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构如下
HeartbeatRequest
GroupId => String
GroupGenerationId => int32
ConsumerId => String
HeartbeatResponse
ErrorCode => int16
JoinGroupRequest
GroupId => String
SessionTimeout => int32
Topics => [String]
ConsumerId => String
PartitionAssignmentStrategy => String
JoinGroupResponse
ErrorCode => int16