Kafka 开发使用指南

Kafka 总结

前言: 最近发布的文章都是类似xxx总结,因为工作原因,没有时间一个章节一个章节的剖析,但是又不想放弃总结习惯,因此就选择了这种结合几本书写一个总结形式,怎么说呢,等后面不那么忙的时候,再来简单补一补吧。

下面的内容,结合了《图解 Kafka 之实战指南》和《Kafka 权威指南》两本书,《图解 Kafka 之实战指南》讲的比较细,但是有些地方有点难以理解,《Kafka 权威指南》讲的比较通俗,但是又不够深入。结合两本一起看比较好。

Kafka Broker

一般来说,在开发的时候,一般都是部署kafka,然后开发生产者和消费者,因此对于开发者来说,一般不用怎么管Broker,但是了解Broker的原理,才能在出问题的时候,或者有更加精确的要求的时候,能够更好的使用。

在设计一个高可用的Broker的时候,需要考虑以下几点:

  • 如果保证高可用? 这是一个消息中间件最重要的属性,当其中一个实例崩溃的时候,不能影响整个系统的使用。
  • 如果保证消息的有序性? 在某些场景中需要保证消息的顺序,那如何保证消息的有序性呢?
  • 对于消费者,消息是主动推还是让消费者自己拉呢?两种方式都有利有弊,不过基于一些异常设计的话,一般消费者拉模式比较简单
  • 对于多个消费者想要实现负载均衡,应该如何实现,有什么注意事项
  • 消息的拉取是否可以指定消息位移,如果可以指定消息位移,那应该如何快速索引到相关消息的位置
  • 消息持久化的策略?什么时候能够清除哪些消息。
  • Kafka Broker为了保证高效,都使用了哪些优化手段?
  • ….

带着这些问题,我们来了解Kafka

Kafka 是 由 LinkedIn 公司采用 Scala 语言开发的一个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统。它的特点是高吞吐、可持久化、可水平扩展、支持流数据处理等。
Kafka主要分为3个模块,BrokerConsumer,Producer,整体架构如下

image-20201015112559041

对于BrokerKafka将其设计为多个分区,对于每个消息,在发送的时候都需要指定主题(Topic)和分区(Partition),主题作为逻辑概念,消费者只用在订阅的时候指定Topic就能收到相关的消息,用户可以通过Topic来区分消息类型,消费者只用订阅感兴趣的Topic,而分区是物理概念,在Broker中一般是用来做负载均衡。


问题一: Kafka如何保证高可用

前面说过,KafkaTopic从物理上分为多个分区,其中每个分区还包含了多个副本,副本包含Leader副本和Follower副本,Leader副本负责写入消息和读取消息,Follower副本则负责同步Leader副本写入的消息,如果Leader副本在某个时间崩溃了,此时会选举一个Follower副本作为Leader副本继续工作。这便是Kafka高可用的主要办法。

通过Leader-Follower的方式,虽然保证了高可用,但是也会带来一定的问题,那就是消息同步。

当一个消息到达Leader的时候,如何才能保证不管什么时候,这条消息都不会丢失呢?

  • 首先,当生产者发送消息时,确认消息送达时机,ack=1,表示消息到达Leader后,就返回成功确认,ack=-1表示消息到达Leader后,并且同步到所有ISR集合中后,才返回确认成功,因此如果需要保证消息完全不丢失,需要配置ack=-1

  • 其次,当副本过多的时候,如果设置ack=-1就表示需要将消息同步给所有副本,可能会因为某些副本处理能力,网络等原因一直无法跟上Leader的进度,因此Kafka设定了ISR集合和OSR集合,ISR表示在一定程度内能和Leader同步的副本,OSR表示这个时间段未能和Leader同步的副本。因此当生产者发送ack=-1的消息的时候,只用同步给所有ISR集合即可返回,因为只有ISR集合内的副本才能被选举为Leader

  • unclean.leader.election: 当Kafka中所有ISR集合中的副本都崩溃,那么是否能选举OSR集合中的副本为Leader,此参数默认为false,如果设置为true,则表示未同步的副本也能成为Leader,这样会导致消息的问题。

  • min.insync.replicas: 最少同步副本,使用的情况和上面一样,表示ISR中至少需要存活多少个副本,才能写入消息,当少于这个数量的时候,写入消息会报错,但是可以读取消息。

    min.insync.replicasunclean.leader.election不同的作用在于,如果ISR中只有一个副本,那么当这个副本崩溃,并且一直无法启动,那么消息便永久丢失了,而使用min.insync.replicas能够保证至少存在几个副本的时候,才能写消息,除非几个副本同时无法启动,否则不会丢消息。

  • flush.messages: 表示多少条消息后强制将消息落盘,因为Kafka依靠的系统缓存实现快速IO,如果使用中物理机突然断电,则可能消息还没有刷新到磁盘中,此选项表示多少条消息后,强制刷入磁盘一次,但是比较影响性能,建议这个问题的解决方案通过Leader-Follower来解决

