说到分布式事务,大多数想到的都是2PC,3PC,TCC
等,然而真正使用和实现的现有项目又比较少,在Kafka
中,提供了批量消息的事务实现,接下来简单分析下Kafka
事务的原理。
Kafka Exactly Once
Kafka
的事务首先依赖于Kafka
Exactly-Once
语义,保证发送的消息不重复,Kafka Exactly Once
语义的实现原理很简单,类似TCP
协议中的Sequence Number
.
在Kafka Producer
中,如果配置了enable.idempotence
为true
,则会开启消息的幂等性,Kafka
所提供的消息幂等语义如下:
- 该
Producer
本次Session
中发送的消息单条,有且仅有一次持久化到Broker
中
注意,是单个生产者,单次session,单个
Partition
的单条消息的幂等。
当Producer
开启幂等后,Producer
会在Broker
中申请一个Producer ID(PID)
,用来标识每个客户端,同时在发送消息时,会给每条消息带上一个sequence numbers(SN)
,当且仅当Broker
中接受的消息的 PID+SN
等于服务器当前保存的最大PID+SN+1
时,Broker
才会接收这条消息,当消息PID+SN
小于服务器中PID+SN+1
,说明消息重复发送,当消息PID+SN
大于服务器PID+SN+1
,此时Producer
会抛出异常。
明白原理之后说说细节:
我们都知道,在Kafka Producer
中,消息是批量多线程发送的,因此如果出现消息重试怎么办?同时如果选择ack=0
或ack=1
,那么消息丢失了也是违背了Exactly Once
的语义,这些在Kafka
中,是如何处理的呢?
首先,当你配置enable.idempotence
为true
之后,Kafka
或强制检查以下配置,如果不为下面的配置,则会抛出异常:
retries
必须大于0,默认为Integer.MAX_VALUE
,表示无限重试max.in.flight.requests.per.connection
不大于5,默认为5ack=-1
,开启幂等后默认为-1
因此,通过ack
和retries
能保证消息不丢失。而同时发送等待的线程数。
但是为什么max.in.flight.requests.per.connection
需要不大于5.按照个人印象应该为1才对,因为比如设置为5,同时发送了1,2,3,4,5,5条消息,那么如果此时编号为1的消息失败了,那么sequence number
不就不连续了。但是通过网上资料说明,如果将这个值设置1,那么Kafka
吞吐下降的会比较多,因此在Kafka
的Broker
端中缓存了5这个PID
对应分区的最近5条消息,进而形成了一个滑动窗口,而5是在服务器写死的一个值,因此最大为5.
从这里也可以看出,如果
Kafka
开启幂等后,不仅可以保证有且仅有一次送达,还可以保证有序,同时性能也比为了达到有序使得max.in.flight.requests.per.connection=1
要高。简单的说,因为幂等能知道消息发送的进度,因此及时5个线程并发发送,当发生重试的时候,将前面的数据返回异常让其都重试即可。
总的来说,Kafka
的Exactly-Once
原理和TCP
的原理一致,不过Kafka
添加了很多业务限制。
Kafka 事务
kafka
所提供的事务依赖于上面所说的Exactly Once
特性,因为如果消息可以重复,那么也就违背了要么都成功,要么都不成功的语义。
Kafka
所提供的事务如下:
- 保证多次提交到不同主题和不同分区的消息的原子性,即要么全部发送成功,要么全部发送失败
- 保证
conumser-transform-produce
应用模式中,消息能被原子性转换。
需要注意的是Kafka
的Exactly Once
和事务都是对于Producer
而言,因为对于消费者来说:
- 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖;
-
一个事务内的数据,可能会跨多个
log segment
,如果旧的segmeng
数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了; -
Consumer
在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费; -
Consumer
可能没有订阅这个事务涉及的全部 Partition
还有就是,对于消费者而言不管是先消费,再提交还是先提交,再消费,都不能保证消息正好消费一次,因此Kafka
目前的事务都针对于Producer
发送消息。
对于Kafka
的事务而言,由于可能跨Topic
,跨Partition
,因此相当于多个机器进行事务处理。需要解决的问题如下:
- 在不引入其他中间件的情况下,引入什么协议?
2PC
?3PC
- 不管
2PC
还是3PC
,都需要先将消息暂存,然后由协调者发起commit
,在这期间,对于没有commit
的信息应该保存在哪里?消费者是否可读? - 协调者角色怎么确定?怎么保证高可用?
- 生产者在事务发送到一半宕机后,应该如何处理?宕机恢复后,又怎么处理上一次未完成的情况?
带着上面的问题,依次来看:
在Kafka
中,选用的是类似2PC
的协议,不过这里的协调者更多的是处理事务状态的流转。关于协调者的高可用其实在前面的文章就有出现过,之前Kafka
移除了Consumer Offset
对Zookeeper
的依赖,使用的就是通过_consumer_offset_ topic
来持久化,通过多副本+ISR
来保证高可用和数据不丢失,因此这里依然可以使用这么方式。Kafka
对于事务选用的topic
为:__transaction_state__
,和_consumer_offset_
一样,默认3个副本,50个分区。
协调者名为TransactionCoordinator
。
解决协议和协调者问题,接下来看如何标识一个事务?怎么表示这个事务正在准备,提交/放弃,事务完成呢?
当Producer
需要使用事务时,需要调用API
,模板如下:
//初始化事务
producer.initTransactions();
try {
//事务开始
producer.beginTransaction();
//...发送消息
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException e1) {
e1.printStackTrace();
producer.close();
} catch (KafkaException e2) {
e2.printStackTrace();
//发生异常,回滚事务
producer.abortTransaction();
}
producer.close();
可以看到,通过API
可以标识一个事务的初始化,开始,提交,回滚。
初始化:
Producer
会首先向任意一个broker
发送查找自己对应事务协调器的请求,获取请求后,Producer
会向事务协调器请求PID,同时在这个过程中,如果发现对应TransactionId
有之前未完成的任务,它还会做以下两件事:
- 恢复
Producer
对应TransactionId
之前未完成的事务(Commit
/Abort
) - 对
PID
对应epoch
进行递增,防止脑裂问题。
可以看到,
Kafka
事务为了实现Producer
的主从功能,提出了TransactionId
的概念,同一个TransactionId
只能有一个在运行,后面启动的Producer
会使得前面的Producer
立即抛出异常。
开始事务:
本地记录事务状态为开始,但是协调器只有在接受到事务第一条消息后,才会标记为事务真正的开始。
进行事务:
在kafka
事务中,会原子的支持Consumer-Process-Producer
过程,因此在这个过程中还提供了一个API
producer.sendOffsetsToTransaction();
,这个过程会将消费的offset
暂存在协调器中,等事务提交时统一提交。
提交/回滚:
当提交或回滚的时候,协调器会进行一个两段提交
- 第一阶段,将事务日志,将此事务设置为
PREPARE_COMMIT
或PREPARE_ABORT
- 第二阶段,发送
Transaction Marker
给事务涉及到的Leader
发送标记信息,标记此条信息为已提交或已放弃
当完成第二阶段后,协调器最终会将此事务标记为COMPLETE_COMMIT
或COMPLETE_ABORT
故障恢复
明白了事务流程之后,简单说一下Kafka
对事务的保障:
- 首先,是一个
2PC
的提交过程,为什么不用3PC
? 因为Kafka
的业务仅仅是追加消息,不会涉及到修改数据,因此一般出现问题的情况比较小。 - 对于
2PC
协调器,由__transaction_state__
topic
的某个Leader
担任,由Kafka
本身确保高可用 - 协调器负责传递和持久化事务状态,通过持久化状态,可以使得协调器即使崩溃,也能选举新的
Leader
继续补全事务 - 在提交阶段,为了防止其他
Leader
崩溃而没有收到commit
消息,协调器会先保存事务状态,再发送Transaction Marker
消息 Kafka
为了不修改消息状态,会额外持久化Transaction Marker
,当消费事务消息的时候,会组合消息和标记共同判断这个消息是否能够被消费。
接下来分下下故障恢复,通过以上状态,如何保证及时出现故障Kafka
也能使用:
Produce
r 在发送beginTransaction()
时,如果出现timeout
或者错误:Producer
只需要重试即可;Producer
在发送数据时出现错误:Producer
应该abort
这个事务,如果Produce
没有abort
(比如设置了重试无限次,并且batch
超时设置得非常大),TransactionCoordinator
将会在这个事务超时之后 abort 这个事务操作;Producer
发送commitTransaction()
时出现 timeout 或者错误:Producer 应该重试这个请求;Coordinator
Failure
:如果Transaction
Coordinator 发生切换(事务 topic leader 切换),Coordinator
可以从日志中恢复。如果发送事务有处于PREPARE_COMMIT
或 PREPARE_ABORT 状态,那么直接执行commit
或者 abort 操作,如果是一个正在进行的事务,Coordinator
的失败并不需要 abort 事务,producer
只需要向新的Coordinator
发送请求即可。
写在后面
本来只是好奇Kafka
是怎么实现的事务,结果发现有这么多细节问题,当然文章涉及的也只是冰山一角。
这篇文章写得不太好,因为内容几乎来自于网上的资料。