Orderer共识组件提供HandleChain()方法创建通道绑定的共识组件链对象(
consensus.Chain
接口),包括
Solo
(
solo.chain
类型)、
Kafka
(
kafka.chainImpl
类型)等类型,属于通道共识组件的重要实现模块,并设置到链支持对象的
cs.Chain
字段。共识组件链对象提供Orderer共识排序服务,负责关联通道上交易排序、打包出块、提交账本、通道管理等工作,目前采用
Golang
通道或
Kafka
集群作为共识排序后端,
接收来自
Broadcast
服务过滤转发的交易消息
并进行排序。
kafka共识排序服务
orderer服务集群
Orderer
节点采用
Sarama
开源的
Kafka
第三方库构建
Kafka
共识组件,可以同时接受处理多个客户端发送的交易消息请求,能够有效提高
Orderer
节点处理交易消息的并发能力。同时,可利用
Kafka
集群在
单一分区内
按序收集相同主题消息(
消息序号唯一
)的功能,来保证交易消息具有确定性的顺序(以消息序号排序),从而实现对交易排序达成全局共识的目的。
Kafka
生产者按照主题(
Topic
)生产消息并进行发布,
Kafka
服务器集群自动对消息主题进行分类。同一个主题的消息都会被收集到一个或多个分区文件中,按照
FIFO
的顺序追加到文件尾部,并且每个消息在分区中都会有一个
OFFSET
位置偏移量作为该消息的唯一标识ID。目前,
Hyperledger Fabric
基于
Kafka
集群为
每个通道
创建绑定了一个主题(即链ID,
chainID
),并且只设置一个分区(分区号为0)。Kafka消费者管理多个分区消费者并订阅指定分区的主题消息,包括主题(即
chainID
)、分区号(目前只有1个分区号为0的分区)、起始偏移量(开始订阅的消息位置
offset
)等。
Hyperledger Fabric采用
Kafka
集群对单个或多个
Orderer
排序节点提交的交易消息进行排序。此时,
Orderer
排序节点同时充当
Kafka
集群的消息生产者(分区)和消费者,发布消息与订阅消息到Kafka集群上的同一个主题分区,即先将
Peer
节点提交的交易消息转发给Kafka服务端,同时,从指定主题的
Kafka
分区上按顺序获取排序后的交易消息并自动过滤重启的交易消息。这期间可能会存在网络时延造成获取消息时间的差异。如果不考虑丢包造成消息丢失的情况,则所有
Orderer
节点获取消息的顺序与数量应该是确定的和一致的。同时,采用相同的Kafka共识组件链对象与出块规则等,以保证所有Orderer节点都可以创建与更新相同配置的通道,并切割生成相同的批量交易集合出块,再“同步”构造出相同的区块数据,从而基于
Kafka
集群达成全局共识,以保证区块数据的全局一致性。
启动共识组件链对象
启动入口:
orderer/consensus/kafka/chain.go/Start()
func (chain *chainImpl) Start() {
go startThread(chain)
func startThread(chain *chainImpl) {
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {
logger.Panicf("[channel: %s] Cannot post CONNECT message = %s", chain.channel.topic(), err)
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)
close(chain.startChan)
chain.errorChan = make(chan struct{})
chain.processMessagesToBlocks()
startThread
函数首先创建kafka
生产者,发布消息到指定主题(即通道ID)和分区号的通道分区(chain.channel)上。
然后发送CONNECT
消息建立连接,该消息指定了主题Topic
字段为链ID、Key
字段为分区号0、Value
字段为CONNECT
类型消息负载等。订阅该主题的Kafka
(分区)消费者会接收到该消息。
接着创建指定Kafka
分区和Broker
服务器配置的Kafka
消费者对象,并设置从指定主题(链ID)和分区号(0)的Kafka
分区上获取消息。
最后,调用processMessagesToBlocks()
方法创建消息处理循环,负责处理从Kafka
集群中接收到的订阅消息。
processMessagesToBlocks
接收到正常的Kafka
分区消费者消息会根据kafka
的消息类型进行处理,包括以下几种类型:
Kafka- Message_Regular
KafkaMessage_TimeToCut
KafkaMessage_Connect
func (chain *chainImpl) processMessagesToBlocks() ([]uint64, error) {
for {
select {
case in, ok := <-chain.channelConsumer.Messages():
select {
case <-chain.errorChan:
switch msg.Type.(type) {
case *ab.KafkaMessage_Connect:
_ = chain.processConnect(chain.ChainID())
counts[indexProcessConnectPass]++
case *ab.KafkaMessage_TimeToCut:
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {
logger.Warningf("[channel: %s] %s", chain.ChainID(), err)
logger.Criticalf("[channel: %s] Consenter for channel exiting", chain.ChainID())
counts[indexProcessTimeToCutError]++
return counts, err
counts[indexProcessTimeToCutPass]++
case *ab.KafkaMessage_Regular:
if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {
counts[indexProcessRegularError]++
case <-chain.timer:
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {
counts[indexSendTimeToCutError]++
} ...
①:KafkaMessage_Connect类型消息
Kafka
连接消息用于测试连通Kafka
分区消费者的工作状态,用于验证Kafka
共识组件的正常工作状态与排除故障,并调用chain.processConnect(chain.ChainID())
方法处理该消息。
②:KafkaMessage_TimeToCut类型消息
processMessagesToBlocks
()方法可调用chain.processTimeToCut()
方法处理TIMETOCUT
类型消息。如果消息中的区块号ttcNumber
不是当前Orderer
节点当前通道账本中下一个打包出块的区块号(最新区块号lastCutBlockNumber
+1),则直接丢弃不处理。否则,调用BlockCutter().Cut()
方法,切割当前该通道上待处理的缓存交易消息列表为批量交易集合batch([]*cb.Envelope)
,再调用CreateNextBlock(batch)
方法构造新区块并提交账本。最后,调用WriteBlock(block,metadata)
方法,更新区块元数据并提交账本,同时更新Kafka共识组件链对象的最新区块号lastCutBlockNumber
增1。
事实上,Orderer
服务集群节点独立打包出块的时间点通常不是完全同步的,同时还可能会重复接收其他Orderer节点提交的TIMETOCUT类型消息(重复区块号)。此时,Orderer
节点以接收到的第一个TIMETOCUT
类型消息为准,打包出块并提交到账本,再更新当前通道的最新区块号lastCutBlockNumber
。这样,processTimeToCut
()方法就能利用最新的lastCutBlockNumber
过滤掉其他重复的TIMETOCUT
类型消息,以保证所有Orderer
节点上账本区块文件的数据同步,实际上是将原先的时间同步机制转换为消息同步机制。
③:KafkaMessage_Regular类型消息
包括通道配置交易消息(KafkaMessageRegular_CONFIG类型)和普通交易消息(KafkaMessageRegular_NORMAL类型)。 详细的分析将会在processRegular
方法中体现。
处理配置交易消息
我们先大概的看一下ProcessRegular中关于处理配置交易消息的代码部分,因为这部分相当的长,必须先看个概览:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
commitConfigMsg := func(message *cb.Envelope, newOffset int64){...}
seq := chain.Sequence()
switch regularMessage.Class {
case ab.KafkaMessageRegular_UNKNOWN:
case ab.KafkaMessageRegular_NORMAL:
case ab.KafkaMessageRegular_CONFIG:
我们直接跳转到case ab.KafkaMessageRegular_CONFIG
进行分析:
①:如果regularMessage.OriginalOffset 不为 0
说明这是重新过滤验证和排序的通道配置交易消息。
1.1 过滤重复提交的消息
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {}
1.2 确认是否是最近重新验证且重新排序的配置交易消息,并且通道配置序号是最新的
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset &®ularMessage.ConfigSeq == seq {
close(chain.doneReprocessingMsgInFlight)
1.3 主动更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted
存在其他Orderer
节点重新提交了配置消息,但是本地Orderer
节点没有重新提交该消息。因此这里需要更新本通道的最近重新提交排序的配置交易消息初始偏移量lastResubmitted。
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
②:regularMessage.OriginalOffset为 0
说明是第一次提交通道配置交易消息,而不是重新验证和重新排序的。
2.1 如果消息中的配置序号regularMessage.ConfigSeq小于当前通道的最新配置序号seq
则说明已经更新了通道配置(配置序号较高),然后再处理当前配置交易消息(配置序号较低)。将会调用ProcessConfigMsg
重新过滤和处理该消息。
接着通过configure
重新提交该配置消息进行排序,重置消息初始偏移量。然后再更新最近重新提交消息的偏移量。
if regularMessage.ConfigSeq < seq {
configEnv, configSeq, err := chain.ProcessConfigMsg(env)
if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {...}
chain.lastResubmittedConfigOffset = receivedOffset
chain.doneReprocessingMsgInFlight = make(chan struct{})
③:提交配置交易消息执行通道管理操作
经过上面的①和②过滤掉不符合条件的情况,接下来就提交配置交易消息执行通道管理操作,核心函数:commitConfigMsg(env, offset)
3.1 将当前缓存交易消息切割成批量交易集合
batch := chain.BlockCutter().Cut()
3.2 创建新区块block
block := chain.CreateNextBlock(batch)
3.3 构造Kafka元数据
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: receivedOffset - 1,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
3.4 写入区块
通过区块写组件提交新区块到账本,更新当前通道的最新区块号chain.lastCutBlockNumber增1
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
接着更新本链的lastOriginal- OffsetProcessed为newOffset参数,然后做和上面差不多的事情:
chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock([]*cb.Envelope{message})
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
chain.WriteConfigBlock(block, metadata)
chain.lastCutBlockNumber++
不管是上面的WriteBlock
还是WriteConfigBlock
底层都是调用的commitBlock
,如下:
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
...
bw.addBlockSignature(bw.lastBlock)
bw.addLastConfigSignature(bw.lastBlock)
err := bw.support.Append(bw.lastBlock)
接下来再讨论kafka共识组件如何处理普通交易消息的。
处理普通交易消息
还是先回到 processRegular
方法,关于处理普通消息的方法大概如下:
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
case ab.KafkaMessageRegular_NORMAL:
if regularMessage.OriginalOffset != 0 {
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
if regularMessage.ConfigSeq < seq {
configSeq, err := chain.ProcessNormalMsg(env)
if err := chain.order(env, configSeq, receivedOffset); err != nil {}
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
commitNormalMsg(env, offset)
处理普通交易消息的流程与处理配置交易消息的流程基本类似,主要看最后的commitNormalMsg(env, offset)
,我们来继续分析:
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
batches, pending := chain.BlockCutter().Ordered(message)
if len(batches) == 0 {
chain.lastOriginalOffsetProcessed = newOffset
if chain.timer == nil {
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
return
chain.timer = nil
offset := receivedOffset
if pending || len(batches) == 2 {
offset--
} else {
chain.lastOriginalOffsetProcessed = newOffset
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
if len(batches) == 2 {
chain.lastOriginalOffsetProcessed = newOffset
offset++
block := chain.CreateNextBlock(batches[1])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: offset,
LastOriginalOffsetProcessed: newOffset,
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
首先将新的普通交易消息添加到当前的缓存交易列表,并切割成批量交易集合列表batches ,但最多只能包含2个批量交易集合,并且第2个批量交易集合最多包含1个交易。最终也是调用的WriteBlock
写入到账本。
到此为止整个processRegular
()方法处理消息结束。
总结及参考
kafka共识排序的逻辑其实是比较简单的,大概的流程如下 :
github.com/blockchainG… (文章图片代码资料在里面)
微信公众号:区块链技术栈