总之,kafka考虑了各种情况如何真正的保存消息,了解这些机制,才能在实际使用的时候,结合业务作出选择。

问题二: Kafka 如何保证消息的有序性

其实这个就比较简单了,Kafka将消息分类为Topic,同时又将Topic分为不同的分区。当消费者从Kafka拉取消息的时候,实际上是从指定分区拉取的消息。为了保证不必要的复杂度,Kafka只保证同一个分区,收到的消息和拉取的消息是同步的,因此如果需要保证消息的有序性,需要保证消息都在同一个分区中。

同时,由于Kafka生产者有重试机制,比如当生产者发送了一条消息,但是由于网络原因,发送失败,而消息的发送和确认是异步的,在失败之前可能还发送的其他消息,导致消息顺序被打乱,因此需要将生产者max.in.flight.requests.per.connection设置为1,表示每次只能存在一个”活跃消息”

问题三: Kafka 消费者的负载均衡

一般来说,高可用系统最终都会通过集群来实现负载均衡,那对于消费者来说,如果一个集群不简单处理,就很容易对同一条消息进行多次消费。

Kafka中,对消费者提出了消费者组(Consumer Group)的概念,对于一条消息来说,一条消息只会同一个消费者组的其中一个消费者获取到。这样变实现了负载均衡的要求,在Kafka Broker内部中,对于Consumer Group则是通过每个Consumer对应一个Partition来实现的,内部结构如下:

image-20201015170300293

如果消费者数量等于分区数量,则消费者和分区则是一对一的关系。

如果消费者数量少于分区数量,则一个消费者对应多个分区。

如果消费者数量多于分区数量,则多于的消费者永远都收不到消息。


简单说就是一个分区只会被一个消费者拉取消息,这样做就保证了分区有序。

明白了消费者组的概念,剩下的就需要知道消费者组实例的动态修改带来的影响。

如上图所示,假如某个主题包含4个分区,刚开始此消费者组只有2个实例,此时两个消费者应该各消费两个分区。此时又新加入了两个实例,那么按道理来说消费者和分区应该变成一一对应的关系。在这个过程中,应该有两个分区的内容会被新加入的两个实例订阅,如果不将这两个分区的消费进度上传到Kafka Broker中,则很可能造成重复消费的问题。

Kafka Consumer API中,提供了再均衡监听器,用来在当消费者组数量变化,需要从新分配订阅分区的时候,做一些处理工作,最重要的就是提交当前消费进度。

public interface ConsumerRebalanceListener {
    //在再均衡开始前,消费者停止读取消息之后调用
    void onPartitionsRevoked(Collection<TopicPartition> var1);

    //在再均衡完成之后,消费者开始读取消费消息之前被调用
    void onPartitionsAssigned(Collection<TopicPartition> var1);
}

实现此接口,并在订阅的时候传入实例化的对象,在再均衡开始前,会调用onPartitionsRevoked()方法,在再均衡结束,分配了新分区之后,会调用onPartitionsAssigned()方法,因此我们可以通过此接口用来提交消费位移和从新获取分区相关信息。

注意:

