Kafka Exactly Once和事务

说到分布式事务,大多数想到的都是2PC,3PC,TCC等,然而真正使用和实现的现有项目又比较少,在Kafka中,提供了批量消息的事务实现,接下来简单分析下Kafka事务的原理。

Kafka Exactly Once

Kafka的事务首先依赖于Kafka Exactly-Once语义,保证发送的消息不重复,Kafka Exactly Once语义的实现原理很简单,类似TCP协议中的Sequence Number.

Kafka Producer中,如果配置了enable.idempotencetrue,则会开启消息的幂等性,Kafka所提供的消息幂等语义如下:

  • Producer本次Session中发送的消息单条,有且仅有一次持久化到Broker

注意,是单个生产者,单次session,单个Partition的单条消息的幂等。

Producer开启幂等后,Producer会在Broker中申请一个Producer ID(PID),用来标识每个客户端,同时在发送消息时,会给每条消息带上一个sequence numbers(SN),当且仅当Broker中接受的消息的 PID+SN 等于服务器当前保存的最大PID+SN+1时,Broker才会接收这条消息,当消息PID+SN小于服务器中PID+SN+1,说明消息重复发送,当消息PID+SN大于服务器PID+SN+1,此时Producer会抛出异常。

明白原理之后说说细节:

我们都知道,在Kafka Producer中,消息是批量多线程发送的,因此如果出现消息重试怎么办?同时如果选择ack=0ack=1,那么消息丢失了也是违背了Exactly Once的语义,这些在Kafka 中,是如何处理的呢?

首先,当你配置enable.idempotencetrue之后,Kafka或强制检查以下配置,如果不为下面的配置,则会抛出异常:

  • retries必须大于0,默认为Integer.MAX_VALUE,表示无限重试
  • max.in.flight.requests.per.connection不大于5,默认为5
  • ack=-1,开启幂等后默认为-1

因此,通过ackretries能保证消息不丢失。而同时发送等待的线程数。

但是为什么max.in.flight.requests.per.connection需要不大于5.按照个人印象应该为1才对,因为比如设置为5,同时发送了1,2,3,4,5,5条消息,那么如果此时编号为1的消息失败了,那么sequence number不就不连续了。但是通过网上资料说明,如果将这个值设置1,那么Kafka吞吐下降的会比较多,因此在KafkaBroker端中缓存了5这个PID对应分区的最近5条消息,进而形成了一个滑动窗口,而5是在服务器写死的一个值,因此最大为5.

从这里也可以看出,如果Kafka开启幂等后,不仅可以保证有且仅有一次送达,还可以保证有序,同时性能也比为了达到有序使得max.in.flight.requests.per.connection=1要高。

简单的说,因为幂等能知道消息发送的进度,因此及时5个线程并发发送,当发生重试的时候,将前面的数据返回异常让其都重试即可。


总的来说,KafkaExactly-Once原理和TCP的原理一致,不过Kafka添加了很多业务限制。

Kafka 事务

kafka所提供的事务依赖于上面所说的Exactly Once特性,因为如果消息可以重复,那么也就违背了要么都成功,要么都不成功的语义。

Kafka所提供的事务如下:

  • 保证多次提交到不同主题和不同分区的消息的原子性,即要么全部发送成功,要么全部发送失败
  • 保证conumser-transform-produce 应用模式中,消息能被原子性转换。

需要注意的是KafkaExactly Once和事务都是对于Producer而言,因为对于消费者来说:

  • 对于 compacted topic,在一个事务中写入的数据可能会被新的值覆盖;

  • 一个事务内的数据,可能会跨多个 log segment,如果旧的 segmeng 数据由于过期而被清除,那么这个事务的一部分数据就无法被消费到了;

  • Consumer 在消费时可以通过 seek 机制,随机从一个位置开始消费,这也会导致一个事务内的部分数据无法消费;

  • Consumer 可能没有订阅这个事务涉及的全部 Partition

还有就是,对于消费者而言不管是先消费,再提交还是先提交,再消费,都不能保证消息正好消费一次,因此Kafka目前的事务都针对于Producer发送消息。

对于Kafka的事务而言,由于可能跨Topic,跨Partition,因此相当于多个机器进行事务处理。需要解决的问题如下:

  • 在不引入其他中间件的情况下,引入什么协议?2PC?3PC
  • 不管2PC还是3PC,都需要先将消息暂存,然后由协调者发起commit,在这期间,对于没有commit的信息应该保存在哪里?消费者是否可读?
  • 协调者角色怎么确定?怎么保证高可用?
  • 生产者在事务发送到一半宕机后,应该如何处理?宕机恢复后,又怎么处理上一次未完成的情况?

带着上面的问题,依次来看:

