Kafka 总结
前言: 最近发布的文章都是类似xxx总结,因为工作原因,没有时间一个章节一个章节的剖析,但是又不想放弃总结习惯,因此就选择了这种结合几本书写一个总结形式,怎么说呢,等后面不那么忙的时候,再来简单补一补吧。
下面的内容,结合了《图解 Kafka 之实战指南》和《Kafka 权威指南》两本书,《图解 Kafka 之实战指南》讲的比较细,但是有些地方有点难以理解,《Kafka 权威指南》讲的比较通俗,但是又不够深入。结合两本一起看比较好。
Kafka Broker
一般来说,在开发的时候,一般都是部署kafka
,然后开发生产者和消费者,因此对于开发者来说,一般不用怎么管Broker
,但是了解Broker
的原理,才能在出问题的时候,或者有更加精确的要求的时候,能够更好的使用。
在设计一个高可用的Broker
的时候,需要考虑以下几点:
- 如果保证高可用? 这是一个消息中间件最重要的属性,当其中一个实例崩溃的时候,不能影响整个系统的使用。
- 如果保证消息的有序性? 在某些场景中需要保证消息的顺序,那如何保证消息的有序性呢?
- 对于消费者,消息是主动推还是让消费者自己拉呢?两种方式都有利有弊,不过基于一些异常设计的话,一般消费者拉模式比较简单
- 对于多个消费者想要实现负载均衡,应该如何实现,有什么注意事项
- 消息的拉取是否可以指定消息位移,如果可以指定消息位移,那应该如何快速索引到相关消息的位置
- 消息持久化的策略?什么时候能够清除哪些消息。
Kafka Broker
为了保证高效,都使用了哪些优化手段?- ….
带着这些问题,我们来了解Kafka
。
Kafka
是 由 LinkedIn
公司采用 Scala
语言开发的一个多分区、多副本且基于 ZooKeeper
协调的分布式消息系统。它的特点是高吞吐、可持久化、可水平扩展、支持流数据处理等。
Kafka
主要分为3个模块,Broker
,Consumer
,Producer
,整体架构如下
对于Broker
,Kafka
将其设计为多个分区,对于每个消息,在发送的时候都需要指定主题(Topic
)和分区(Partition
),主题作为逻辑概念,消费者只用在订阅的时候指定Topic
就能收到相关的消息,用户可以通过Topic
来区分消息类型,消费者只用订阅感兴趣的Topic
,而分区是物理概念,在Broker
中一般是用来做负载均衡。
问题一: Kafka如何保证高可用
前面说过,Kafka
将Topic
从物理上分为多个分区,其中每个分区还包含了多个副本,副本包含Leader
副本和Follower
副本,Leader
副本负责写入消息和读取消息,Follower
副本则负责同步Leader
副本写入的消息,如果Leader
副本在某个时间崩溃了,此时会选举一个Follower
副本作为Leader
副本继续工作。这便是Kafka
高可用的主要办法。
通过Leader-Follower
的方式,虽然保证了高可用,但是也会带来一定的问题,那就是消息同步。
当一个消息到达Leader
的时候,如何才能保证不管什么时候,这条消息都不会丢失呢?
- 首先,当生产者发送消息时,确认消息送达时机,
ack=1
,表示消息到达Leader
后,就返回成功确认,ack=-1
表示消息到达Leader
后,并且同步到所有ISR
集合中后,才返回确认成功,因此如果需要保证消息完全不丢失,需要配置ack=-1
-
其次,当副本过多的时候,如果设置
ack=-1
就表示需要将消息同步给所有副本,可能会因为某些副本处理能力,网络等原因一直无法跟上Leader
的进度,因此Kafka
设定了ISR
集合和OSR
集合,ISR
表示在一定程度内能和Leader
同步的副本,OSR
表示这个时间段未能和Leader
同步的副本。因此当生产者发送ack=-1
的消息的时候,只用同步给所有ISR
集合即可返回,因为只有ISR
集合内的副本才能被选举为Leader
-
unclean.leader.election
: 当Kafka
中所有ISR
集合中的副本都崩溃,那么是否能选举OSR
集合中的副本为Leader
,此参数默认为false
,如果设置为true
,则表示未同步的副本也能成为Leader
,这样会导致消息的问题。 -
min.insync.replicas
: 最少同步副本,使用的情况和上面一样,表示ISR
中至少需要存活多少个副本,才能写入消息,当少于这个数量的时候,写入消息会报错,但是可以读取消息。min.insync.replicas
和unclean.leader.election
不同的作用在于,如果ISR
中只有一个副本,那么当这个副本崩溃,并且一直无法启动,那么消息便永久丢失了,而使用min.insync.replicas
能够保证至少存在几个副本的时候,才能写消息,除非几个副本同时无法启动,否则不会丢消息。 flush.messages
: 表示多少条消息后强制将消息落盘,因为Kafka
依靠的系统缓存实现快速IO
,如果使用中物理机突然断电,则可能消息还没有刷新到磁盘中,此选项表示多少条消息后,强制刷入磁盘一次,但是比较影响性能,建议这个问题的解决方案通过Leader-Follower
来解决
总之,kafka
考虑了各种情况如何真正的保存消息,了解这些机制,才能在实际使用的时候,结合业务作出选择。
问题二: Kafka 如何保证消息的有序性
其实这个就比较简单了,Kafka
将消息分类为Topic
,同时又将Topic
分为不同的分区。当消费者从Kafka
拉取消息的时候,实际上是从指定分区拉取的消息。为了保证不必要的复杂度,Kafka
只保证同一个分区,收到的消息和拉取的消息是同步的,因此如果需要保证消息的有序性,需要保证消息都在同一个分区中。
同时,由于Kafka
生产者有重试机制,比如当生产者发送了一条消息,但是由于网络原因,发送失败,而消息的发送和确认是异步的,在失败之前可能还发送的其他消息,导致消息顺序被打乱,因此需要将生产者max.in.flight.requests.per.connection
设置为1
,表示每次只能存在一个”活跃消息”
问题三: Kafka 消费者的负载均衡
一般来说,高可用系统最终都会通过集群来实现负载均衡,那对于消费者来说,如果一个集群不简单处理,就很容易对同一条消息进行多次消费。
在Kafka
中,对消费者提出了消费者组(Consumer Group
)的概念,对于一条消息来说,一条消息只会同一个消费者组的其中一个消费者获取到。这样变实现了负载均衡的要求,在Kafka Broker
内部中,对于Consumer Group
则是通过每个Consumer
对应一个Partition
来实现的,内部结构如下:
如果消费者数量等于分区数量,则消费者和分区则是一对一的关系。
如果消费者数量少于分区数量,则一个消费者对应多个分区。
如果消费者数量多于分区数量,则多于的消费者永远都收不到消息。
简单说就是一个分区只会被一个消费者拉取消息,这样做就保证了分区有序。
明白了消费者组的概念,剩下的就需要知道消费者组实例的动态修改带来的影响。
如上图所示,假如某个主题包含4个分区,刚开始此消费者组只有2个实例,此时两个消费者应该各消费两个分区。此时又新加入了两个实例,那么按道理来说消费者和分区应该变成一一对应的关系。在这个过程中,应该有两个分区的内容会被新加入的两个实例订阅,如果不将这两个分区的消费进度上传到Kafka Broker
中,则很可能造成重复消费的问题。
在Kafka Consumer API
中,提供了再均衡监听器,用来在当消费者组数量变化,需要从新分配订阅分区的时候,做一些处理工作,最重要的就是提交当前消费进度。
public interface ConsumerRebalanceListener {
//在再均衡开始前,消费者停止读取消息之后调用
void onPartitionsRevoked(Collection<TopicPartition> var1);
//在再均衡完成之后,消费者开始读取消费消息之前被调用
void onPartitionsAssigned(Collection<TopicPartition> var1);
}
实现此接口,并在订阅的时候传入实例化的对象,在再均衡开始前,会调用onPartitionsRevoked()
方法,在再均衡结束,分配了新分区之后,会调用onPartitionsAssigned()
方法,因此我们可以通过此接口用来提交消费位移和从新获取分区相关信息。
注意:
再均衡接口会在每次poll()
的时候检查是否需要再均衡,若需要再均衡,则会直接触发再均衡操作。
问题四: 指定位移消息
有些时候,可能需要对一些消息进行回溯消费,Kafka Consumer API
提供了seek()
方法用来对消息进行回溯消费。
使用方式如下:
consumer.poll(1000);
Set<TopicPartition> topicPartitions=consumer.assignment();
for (TopicPartition topic : topicPartitions) {
consumer.seek(topic ,250);
}
对于以上API
,需要注意如下情况:
首先必须调用consumer.poll()
方法,因为必须先通过该方法来分配此消费者负责的分区。
同时,对于consumer.poll()
,如果没有消息过来,则会一直阻塞到超时,因此超时时间需要控制好,太短可能还没有获取信息,太长会阻塞太久
另外一个需要注意的就是seek()
的第二个参数,一般来说,这个参数可能需要依据某些维度来获取,比如可以通过Consumer#offsetsForTimes()
获取某个时间点的offset
.
对于这个offset
,需要注意的是Kafka
对于消息是有清除策略的,如果Kafka
内部只剩下100-200
的消息体,而seek(topic,0)
则会触发越界异常,此异常导致的行为依赖于auto.offset.reset
参数的设置,earliest
(从头开始消费消息),latest
(从最新的消息开始获取),none
(抛出异常)。默认为latest
对于Kafka Broker
来说,指定位移消费就意味着需要索引消息,Kafka Broker
将消息存为.log
文件,与此同时,还会为每个.log
文件生成.index
和.timeIndex
用来保存此文件的索引和时间戳索引,并且文件名是以当前第一条消息的索引,用来作为基准偏移量,在索引文件中,保存的是对应消息的稀疏索引,当需要查找消息的时候,直接通过二分查找即可查找到对应的消息,文件名如图所示:
问题五:Kafka的消息保留策略
前面说过,Kafka
的一个特点是可持久化。如果要将Kafka
用来持久化使用,则需要了解Kafka
持久化的一些配置。
在Kafka Broker
中,可以铜鼓欧参数:log.cleanup.policy
设置以下持久化策略,
delete
: 删除策略,会定期清理消息compact
: 日志压缩,会压缩然后保存消息
对于delete
,有两种维度来清理消息,
log.retention.hours
为时间维度配置,默认168小时,也就是消息保留7天,log.retention.bytes
为所有文件大小维度,默认-1
,表示无穷大。
对于compact
,Kafka
则是通过key
来标记一条消息的,如果对于相同的key
的消息,Kafka
会以最新的一条消息为准,删除以前的消息。
这里的Key
可以类比数据库的主键,同一个主键的消息,会被最新的覆盖。由于Kafka compact
会删除部分消息,因此此时Kafka
内部的消息offset
将不再是连续的。
问题六: Kafka为什么能做到高吞吐
Kafka
作为一个消息中间件,主要功能便是分发消息,暂时持久化消息,因此最主要涉及到的功能便是IO
和写磁盘。
对于IO
来说,由于Kafka
不需要额外编辑消息,因此可以很好的利用零拷贝的特性。使用零拷贝可以直接将磁盘中的内容不仅用户冭直接拷贝到网卡中,极大的提高了效率。
对于写磁盘来说,一般对于这种容器类的中间件,用的最多的便是内存,但是对于Kafka
这种不用随机写的消息队列来说,直接顺序IO
磁盘的效率依然很快,同时对于操作系统来说,磁盘的读写都会先写经过页缓存,因此Kafka
没有直接使用用户内存,而是直接IO
磁盘,借助操作系统页缓存完成高效率的IO
。
同时,由于要大量的操作对象,对于JVM
来说,一个对象包含对象头,字节对其等消息,缓存大量对象会造成空间浪费比较严重,同时如果对象较多,还会给GC带来一定的压力,因此对于存储有一定规律的,可以使用堆外缓存来存放相关对象,在Kafka
中大量使用了堆外缓存来存储消息,用来节约空间和减轻GC
带来的Stop-The-World
所带来的影响。
Kafka 生产者
在使用kafka
的时候,不仅仅是调用send()
将消息发送出去就可以不用关心了,作为一个高吞吐,高可用的消息中间件,Kafka
生产者必须考虑以下几点:
- 消息是否成功发送,对于没有成功发送的消息,是什么原因没有发送成功,是否需要重试
- 生产者是否是线程安全的?如果是,如何保证效率,以及生产者内部的线程模型是怎样的,需要注意哪些地方才不会影响发送效率。
- 对于大量的消息发送,是每条消息单独发送,还是缓存一定的消息数量然后批量发送
- 消息需要发送到哪个分区?
Kafka Broker
包含多个分区,如何指定消息需要发送到哪个分区? - 对于严格要求有序的操作(比如同步
MySQL
的Bin Log
),需要注意什么地方 - …
一般来说,我们使用Kafka
生产者都是使用Kafka
官方提供的Kafka Customer API
,其大概的过程是
- 用户调用
send()
方法发送消息 API
将消息序列化成字节- 与
Broker
建立连接,将消息发送给Broker
Broker
收到消息后,返回ack
给生产者- 生产者收到
ack
后,调用用户客户端的回调接口
问题一: 如何确认消息成功发送到Broker
大体流程如上所述,虽然看着比较简单,但是对于每个细节,都需要详细设计。
发送消息
如何确认消息是否已经发送,需要从头开始确认,首先,需要确认消息是否已经发送,对于Customer API
,发送消息分为以下3个级别:
kafkaProducer.send(producerRecord);
-
Future<RecordMetadata> future = kafkaProducer.send(producerRecord); future.get();
-
kafkaProducer.send(producerRecord,(r,e)->{ if (e!=null){ //发送报错 }else if (r!=null){ //发送成功 } });
对于第一种,就是简单的发送消息,如果配置错误或者其他不可重试的错误,只会被简单的记录到kafka
日志中。
对于第二种,是一种阻塞的消息发送,在调用get()
的时候,如果发生了不可重试的错误,则会抛出异常。
对于第三种,是一种异步的消息发送,当发生不可重试的错误的时候,参数e
不为空,当发送成功时,r
不为空
确认消息
如果需要确认消息是否成功发送,则需要第二种或第三种使用方式,将因为不可重试的错误导致没有成功发送的消息记录下来。
在确认消息已经发送后,则需要确认消息是否已经到达Broker
,这需要Broker
返回ack
,对于ack
返回的时机,也是比较重要的。从解析Broker
中我们知道Kafka
的高可用是通过Leader-Follower
来实现的,但是如果消息到达Leader
后直接返回ACK
,那么可能出现正好这个时候Leader
崩溃,从而导致消息丢失,因此可能需要设置ACK
为-1
表示消息完全同步到Follower
后,再返回成功。
ack
一种包含三种配置,0 ,表示发送了消息就返回,不管Leader
是否有收到,1
表示Leader
收到后就返回,-1
表示Leader
收到后并同步给Follower
再返回。
问题二: Kafka 生成者线程模型
了解Kafka Producer API
的线程模型,才能更好的使用生产者发送消息,对于Kafka Producer
,流程图如下:
可以看到,当用户调用send()
方法的时候,消息会先通过拦截器,简单修改消息,然后通过序列化器将消息序列化,序列化完成后,经过分区器设置消息的分区。接着分区完毕之后,生产者会将消息放进消息累加器(RecordAccumulator
)中。
消息累加器是一个消息缓冲区,这个缓冲区充当了一个容器的角色,主线程不断的往里面放消息,Sender
线程则不断地取出消息并进行发送。两个线程形成了一个生产者消费者模型。
消息在RecordAccumulator
中被包装成ProducerBatch
,ProducerBatch
的大小通过参数batch.size
控制,当一个ProducerBatch
被填满后,Sender
线程就会将其取出来发送给Broker
Sender
线程会将ProducerBatch
包装成Request
,同时会创建一个等待返回的Request
,当收到Broker
返回的ACK
之后,对应的Request
会调用相关的回调方法。
明白了上面的总体流程后,下面简单总结在生产环境中使用生产者的注意事项。
- 拦截器,序列化器,分区器都是工作在主线程,如果这里面包含一些比较耗费时间的操作,那么将会影响整个生产者的吞吐量
- 消息累加器并不能无限添加消息,其大小受
buffer.memory
默认32M
控制,当消息累加器被填满后,send()
方法将会被阻塞,因此如果数据量比较大,注意增大此选项 - 等待返回的
Request
数量受max.in.flight.requests.per.connection
参数影响,默认为5,也就是每个链接只能保持5个正在等待的消息。此参数决定了发送消息的并发度,超过此参数,send()
方法也会被阻塞。
问题三: 对于大量的消息发送,Kafka是如何处理的
首先说结论:对于一个优秀的开源中间件来说,一般都会设计成可配置的。因为批量发送要基于业务场景考虑,如果对实时性要求比较高的话,可以一条一条的发送,但是一条一条的发送会性能会稍微低一些。如果对实时性要求不是那么高,那么kafka
会将消息先缓存到缓冲中,然后等收集一定数量后再批量发送,批量发送只用进行一次网络IO,性能比较高。
在Kafka
中,缓冲的行为由两个参数控制:
linger.ms
: 等待时间,生产者会在ProducerBatch
被填满或等待时间超过linger.ms
值的时候发送出去,增大这个参数的值会增加消息的延迟。默认为0,表示接受到一条消息就发送一次。batch.size
: 单个ProducerBatch
缓冲区大小,生产者在linger.ms
时间足够充足的情况下,会一直等待直到填满一个ProducerBatch
大小才发送下次,默认为16k
问题四: 消息如何确定需要发送的分区
其实从问题二中,就能知道这个问题的答案,那就是分区器。这里简单介绍下细节:
分区器接口Partitioner
用户可以自定义分区器,来自定义如何分区,如果没有配置自定义的分区器,则会使用默认的分区器:DefaultPartitioner
默认分区器逻辑如下:
- 如果
key
为null
,则将其轮流发送到当前活跃的分区中 - 如果
key
不为null
,则通过hash(key)%分区数量
来计算应该投送的分区
注意第一种情况发送的是一定可用的分区,而第二种情况可能会发送到一个已经崩溃的分区,这样做应该是为了保证同一个key
发送到的分区是一样的。不过,一旦主题中增加了分区,那么key与分区映射关系依然会变化。
问题四:如何保证发送的消息是有序的
先说结论: Kafka
能保证消息在同一个分区下是有序的。
对于kafka
来说,分区的作用是用来做负载均衡的,如果为了保证消息的有序性,强制消费按顺序遍历所有分区消费,那么就失去了分区的作用了。因此Kafka
只保证了同一个分区的消息是有序的。
其中还有一个值得注意的地方,那就是失败重试。
kafka
生产者默认带有失败重试功能,同时生产者是可以并行发送的,加入在刚开始的时候,发送了一个消息A,然后发送了消息B,但是A发送失败然后又重试成功了,此时消息顺序就会变成BA
解决这个问题的方法很简单,那就是关闭并行发送,设置max.in.flight.requests.per.connection
为1
,
Kafka 消费者
poll()
为我们做了那些事?- 消费者是否是线程安全的?如果不是,那么最佳实践是什么样的?
- 基于拉模式的消费者,应该注意什么问题能保证不丢消息?
- 在使用消费者客户端的时候,还有什么注意事项
问题一:poll()为我们做了那些事
对于Kafka
消费者,是通过循环poll()
消息来拉取消息,
public ConsumerRecords<K, V> poll(final Duration timeout);
poll()
的作用不仅仅是拉取消息,一般来说,Broker
与消费者的主要通信手段都是通过poll()
方法,也就是说Broker
再均衡,服务器相关信息交换等,都需要通过poll()
来完成。
总结来说,poll()
方法完成了下面几件事:
- 建立
TCP
连接,发送心跳 -
根据订阅的
Topic
发送join-group
请求,请求加入到消费者组中,让broker
为其分配对应的分区 - 发送上一次拉取到的消息
offset
,提交消费位移 - 检查是否需要再均衡,如果需要,则调用相关回调接口
可以看到,poll()
不仅仅是简单的拉取消息,还涉及到其他一些列的信息维护工作。
首先,poll()
需要发送心跳,因此在poll()
循环中,尽量不要做延时太久的操作,否则如果超过心跳时间,会导致消费者频繁的加入,退出导致频繁的再均衡。
其次,如果配置了自动提交,那么在每次poll()
的时候,会自动提交上一次poll()
到的消费位移。
再次,poll()
每次都会检查是否需要进行再均衡操作,如果需要,则会直接触发再均衡操作。
为什么要知道poll()
做了什么事,是因为后面有很多代码都要围绕poll()
来完成,明白其原理,才能避免踩一些坑。
问题二: 消费者客户端是否是线程安全的
其实这一个问题,简单看下源码就能发现,消费者客户端每次调用方法都会先检查当前是否有多个线程同时执行,如果是,则抛出异常。也就是说消费者并不是线程安全的。
对于非线程安全的消费者,如果有些时候需要提高吞吐,可能会使用多个线程,此时有两种比较好的模型。
第一种:每个线程一个消费者客户端。
原理类似于ThreadLocal
,每个线程持有一个消费者客户端,但是由前面的Kafka Broker
分区模型我们知道,消费者客户端的数量受限于Broker
分区数量。因此如果Broker
分区数量比较少,那么多线程数量将会受到限制,同时如果线程数量较多,网络连接也会比较多。
上面说的只是注意事项,但是这是一种最简单的使用方式,对于业务量不大的系统,可以使用这种模型。
第二种: 生产者消费者模型
Consumer
单独在一个线程中,每次拉取到消息后,不进行业务处理,而是将其放在缓存队列中,后面的多线程handler
依次从缓存队列中消费消息,也就是再化成一个生产者消费者模型。
这样的好处就是消费者可以随意扩展,线程数量也不受分区限制。
但是这样的模型,会带来一个问题。如果设置自动提交消费位移,那么很容易出现丢消息的情况,比如Consumer
客户端在源源不断的拉取消息,上面我们说过poll()
会提交上一次拉取的消息的位移,如果这个时候客户端突然崩溃,此时在缓存中还未进行业务处理的消息就会丢失。
想要解决这个问题,就需要手动提交位移,等真正进行了业务处理之后,再进行位移提交。
手动提交位移也存在一个问题,比如线程1负责消费0-99的消息,线程2负责消费100-199的消息,如果线程2先于线程1提交,同时线程1却发生了异常,这个是时候如果重启,0-99的消息依然会丢失。
想要解决这个问题,最好的办法就是通过滑动窗口,通过滑动窗口等待窗口内最小的消息消费完后,再滑动到下一格。
问题三: 使用消费者应该注意如何保证不丢消息
在Kafka
主从模式下,只要不是所有机器都同时崩溃,按道理来说就不会丢失消息,但是如果在生产者,消费者中,没有注意某些细节的地方,也会造成人为的丢消息。
对于消费者来说,丢消息的情况在问题二已经说了,一般来说,自动提交消息没有什么问题,因为poll()
是提交的上一次拉取的消息的位移,因此即使崩溃,也能重新拉取。但是对于需要缓存消息的时候,就一定要注意需要手动提交消费位移。
问题四:使用消费者客户端的其他注意事项
除了注意消息丢失外,另外一个比较重要的就是消息重复。对于Kafka
来说,很难保证一个消息不重复,目前kafka
虽然已经支持“有且仅有一次”语义,但是日常工作用的比较少,对于其他语义来说,消息比较容易出现重复的情况,比如broker
在回复ack
的时候,因为网络原因超时或者丢失了, 此时生产者以为消息没有发送成功,于是又发送了一次。又比如,在Broker
再均衡的时候或者消费者崩溃后,由于是手动提交消息,一般都是批量进行提交,如果这个时候客户端重启,那么之前消费了一半的消息,又会被消费一次。因此一定要保证消费是幂等的。保证的手段有很多,比如redis
判断,数据库主键插入判断等。
总之,Kafka
的使用远远不止这些,这里只是简单总结使用的时候容易忽略的问题,想要更加深入的了解Kafka
,还需要多多阅读,继续研究。