再均衡接口会在每次poll()的时候检查是否需要再均衡,若需要再均衡,则会直接触发再均衡操作。

问题四: 指定位移消息

有些时候,可能需要对一些消息进行回溯消费,Kafka Consumer API提供了seek()方法用来对消息进行回溯消费。

使用方式如下:

consumer.poll(1000);
Set<TopicPartition> topicPartitions=consumer.assignment();
for (TopicPartition topic : topicPartitions) {
    consumer.seek(topic ,250);
}

对于以上API,需要注意如下情况:

首先必须调用consumer.poll()方法,因为必须先通过该方法来分配此消费者负责的分区。

同时,对于consumer.poll(),如果没有消息过来,则会一直阻塞到超时,因此超时时间需要控制好,太短可能还没有获取信息,太长会阻塞太久

另外一个需要注意的就是seek()的第二个参数,一般来说,这个参数可能需要依据某些维度来获取,比如可以通过Consumer#offsetsForTimes()获取某个时间点的offset.

对于这个offset,需要注意的是Kafka对于消息是有清除策略的,如果Kafka内部只剩下100-200的消息体,而seek(topic,0)则会触发越界异常,此异常导致的行为依赖于auto.offset.reset参数的设置,earliest (从头开始消费消息),latest(从最新的消息开始获取),none(抛出异常)。默认为latest


对于Kafka Broker来说,指定位移消费就意味着需要索引消息,Kafka Broker将消息存为.log文件,与此同时,还会为每个.log文件生成.index.timeIndex用来保存此文件的索引和时间戳索引,并且文件名是以当前第一条消息的索引,用来作为基准偏移量,在索引文件中,保存的是对应消息的稀疏索引,当需要查找消息的时候,直接通过二分查找即可查找到对应的消息,文件名如图所示:

image-20201016114930734

问题五:Kafka的消息保留策略

前面说过,Kafka的一个特点是可持久化。如果要将Kafka用来持久化使用,则需要了解Kafka持久化的一些配置。

Kafka Broker中,可以铜鼓欧参数:log.cleanup.policy 设置以下持久化策略,

  • delete: 删除策略,会定期清理消息
  • compact: 日志压缩,会压缩然后保存消息

对于delete,有两种维度来清理消息,

  • log.retention.hours为时间维度配置,默认168小时,也就是消息保留7天,
  • log.retention.bytes为所有文件大小维度,默认-1,表示无穷大。

对于compact,Kafka则是通过key来标记一条消息的,如果对于相同的key的消息,Kafka会以最新的一条消息为准,删除以前的消息。

这里的Key可以类比数据库的主键,同一个主键的消息,会被最新的覆盖。由于Kafka compact会删除部分消息,因此此时Kafka内部的消息offset将不再是连续的。

问题六: Kafka为什么能做到高吞吐

Kafka作为一个消息中间件,主要功能便是分发消息,暂时持久化消息,因此最主要涉及到的功能便是IO和写磁盘。

对于IO来说,由于Kafka不需要额外编辑消息,因此可以很好的利用零拷贝的特性。使用零拷贝可以直接将磁盘中的内容不仅用户冭直接拷贝到网卡中,极大的提高了效率。

对于写磁盘来说,一般对于这种容器类的中间件,用的最多的便是内存,但是对于Kafka这种不用随机写的消息队列来说,直接顺序IO磁盘的效率依然很快,同时对于操作系统来说,磁盘的读写都会先写经过页缓存,因此Kafka没有直接使用用户内存,而是直接IO磁盘,借助操作系统页缓存完成高效率的IO

同时,由于要大量的操作对象,对于JVM来说,一个对象包含对象头,字节对其等消息,缓存大量对象会造成空间浪费比较严重,同时如果对象较多,还会给GC带来一定的压力,因此对于存储有一定规律的,可以使用堆外缓存来存放相关对象,在Kafka中大量使用了堆外缓存来存储消息,用来节约空间和减轻GC带来的Stop-The-World所带来的影响。

Kafka 生产者