Kafka中,选用的是类似2PC的协议,不过这里的协调者更多的是处理事务状态的流转。关于协调者的高可用其实在前面的文章就有出现过,之前Kafka移除了Consumer OffsetZookeeper的依赖,使用的就是通过_consumer_offset_ topic来持久化,通过多副本+ISR来保证高可用和数据不丢失,因此这里依然可以使用这么方式。Kafka对于事务选用的topic为:__transaction_state__,和_consumer_offset_一样,默认3个副本,50个分区。

协调者名为TransactionCoordinator

解决协议和协调者问题,接下来看如何标识一个事务?怎么表示这个事务正在准备,提交/放弃,事务完成呢?

Producer需要使用事务时,需要调用API,模板如下:

//初始化事务
producer.initTransactions();

try {
    //事务开始
    producer.beginTransaction();

    //...发送消息

    //提交事务
    producer.commitTransaction();
} catch (ProducerFencedException e1) {
    e1.printStackTrace();
    producer.close();
} catch (KafkaException e2) {
    e2.printStackTrace();
    //发生异常,回滚事务
    producer.abortTransaction();
}
producer.close();

可以看到,通过API可以标识一个事务的初始化,开始,提交,回滚。

初始化:

Producer会首先向任意一个broker发送查找自己对应事务协调器的请求,获取请求后,Producer会向事务协调器请求PID,同时在这个过程中,如果发现对应TransactionId有之前未完成的任务,它还会做以下两件事:

  • 恢复Producer对应TransactionId之前未完成的事务(Commit/Abort
  • PID对应epoch进行递增,防止脑裂问题。

可以看到,Kafka事务为了实现Producer的主从功能,提出了TransactionId的概念,同一个TransactionId只能有一个在运行,后面启动的Producer会使得前面的Producer立即抛出异常。

开始事务:

本地记录事务状态为开始,但是协调器只有在接受到事务第一条消息后,才会标记为事务真正的开始。

进行事务:

kafka事务中,会原子的支持Consumer-Process-Producer过程,因此在这个过程中还提供了一个API

producer.sendOffsetsToTransaction();,这个过程会将消费的offset暂存在协调器中,等事务提交时统一提交。

提交/回滚:

当提交或回滚的时候,协调器会进行一个两段提交

  • 第一阶段,将事务日志,将此事务设置为PREPARE_COMMITPREPARE_ABORT
  • 第二阶段,发送Transaction Marker给事务涉及到的Leader发送标记信息,标记此条信息为已提交或已放弃

当完成第二阶段后,协调器最终会将此事务标记为COMPLETE_COMMITCOMPLETE_ABORT

故障恢复

明白了事务流程之后,简单说一下Kafka对事务的保障:

  • 首先,是一个2PC的提交过程,为什么不用3PC? 因为Kafka的业务仅仅是追加消息,不会涉及到修改数据,因此一般出现问题的情况比较小。
  • 对于2PC协调器,由__transaction_state__ topic的某个Leader担任,由Kafka本身确保高可用
  • 协调器负责传递和持久化事务状态,通过持久化状态,可以使得协调器即使崩溃,也能选举新的Leader继续补全事务
  • 在提交阶段,为了防止其他Leader崩溃而没有收到commit消息,协调器会先保存事务状态,再发送Transaction Marker消息
  • Kafka为了不修改消息状态,会额外持久化Transaction Marker,当消费事务消息的时候,会组合消息和标记共同判断这个消息是否能够被消费。

接下来分下下故障恢复,通过以上状态,如何保证及时出现故障Kafka也能使用:

  1. Producer 在发送 beginTransaction() 时,如果出现 timeout 或者错误:Producer 只需要重试即可;
  2. Producer 在发送数据时出现错误:Producer 应该 abort 这个事务,如果 Produce 没有 abort(比如设置了重试无限次,并且 batch 超时设置得非常大),TransactionCoordinator 将会在这个事务超时之后 abort 这个事务操作;
  3. Producer 发送 commitTransaction() 时出现 timeout 或者错误:Producer 应该重试这个请求;
  4. Coordinator Failure:如果 Transaction Coordinator 发生切换(事务 topic leader 切换),Coordinator 可以从日志中恢复。如果发送事务有处于 PREPARE_COMMIT 或 PREPARE_ABORT 状态,那么直接执行 commit 或者 abort 操作,如果是一个正在进行的事务,Coordinator 的失败并不需要 abort 事务,producer 只需要向新的 Coordinator 发送请求即可。

写在后面

本来只是好奇Kafka是怎么实现的事务,结果发现有这么多细节问题,当然文章涉及的也只是冰山一角。

这篇文章写得不太好,因为内容几乎来自于网上的资料。