开启掘金成长之旅!这是我参与「掘金日新计划 · 2 月更文挑战」的第 7 天, 点击查看活动详情
1、生产者基本介绍
Producer是Kafka中负责向Broker写入数据的Client端程序
Producer端连接broker集群,序列化key-value并压缩,随后通过分区策略决策出目标的partition,最后对目标broker进行数据发送。一个Producer从创建、发送消息到销毁,主要分为以下几个阶段:
2、生产者的TCP连接管理
Kafka 基于TCP实现的自己的通讯协议,单个Client会创建多个Socket链接与Server中的多个Broker进行交互。而TCP连接的创建存在于以下几个时机:
2.1、 KafkaProducer实例创建时进行TCP连接
当KafkaProducer实例被创建时,将启动Sender线程,该线程将自动创建与Broker的连接,但是由于Producer启动时不具有Broker集群的信息,因此会通过连接 bootstrap.servers 参数下配置的的Broker,以获取集群METADATA信息。
在KafkaProducer的构造方法中,通过获取配置文件中的bootstrap.servers 和 client.dns.lookup信息,解析需要建立连接的Socket目标信息。 通过这些地址信息初始化metadata中的metadataCache属性。随后通过metadata属性创建Sender对象,并创建线程启动了Sender。
KafkaProducer源码
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
// 通过配置文件配置的 bootstrap.servers 和 client.dns.lookup 获取到需要连接的Socket信息对象
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
// 通过addresses信息初始化metadata对象中的matedataCache属性
this.metadata.bootstrap(addresses);
// 创建Sender对象
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
// 启动Sender线程
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
在Sender启动后,将主动向Broker集群发送请求,用于向集群发送 InitProducerIdRequestData。在发送数据前会初始化连接,初始化连接将在在Node中获取到一个未完成请求最少的节点(producer刚启动时,实际为随机获取)用于发送InitProducerIdRequestData。 但此时Client还未获取到Broker集群的Metadata元数据,因此会再次初始化连接(实际为再次随机获取)用于获取集群信息,随后才将发送此次的Producer注册信息。
因此就我通过大致了解源码与实际实验得到,在KafkaProducer实例创建时,会初始化1个或2个与集群的socket连接,两个连接的作用分别为Client的初始化操作和metadata的初始化操作。特殊的,当两次发送的Node获取为同一个,则只会产生一个Socket连接。以下为相同集群配置下的单个Socket连接和两个Socket连接的日志对比图:
由于系统启动时期KafkaProducer不具备Broker集群的信息,因此其Node初始化编号为从-1开始依次递减。
2.2、发送数据时进行TCP连接
当用户主动发起请求(或自动的心跳请求时?),Sender将再次获取一个未完成请求最少的节点用于本次请求的发送,此时Client端已经获取Broker集群的metadata信息,同时Broker集群也收到了Client的注册信息。
在本次请求的Node获取中,如果Kafka获取到未创建Socket的Node结点,则会通过metadata中该结点的信息,初始化Socket连接,并发送请求。
如果获取的Node为KafkaProducer初始化时即创建过Socket链接的Node结点,也并不会使用原连接:准确来说,metadata更新后可使用的Node集合会被metadata中的获取到的信息所更新,原有的Node将不再被使用。因此即使是初始化中使用过的Node结点,对于获取metadata后的Producer而言,都是一个全新的Node结点,因此对Node结点的操作同第一点。
通过debug leasetLoadedNode方法,也能发现:在metadata更新后,metadata中的nodes集合内容已被更新。
3、数据压缩
在ProducerConfig中存在compression.type
配置,意味配置消息的数据压缩算法:
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
// 获取compressionType
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
// 初始化配置到accumulator中
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
Kafka内部支持的压缩算法有以下几种:
SNAPPY
当然也可以选择不启用压缩算法(NONE), 具体的压缩实现和压缩算法原理此处就不展开讲了,感兴趣的可以自行Google。
消息被压缩后发送,随着消息发送的还有消息的压缩算法。因此Consumer可以拿到压缩算法进行解压缩,完成压缩-发送-解压缩的闭环。
4、数据序列化
KafkaProducer对象提供了范行的key value值供使用,但这些数据都需要序列化为字节数组后才能发送给Broker,因此在启动KafkaProducer时需要显式的指定序列化器。
在KafkaProducer源码里,在构造方法中通过配置的 “key.serializer” 和 “value.serializer” 值指定了当前Producer的序列化器:
KafkaProducer Constructor
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
在核心的发送方法(doSend)中,调用配置的序列化器对提交的键值进行序列化操作:
KafkaProducer.doSend
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
4.1、 Kafka提供的序列化器
Kafka为我们预提供了部分基础数据结构的序列化器,通过将类的全限定类名配置在 “key.serializer” 或 “value.serializer”上以指定使用该序列化器
4.2、自定义序列化器
通过上方的KafkaProducer的Constructor源码可得知,配置的序列化器将被作为 Serializer.class 的对象进行属性装载。因此欲实现自定义序列化器,只需要implements Serializer,实现其中的 serialize 方法,最后将自定义类配置在 “key.serializer” 或 “value.serializer”上即可。
5、 分区策略
如Kafka - Server篇提到的Partition 分区,Kafka Producer在消息发送前,需要明消息发送的目标分区及分区所在Broker,随后才能进行发送。 其中包括三个步骤:
Topic分区信息的获取
消息分区策略
消息向目标Broker的发送
5.1、 获取Topic分区信息
在消息发送前,Kafka将获取Topic的信息(如果没有),并将其更新到metadata中,随后才能对该Topic下进行数据的发送。其实现代码核心逻辑在于Sender.doSend方法中调用的waitOnMetadata方法:
Sender.waitOnMetadata
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
// 将不存在于metadata的主题添加到metadata的topic list,并重置到期时间
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
metadata.add(topic, nowMs);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// 如果存在topic的metadata,并且本次发送的目标分区未指定或在已知分区之中,则无需重新获取topic的metadata,直接返回
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long remainingWaitMs = maxWaitMs;
long elapsed = 0;
// 持续发送获取topic metadata的请求,直到获取到信息或是超出设置的MaxWaitTimes
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
metadata.add(topic, nowMs + elapsed);
int version = metadata.requestUpdateForTopic(topic);
sender.wakeup();
try {
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
5.2、消息分区策略
在KafkaProducer的源码中,在key value序列化之后,通过partition方法决策消息的目标分区
KafkaProducer
// KafkaProducer Constructor 中读取配置文件 partitioner.class 属性获取分区策略实现类
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
// 通过partitioner决策数据目标分区
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
在Server章已经聊过分区策略的常用策略和自实现策略的使用方法,不再赘述。这里着重说一下 DefaultPartitioner 实现类:
5.2.1、 DefaultPartitioner
在默认的分区策略中,Kafka选择在消息具备key时通过key进行分区决策,在不存在key时通过 StickyPartitionCache.partition 进行分区选择,而这个StickyPartitionCache 被官方称为 “粘性分区策略”的缓存实现类
DefaultPartitioner
public class DefaultPartitioner implements Partitioner {
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
5.2.2、粘性分区策略 及 StickyPartitionCache
所谓粘性分区策略即对同一Topic的所有消息都尽可能的往一个分区中存储,使得消息缓存能够静快的达到Sender的发送阈值,提高消息发送的实时性。
从代码层面看,StickyPartitionCache提供的是一个粘性分区的缓存池,即存储了各个Topic的唯一指定分区号(被粘住的分区),并期望所有数据都向这被粘住的分区中进行发送;当缓存已经将这个被粘住的分区数据进行发送,StickyPartitionCache则将更新所粘分区,更新策略为随机更新:
当不具备可用分区时,随机选取分区作为新的粘性分区
当可用分区仅1个时,选取这个可用分区作为新的粘性分区
当可用分区多个时,在可用分区中随机选取一个和上一次所粘分区不同的分区作为粘性分区
StickyPartitionCache
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
public int partition(String topic, Cluster cluster) {
// 如果当前Topic存在粘性分区,则直接返回
Integer part = indexCache.get(topic);
if (part == null) {
// 如果不存在,选取新的分区作为粘性分区
return nextPartition(topic, cluster, -1);
return part;
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() < 1) {
// 随机选取
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
} else if (availablePartitions.size() == 1) {
// 选取唯一可用
newPart = availablePartitions.get(0).partition();
} else {
// 在可用分区中随机选取非上一次的分区
while (newPart == null || newPart.equals(oldPart)) {
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
// Only change the sticky partition if it is null or prevPartition matches the current sticky partition.
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
return indexCache.get(topic);
return indexCache.get(topic);
5.3、 消息发送
通过分区策略选择的分区进行缓存,随后通过Sender统一发送,详见: 1.7、消息发送
6、消息缓存
Kafka在用户线程与Sender线程之间存在一个缓存区间:RecordAccumulator,缓存的存在使得Sender可以一次进行批量的数据发送,减少了网络传输的资源消耗和I/O资源消耗。但相应的也会造成消息准时性的问题,因此存在消息的缓存超时时间和粘性分区策略等。
对于KafkaProducer而言,缓存的主要实现及主要涉及类如下:
RecordAccumulator: 累加器,其中管理着各个Topic下的ProducerBatch集合。控制ByteBuffer的分配、MemoryRecordsBuilder的创建、ProducerBatch的创建和消息的缓存追加。
ProducerBatch:批记录,帮助管理和追加记录、发送Future和Callback关系、ProduceRequestResult等,其中通过MemoryRecordBuilder实现记录的存储。
MemoryRecordBuilder: 记录存储构建器,存储处理记录,其中使用包装为OutputStream的ByteBuffer实现记录的流式存储。
7、消息发送
消息发送的核心在Sender类中,Sender实现了Runable接口,在run()方法中不断的循环的进行核心方法的调用:
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
在runOnce方法中,主要核心如下(去除对于事物的特殊支持):
void runOnce() {
long currentTimeMs = time.milliseconds();
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
其中主要干了两件事:
sendProducerData: 根据当前时间决策需要发送的数据
client.poll: 调用client进行数据的实际IO操作
接下来主要跟进这两个方法看看其内部到底干了什么
7.1、sendProducerData
sendProducerData方法的代码过长,我们一步一步拆解来看:
首先拿到了基本信息: Cluster信息和需要发送的消息列表:
Cluster cluster = metadata.fetch();
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
如果这些分区还不知道所在集群的领导者,则发起一次请求更新他们的metadata:
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
this.metadata.requestUpdate();
将待发送列表中还没有准备好的结点进行移除(不发送未准备好的结点),并且获取到最近的还没准备好的node的时间差,在发送时以此为时间界限进行poll操作,避免发送了还没准备好的数据
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
从nodes中抽取待发送的批消息,并按照nodeId进行分组
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
将抽取的批消息记录在inFlightBatches中,其中按照TopicPartition进行分组
addToInflightBatches(batches);
从inFlightBatches和accumulator中获取所有已经过期的批消息,并将其存储到expiredBatches变量中:
accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);
如果存在过期的数据,则记录日志并将批数据置为失败(timeout失败)
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
failBatch(expiredBatch, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
// This ensures that no new batches are drained until the current in flight batches are fully resolved.
transactionManager.markSequenceUnresolved(expiredBatch);
计算最大的允许poll的超时时间: 该时间从最近的一个未准备好的node的时差、最近一次准备好的检查的延时时差和最近一次的超时时差中取最小值。 即这个如果短时间内没有出现以下情况,则可以持续的进行poll阻塞操作:
有新的node准备好
将进行一次新的检查
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
特殊的,如果有消息需要进行发送,则pollTimeout设为0,意味立即进行消息的发送
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
将所有批消息进行请求的构建
sendProduceRequests(batches, now);
至此,sendProdcerData方法结束,可以看到,方法内部主要存在的逻辑如下:
构建需要发送的批消息
处理已经过期的批消息
计算允许最大的超时时间(返回值)
进行批消息发送请求的构建
7.2、client.poll
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
if (!abortedSends.isEmpty()) {
// If there are aborted sends because of unsupported version exceptions or disconnects,
// handle them immediately without waiting for Selector#poll.
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
// process completed actions
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
handleCompletedSends(responses, updatedNow);
handleCompletedReceives(responses, updatedNow);
handleDisconnections(responses, updatedNow);
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
return responses;
方法结构较为简单,主要分为:
特殊情况的处理;
metadata的更新
实际的poll方法调用,进行网络IO或阻塞
结果的处理(数据收集),连接管理,handler触发等
此处就不过多的展开进行说明,唯一要提到的是:当无发送任务时,this.selector.poll将被阻塞。 因为sendProducerData中的timeout值计算在没有任务的情况下不为0,因此this.selector.poll将被阻塞timeout。
当有任务填入并期望发送或存在新的batch时时,在KafkaProducer.doSend方法中将主动进行selector的唤醒。
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
this.sender.wakeup();
摸鱼端开发