在使用kafka的时候,不仅仅是调用send()将消息发送出去就可以不用关心了,作为一个高吞吐,高可用的消息中间件,Kafka生产者必须考虑以下几点:

  • 消息是否成功发送,对于没有成功发送的消息,是什么原因没有发送成功,是否需要重试
  • 生产者是否是线程安全的?如果是,如何保证效率,以及生产者内部的线程模型是怎样的,需要注意哪些地方才不会影响发送效率。
  • 对于大量的消息发送,是每条消息单独发送,还是缓存一定的消息数量然后批量发送
  • 消息需要发送到哪个分区?Kafka Broker包含多个分区,如何指定消息需要发送到哪个分区?
  • 对于严格要求有序的操作(比如同步MySQLBin Log),需要注意什么地方

一般来说,我们使用Kafka生产者都是使用Kafka官方提供的Kafka Customer API,其大概的过程是

  • 用户调用send()方法发送消息
  • API将消息序列化成字节
  • Broker建立连接,将消息发送给Broker
  • Broker收到消息后,返回ack给生产者
  • 生产者收到ack后,调用用户客户端的回调接口

问题一: 如何确认消息成功发送到Broker

大体流程如上所述,虽然看着比较简单,但是对于每个细节,都需要详细设计。

发送消息

如何确认消息是否已经发送,需要从头开始确认,首先,需要确认消息是否已经发送,对于Customer API,发送消息分为以下3个级别:

  • kafkaProducer.send(producerRecord);

  • Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
    future.get();
    
  • kafkaProducer.send(producerRecord,(r,e)->{
        if (e!=null){
            //发送报错
        }else if (r!=null){
            //发送成功
        }
    });
    

对于第一种,就是简单的发送消息,如果配置错误或者其他不可重试的错误,只会被简单的记录到kafka日志中。

对于第二种,是一种阻塞的消息发送,在调用get()的时候,如果发生了不可重试的错误,则会抛出异常。

对于第三种,是一种异步的消息发送,当发生不可重试的错误的时候,参数e不为空,当发送成功时,r不为空

确认消息

如果需要确认消息是否成功发送,则需要第二种或第三种使用方式,将因为不可重试的错误导致没有成功发送的消息记录下来。


在确认消息已经发送后,则需要确认消息是否已经到达Broker,这需要Broker返回ack,对于ack返回的时机,也是比较重要的。从解析Broker中我们知道Kafka的高可用是通过Leader-Follower来实现的,但是如果消息到达Leader后直接返回ACK,那么可能出现正好这个时候Leader崩溃,从而导致消息丢失,因此可能需要设置ACK-1表示消息完全同步到Follower后,再返回成功。

ack一种包含三种配置,0 ,表示发送了消息就返回,不管Leader是否有收到,1表示Leader收到后就返回,-1表示Leader收到后并同步给Follower再返回。


问题二: Kafka 生成者线程模型

了解Kafka Producer API的线程模型,才能更好的使用生产者发送消息,对于Kafka Producer,流程图如下:

image

可以看到,当用户调用send()方法的时候,消息会先通过拦截器,简单修改消息,然后通过序列化器将消息序列化,序列化完成后,经过分区器设置消息的分区。接着分区完毕之后,生产者会将消息放进消息累加器(RecordAccumulator)中。

消息累加器是一个消息缓冲区,这个缓冲区充当了一个容器的角色,主线程不断的往里面放消息,Sender线程则不断地取出消息并进行发送。两个线程形成了一个生产者消费者模型。

消息在RecordAccumulator中被包装成ProducerBatchProducerBatch的大小通过参数batch.size控制,当一个ProducerBatch被填满后,Sender线程就会将其取出来发送给Broker

Sender线程会将ProducerBatch包装成Request,同时会创建一个等待返回的Request,当收到Broker返回的ACK之后,对应的Request会调用相关的回调方法。


明白了上面的总体流程后,下面简单总结在生产环境中使用生产者的注意事项。

  • 拦截器,序列化器,分区器都是工作在主线程,如果这里面包含一些比较耗费时间的操作,那么将会影响整个生产者的吞吐量
  • 消息累加器并不能无限添加消息,其大小受buffer.memory默认32M 控制,当消息累加器被填满后,send()方法将会被阻塞,因此如果数据量比较大,注意增大此选项
  • 等待返回的Request数量受max.in.flight.requests.per.connection参数影响,默认为5,也就是每个链接只能保持5个正在等待的消息。此参数决定了发送消息的并发度,超过此参数,send()方法也会被阻塞。

问题三: 对于大量的消息发送,Kafka是如何处理的

首先说结论:对于一个优秀的开源中间件来说,一般都会设计成可配置的。因为批量发送要基于业务场景考虑,如果对实时性要求比较高的话,可以一条一条的发送,但是一条一条的发送会性能会稍微低一些。如果对实时性要求不是那么高,那么kafka会将消息先缓存到缓冲中,然后等收集一定数量后再批量发送,批量发送只用进行一次网络IO,性能比较高。

Kafka中,缓冲的行为由两个参数控制:

  • linger.ms: 等待时间,生产者会在ProducerBatch被填满或等待时间超过linger.ms值的时候发送出去,增大这个参数的值会增加消息的延迟。默认为0,表示接受到一条消息就发送一次。
  • batch.size: 单个ProducerBatch缓冲区大小,生产者在linger.ms时间足够充足的情况下,会一直等待直到填满一个ProducerBatch大小才发送下次,默认为16k

问题四: 消息如何确定需要发送的分区

其实从问题二中,就能知道这个问题的答案,那就是分区器。这里简单介绍下细节:

分区器接口Partitioner

用户可以自定义分区器,来自定义如何分区,如果没有配置自定义的分区器,则会使用默认的分区器:DefaultPartitioner

默认分区器逻辑如下:

  • 如果keynull,则将其轮流发送到当前活跃的分区中
  • 如果key不为null,则通过 hash(key)%分区数量 来计算应该投送的分区

注意第一种情况发送的是一定可用的分区,而第二种情况可能会发送到一个已经崩溃的分区,这样做应该是为了保证同一个key发送到的分区是一样的。不过,一旦主题中增加了分区,那么key与分区映射关系依然会变化。

问题四:如何保证发送的消息是有序的

先说结论: Kafka能保证消息在同一个分区下是有序的。

对于kafka来说,分区的作用是用来做负载均衡的,如果为了保证消息的有序性,强制消费按顺序遍历所有分区消费,那么就失去了分区的作用了。因此Kafka只保证了同一个分区的消息是有序的。


其中还有一个值得注意的地方,那就是失败重试。

kafka生产者默认带有失败重试功能,同时生产者是可以并行发送的,加入在刚开始的时候,发送了一个消息A,然后发送了消息B,但是A发送失败然后又重试成功了,此时消息顺序就会变成BA

解决这个问题的方法很简单,那就是关闭并行发送,设置max.in.flight.requests.per.connection1

Kafka 消费者

  • poll()为我们做了那些事?
  • 消费者是否是线程安全的?如果不是,那么最佳实践是什么样的?
  • 基于拉模式的消费者,应该注意什么问题能保证不丢消息?
  • 在使用消费者客户端的时候,还有什么注意事项

问题一:poll()为我们做了那些事

对于Kafka消费者,是通过循环poll()消息来拉取消息,

public ConsumerRecords<K, V> poll(final Duration timeout);

poll()的作用不仅仅是拉取消息,一般来说,Broker与消费者的主要通信手段都是通过poll()方法,也就是说Broker再均衡,服务器相关信息交换等,都需要通过poll()来完成。

总结来说,poll()方法完成了下面几件事:

  • 建立TCP连接,发送心跳

  • 根据订阅的Topic 发送 join-group请求,请求加入到消费者组中,让broker为其分配对应的分区

  • 发送上一次拉取到的消息offset,提交消费位移
  • 检查是否需要再均衡,如果需要,则调用相关回调接口

可以看到,poll()不仅仅是简单的拉取消息,还涉及到其他一些列的信息维护工作。

首先,poll()需要发送心跳,因此在poll()循环中,尽量不要做延时太久的操作,否则如果超过心跳时间,会导致消费者频繁的加入,退出导致频繁的再均衡。

其次,如果配置了自动提交,那么在每次poll()的时候,会自动提交上一次poll()到的消费位移。

再次,poll()每次都会检查是否需要进行再均衡操作,如果需要,则会直接触发再均衡操作。

为什么要知道poll()做了什么事,是因为后面有很多代码都要围绕poll()来完成,明白其原理,才能避免踩一些坑。

问题二: 消费者客户端是否是线程安全的

其实这一个问题,简单看下源码就能发现,消费者客户端每次调用方法都会先检查当前是否有多个线程同时执行,如果是,则抛出异常。也就是说消费者并不是线程安全的。

对于非线程安全的消费者,如果有些时候需要提高吞吐,可能会使用多个线程,此时有两种比较好的模型。

第一种:每个线程一个消费者客户端。

image-20201022153250209

原理类似于ThreadLocal,每个线程持有一个消费者客户端,但是由前面的Kafka Broker分区模型我们知道,消费者客户端的数量受限于Broker分区数量。因此如果Broker分区数量比较少,那么多线程数量将会受到限制,同时如果线程数量较多,网络连接也会比较多。

上面说的只是注意事项,但是这是一种最简单的使用方式,对于业务量不大的系统,可以使用这种模型。

第二种: 生产者消费者模型

image-20201022154404503

Consumer单独在一个线程中,每次拉取到消息后,不进行业务处理,而是将其放在缓存队列中,后面的多线程handler依次从缓存队列中消费消息,也就是再化成一个生产者消费者模型。

这样的好处就是消费者可以随意扩展,线程数量也不受分区限制。

但是这样的模型,会带来一个问题。如果设置自动提交消费位移,那么很容易出现丢消息的情况,比如Consumer客户端在源源不断的拉取消息,上面我们说过poll()会提交上一次拉取的消息的位移,如果这个时候客户端突然崩溃,此时在缓存中还未进行业务处理的消息就会丢失。

想要解决这个问题,就需要手动提交位移,等真正进行了业务处理之后,再进行位移提交。

手动提交位移也存在一个问题,比如线程1负责消费0-99的消息,线程2负责消费100-199的消息,如果线程2先于线程1提交,同时线程1却发生了异常,这个是时候如果重启,0-99的消息依然会丢失。

想要解决这个问题,最好的办法就是通过滑动窗口,通过滑动窗口等待窗口内最小的消息消费完后,再滑动到下一格。


问题三: 使用消费者应该注意如何保证不丢消息

Kafka主从模式下,只要不是所有机器都同时崩溃,按道理来说就不会丢失消息,但是如果在生产者,消费者中,没有注意某些细节的地方,也会造成人为的丢消息。

对于消费者来说,丢消息的情况在问题二已经说了,一般来说,自动提交消息没有什么问题,因为poll()是提交的上一次拉取的消息的位移,因此即使崩溃,也能重新拉取。但是对于需要缓存消息的时候,就一定要注意需要手动提交消费位移。

问题四:使用消费者客户端的其他注意事项

除了注意消息丢失外,另外一个比较重要的就是消息重复。对于Kafka来说,很难保证一个消息不重复,目前kafka虽然已经支持“有且仅有一次”语义,但是日常工作用的比较少,对于其他语义来说,消息比较容易出现重复的情况,比如broker在回复ack的时候,因为网络原因超时或者丢失了, 此时生产者以为消息没有发送成功,于是又发送了一次。又比如,在Broker再均衡的时候或者消费者崩溃后,由于是手动提交消息,一般都是批量进行提交,如果这个时候客户端重启,那么之前消费了一半的消息,又会被消费一次。因此一定要保证消费是幂等的。保证的手段有很多,比如redis判断,数据库主键插入判断等。

总之,Kafka的使用远远不止这些,这里只是简单总结使用的时候容易忽略的问题,想要更加深入的了解Kafka,还需要多多阅读,继续研究。