本文主要参考资料为
1.Kafka官网 http://kafka.apache.org/
2.《深入理解Kafka:核心设计与实践原理》–朱忠华编著
简介
Kafka起初是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于ZooKeeper协调的分布式消息系统,现已被捐献给Apache基金会。目前Kafka已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
Kafka之所以受到越来越多的青睐,与它所“扮演”的三大角色是分不开的。
- 消息系统:Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。
- 存储系统:Kafka 把消息持久化到磁盘,相比于其他基于内存存储的系统而言,有效地降低了数据丢失的风险。也正是得益于Kafka 的消息持久化功能和多副本机制,我们可以把Kafka作为长期的数据存储系统来使用,只需要把对应的数据保留策略设置为“永久”或启用主题的日志压缩功能即可。
- 流式处理平台:Kafka 不仅为每个流行的流式处理框架提供了可靠的数据来源,还提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作。
基本概念
生产者、消费者、节点
一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干Consumer,以及一个ZooKeeper集群。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中,而Consumer负责从Broker订阅并消费消息。
整个Kafka体系结构中引入了以下3个术语。
- Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。
- Consumer:消费者,也就是接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。
- Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。一般而言,我们更习惯使用首字母小写的broker来表示服务代理节点。
主题、分区
在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。
主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
如图所示,主题中有 4 个分区,消息被顺序追加到每个分区日志文件的尾部。Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker,以此来提供比单个broker更强大的性能。
每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。
Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。
Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。
分区中的所有副本统称为AR(Assigned Replicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas),ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。前面所说的“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即AR=ISR,OSR集合为空。
leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。
ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。如图所示,它代表一个日志文件,这个日志文件中有 9 条消息,第一条消息的 offset(LogStartOffset)为0,最后一条消息的offset为8,offset为9的消息用虚线框表示,代表下一条待写入的消息。日志文件的HW为6,表示消费者只能拉取到offset在0至5之间的消息,而offset为6的消息对消费者而言是不可见的。
LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,上图中offset为9的位置即为当前日志文件的LEO,LEO的大小相当于当前日志分区中最后一条消息的offset值加1。分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。
我们来举一个栗子。
假设某个分区的ISR集合中有3个副本,即一个leader副本和2个follower副本,此时分区的LEO和HW都为3。消息3和消息4从生产者发出之后会被先存入leader副本,如图所示。
在消息写入leader副本之后,follower副本会发送拉取请求来拉取消息3和消息4以进行消息同步。在同步过程中,不同的 follower 副本的同步效率也不尽相同。如图所示,在某一时刻follower1完全跟上了leader副本而follower2只同步了消息3,如此leader副本的LEO为5,follower1的LEO为5,follower2的LEO为4,那么当前分区的HW取最小值4,此时消费者可以消费到offset为0至3之间的消息。
如图所示,所有的副本都成功写入了消息3和消息4,整个分区的HW和LEO都变为5,因此消费者可以消费到offset为4的消息了。
由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制方式极大地影响了性能。而在异步复制方式下,follower副本异步地从leader副本中复制数据,数据只要被leader副本写入就被认为已经成功提交。在这种情况下,如果follower副本都还没有复制完而落后于leader副本,突然leader副本宕机,则会造成数据丢失。Kafka使用的这种ISR的方式则有效地权衡了数据可靠性和性能之间的关系。
生产者
客户端开发
一个正常的生产逻辑需要具备以下几个步骤:
- 配置生产者客户端参数及创建相应的生产者实例。
- 构建待发送的消息。
- 发送消息。
- 关闭生产者实例。
这里有单独说明下构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个value 属性,比如“Hello,Kafka!”只是ProducerRecord对象中的一个属性。ProducerRecord类的定义如下(只截取成员变量):
其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka 0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。有key的消息还可以支持日志压缩的功能。value是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息。timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的事件。
必要的参数配置
在Kafka生产者客户端中有三个参数是必填的:
- bootsrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到Kafka集群上。
- key.serializer & value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在,发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如
org.apache.kafka.common.serialization.StringSerializer
,单单指定StringSerializer
是错误的。
还有一个参数client.id,这个参数用来设定KafkaProducer对应的客户端id,默认值为“”。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”“producer-2”,即字符串“producer-”与数字的拼接。
在实际使用过程中,诸如“key.serializer”“max.request.size”“interceptor.classes”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的org.apache.kafka.clients.producer.ProducerConfig
类来做一定程度上的预防措施,每个参数在 ProducerConfig
类中都有对应的名称。
KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程调用。
消息的发送
在创建完生产者实例之后,接下来的工作就是构建消息,即创建ProducerRecord对象。我们已经了解了ProducerRecord的属性结构,其中topic属性和value属性是必填项,其余属性是选填项。
创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。
- 发后即忘:只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。
- 同步:同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。
- 异步:一般是在
send()
方法里指定一个Callback的回调函数,Kafka在返回响应时调用该函数来实现异步的发送确认。有读者或许会有疑问,send()
方法的返回值类型就是Future,而Future本身就可以用作异步的逻辑处理。这样做不是不行,只不过Future里的get()
方法在何时调用,以及怎么调用都是需要面对的问题,消息不停地发送,那么诸多消息对应的Future对象的处理难免会引起代码处理逻辑的混乱。使用Callback的方式非常简洁明了,Kafka有响应时就会回调,要么发送成功,要么抛出异常。对于同一个分区而言,如果消息record1于record2之前先发送(参考上面的示例代码),那么KafkaProducer就可以保证对应的callback1在callback2之前调用,也就是说,回调函数的调用也可以保证分区有序。
通常,一个KafkaProducer不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调用KafkaProducer的close()
方法来回收资源。如果调用了带超时时间timeout的close()
方法,那么只会在等待timeout时间内来完成所有尚未完成的请求处理,然后强行退出。
序列化
生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。
除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有3个方法:
- configure():配置当前类
- serialize():执行序列化操作
- close():是一个空方法,若实现此方法,则必须保证该方法的幂等性,因为该方法会被KafkaProducer调用多次
生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer,那么是无法解析出想要的数据的。
分区器
消息在通过send()
方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往 broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord
中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。
如果消息ProducerRecord
中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。
Kafka中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner
,它实现了org.apache.kafka.clients.producer.Partitioner
接口,这个接口中定义了2个方法。
1 |
|
其中partition()
方法用于计算分区号,返回值为int
类型,其入参分别代表主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()
方法在关闭分区器的时候用来回收一些资源。
Partitioner
接口还有一个父接口org.apache.kafka.common.Configurable
,这个接口中只有一个方法:
1 |
|
Configurable
接口中的configure()
方法主要用来获取配置信息和初始化数据。
在默认分区器 DefaultPartitioner 的实现中,close()
是空方法,而在partition()
方法中定义了主要的分区分配逻辑。如果 key 不为 null,那么默认的分区器会对 key 进行哈希(采用MurmurHash2算法,具备高运算性能及低碰撞率),最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。
除了使用 Kafka 提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner
一样实现Partitioner
接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner
来打破这一限制。
实现自定义的DemoPartitioner
类之后,需要通过配置参数partitioner.class
来显式指定这个分区器。
1 |
|
生产者拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。
生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor
接口。ProducerInterceptor
接口中包含3个方法。
1 |
|
KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend()
方法来对消息进行相应的定制化操作。一般来说最好不要修改消息ProducerRecord 的 topic、key 和partition 等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(Log Compaction)的功能。
KafkaProducer 会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的 onAcknowledgement()
方法,优先于用户设定的 Callback之前执行。这个方法运行在Producer 的 I/O 线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
close()
方法主要用于在关闭拦截器时执行一些资源的清理工作。
在这 3 个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。
实现自定义的 ProducerInterceptorPrefix
之后,需要在 KafkaProducer 的配置参数interceptor.classes
中指定这个拦截器,此参数的默认值为“”。
1 |
|
KafkaProducer中不仅可以指定一个拦截器,还可以指定多个拦截器以形成拦截链。拦截链会按照 interceptor.classes
参数配置的拦截器的顺序来一一执行(配置的时候,各个拦截器之间使用逗号隔开)。
原理分析
整体架构
我们来看一下生产者客户端的整体架构。
整个生产者客户端由两个线程协调运行,这两个线程分别为主线程和Sender线程(发送线程)。在主线程中由KafkaProducer创建消息,然后通过可能的拦截器、序列化器和分区器的作用之后缓存到消息累加器(RecordAccumulator,也称为消息收集器)中。Sender 线程负责从RecordAccumulator中获取消息并将其发送到Kafka中。
RecordAccumulator 主要用来缓存消息以便 Sender 线程可以批量发送,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存的大小可以通过生产者客户端参数buffer.memory
配置,默认值为 33554432B,即 32MB。如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,这个时候KafkaProducer的send()
方法调用要么被阻塞,要么抛出异常,这个取决于参数max.block.ms
的配置,此参数的默认值为60000,即60秒。
主线程中发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,在 RecordAccumulator 的内部为每个分区都维护了一个双端队列,队列中的内容就是ProducerBatch,即 Deque<ProducerBatch>
。消息写入缓存时,追加到双端队列的尾部;Sender读取消息时,从双端队列的头部读取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一至多个ProducerRecord。通俗地说,ProducerRecord 是生产者中创建的消息,而ProducerBatch是指一个消息批次,ProducerRecord会被包含在ProducerBatch中,这样可以使字节的使用更加紧凑。与此同时,将较小的ProducerRecord拼凑成一个较大的ProducerBatch,也可以减少网络请求的次数以提升整体的吞吐量。ProducerBatch和消息的具体格式有关。如果生产者客户端需要向很多分区发送消息,则可以将buffer.memory
参数适当调大以增加整体的吞吐量。
消息在网络上都是以字节(Byte)的形式传输的,在发送之前需要创建一块内存区域来保存对应的消息。在Kafka生产者客户端中,通过java.io.ByteBuffer
实现消息内存的创建和释放。不过频繁的创建和释放是比较耗费资源的,在RecordAccumulator的内部还有一个BufferPool,它主要用来实现ByteBuffer的复用,以实现缓存的高效利用。不过BufferPool只针对特定大小的ByteBuffer进行管理,而其他大小的ByteBuffer不会缓存进BufferPool中,这个特定的大小由batch.size
参数来指定,默认值为16384B,即16KB。我们可以适当地调大batch.size
参数以便多缓存一些消息。
ProducerBatch的大小和batch.size
参数也有着密切的关系。当一条消息(ProducerRecord)流入RecordAccumulator时,会先寻找与消息分区所对应的双端队列(如果没有则新建),再从这个双端队列的尾部获取一个 ProducerBatch(如果没有则新建),查看 ProducerBatch 中是否还可以写入这个ProducerRecord,如果可以则写入,如果不可以则需要创建一个新的ProducerBatch。在新建ProducerBatch时评估这条消息的大小是否超过batch.size
参数的大小,如果不超过,那么就以batch.size
参数的大小来创建ProducerBatch,这样在使用完这段内存区域之后,可以通过BufferPool 的管理来进行复用;如果超过,那么就以评估的大小来创建ProducerBatch,这段内存区域不会被复用。
Sender 从 RecordAccumulator 中获取缓存的消息之后,会进一步将原本<分区,Deque<ProducerBatch>>
的保存形式转变成<Node,List<ProducerBatch>
的形式,其中Node表示Kafka集群的broker节点。对于网络连接来说,生产者客户端是与具体的broker节点建立的连接,也就是向具体的 broker节点发送消息,而并不关心消息属于哪一个分区;而对于 KafkaProducer的应用逻辑而言,我们只关注向哪个分区中发送哪些消息,所以在这里需要做一个应用逻辑层面到网络I/O层面的转换。
在转换成<Node,List<ProducerBatch>>
的形式之后,Sender 还会进一步封装成<Node,Request>
的形式,这样就可以将Request请求发往各个Node了,这里的Request是指Kafka的各种协议请求,对于消息发送而言就是指具体的ProduceRequest。
请求在从Sender线程发往Kafka之前还会保存到InFlightRequests中,InFlightRequests保存对象的具体形式为 Map<NodeId,Deque<Request>>
,它的主要作用是缓存了已经发出去但还没有收到响应的请求(NodeId 是一个String 类型,表示节点的 id 编号)。与此同时,InFlightRequests还提供了许多管理类的方法,并且通过配置参数还可以限制每个连接(也就是客户端与Node之间的连接)最多缓存的请求数。这个配置参数为max.in.fight.requests.per.connection
,默认值为 5,即每个连接最多只能缓存 5个未响应的请求,超过该数值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应(Response)。通过比较Deque<Request>
的size与这个参数的大小来判断对应的Node中是否已经堆积了很多未响应的消息,如果真是如此,那么说明这个 Node 节点负载较大或网络连接有问题,再继续向其发送请求会增大请求超时的可能。
元数据的更新
InFlightRequests还可以获得leastLoadedNode,即所有Node中负载最小的那一个。这里的负载最小是通过每个Node在InFlightRequests中还未确认的请求决定的,未确认的请求越多则认为负载越大。选择leastLoadedNode发送请求可以使它能够尽快发出,避免因网络拥塞等异常而影响整体的进度。leastLoadedNode的概念可以用于多个应用场合,比如元数据请求、消费者组播协议的交互。
我们只知道主题的名称,对于其他一些必要的信息却一无所知。KafkaProducer要将此消息追加到指定主题的某个分区所对应的leader副本之前,首先需要知道主题的分区数量,然后经过计算得出(或者直接指定)目标分区,之后KafkaProducer需要知道目标分区的leader副本所在的broker 节点的地址、端口等信息才能建立连接,最终才能将消息发送到 Kafka,在这一过程中所需要的信息都属于元数据信息。
我们了解了bootstrap.servers
参数只需要配置部分broker节点的地址即可,不需要配置所有broker节点的地址,因为客户端可以自己发现其他broker节点的地址,这一过程也属于元数据相关的更新操作。与此同时,分区数量及leader副本的分布都会动态地变化,客户端也需要动态地捕捉这些变化。元数据是指Kafka集群的元数据,这些元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的leader副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。
当客户端中没有需要使用的元数据信息时,比如没有指定的主题信息,或者超过metadata.max.age.ms
时间没有更新元数据都会引起元数据的更新操作。客户端参数metadata.max.age.ms
的默认值为300000,即5分钟。元数据的更新操作是在客户端内部进行的,对客户端的外部使用者不可见。当需要更新元数据时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。这个更新操作是由Sender线程发起的,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤就和发送消息时的类似。元数据虽然由Sender线程负责更新,但是主线程也需要读取这些信息,这里的数据同步通过synchronized和final关键字来保障。
重要的生产者参数
在KafkaProducer中,除了2.1.1节提及的3个默认的客户端参数,大部分的参数都有合理的默认值,一般不需要修改它们。不过了解这些参数可以让我们更合理地使用生产者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。下面挑选一些重要的参数进行讲解。
acks
这个参数用于指定分区中必须要有多少个副本收到这条消息,才认为这条消息成功写入。acks是生产者客户端中非常重要的一个参数,它涉及消息的可靠性和吞吐量之间的权衡,acks参数可以取三种值,均为字符串类型。
- acks=1:默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。
- acks=0:生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks 设置为 0可以达到最大的吞吐量。
- acks=-1或all:生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。要获得更高的消息可靠性需要配合
min.insync.replicas
等参数的联动。
max.request.size
这个参数用来限制生产者客户端能发送的消息的最大值,默认值为1048576B
,即1MB
。一般情况下,这个默认值就可以满足大多数的应用场景了。笔者不建议读者盲目地增大这个参数的配置值,尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如broker端的message.max.bytes
参数,如果配置错误可能会引起一些不必要的异常。
retries和retry.backoff.ms
retries参数用来配置生产者重试的次数,默认值为0
,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。
重试还和另一个参数retry.backoff.ms
有关,这个参数的默认值为100
,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries
和retry.backoff.ms
之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。
Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。
compression.type
这个参数用来指定消息的压缩方式,默认值为none
,即默认情况下,消息不会被压缩。该参数还可以配置为gzip
snappy
和lz4
。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。
connection.max.idle.ms
这个参数用来指定在多久之后关闭闲置的连接,默认值是540000
(ms),即9分钟。
linger.ms
这个参数用来指定生产者发送 ProducerBatch 之前等待多久以让更多消息(ProducerRecord)加入ProducerBatch,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms
值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。这个linger.ms
参数与TCP协议中的Nagle算法有异曲同工之妙。
receive.buffer.bytes
这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768
(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。
send.buffer.bytes
这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072
(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。
request.timeout.ms
这个参数用来配置Producer等待请求响应的最长时间,默认值为30000
(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms
的值要大,这样可以减少因客户端重试而引起的消息重复的概率。
消费者
消费者和消费组
消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。
消费者和消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加(减少)消费者个数来提高(降低)整体消费能力。
基于默认的分区分配策略而言,对于分区数固定的情况,一味地增加消费者并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况,就会有消费者分配不到任何分区。另外,还可以通过消费者客户端参数partition.assignment.strategy
来设置消费者与订阅主题之间的分区分配策略。
对于消息中间件而言,一般有两种消息投递模式:点对点(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。Kafka 同时支持两种消息投递模式,而这正是得益于消费者与消费组模型的契合:
- 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。
消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。而消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。
客户端开发
一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数及创建相应的消费者实例
- 订阅主题
- 拉取消息并消费
- 提交消费位移
- 关闭消费者实例
必要的参数配置
Kafka消费者客户端KafkaConsumer中有4个参数必填。
bootstrap.servers
该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单,具 体 内 容 形 式 为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”
。注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。
group.id
消费者隶属的消费组名称,默认值为“”
,一般而言这个值要设置为含有一定意义的业务名称。
key.deserializer和value.deserializer
与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer
。
client.id
这个参数用来设定KafkaConsumer对应的客户端id,默认值为“”
。如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,内容形式如consumer-1
consumer-2
,即字符串consumer-
与数字的拼接。
此外,我们可以使用客户端中的org.apache.kafka.clients.consumer.ConsumerConfig
,每个参数在ConsumerConfig
类中都有对应的名称。
订阅主题与分区
一个消费者可以订阅一个或多个主题,对于subscribe()
方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。
对于消费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。
消费者不仅可以通过KafkaConsumer.subscribe()
方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()
方法来实现这些功能,这个方法只接受一个参数partitions
用来指定需要订阅的分区集合。这里补充说明一下TopicPartition类,在Kafka的客户端中,它用来表示分区。TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号。
如果我们不知道主题有多少个分区怎么办?答案是KafkaConsumer中的partitionsFor()
方法可以查询指定主题的元数据信息,该方法的返回值是List<PartitionInfo>
。其中PartitionInfo
即为该主题分区的元信息,PartitionInfo
类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offineReplicas代表分区的OSR集合。
既然有订阅,那么就有取消订阅,可以使用KafkaConsumer中的unsubscribe()
方法来取消主题的订阅。
集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了三种不同的订阅状态:AUTO_TOPICS
、AUTO_PATTERN
和USER_ASSIGNED
(如果没有订阅,那么订阅状态为NONE
)。然而这三种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出IllegalStateException
异常。
通过subscribe()
方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()
方法订阅分区时,是不具备消费者自动均衡的功能的
反序列化
Kafka所提供的反序列化器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、Integer、Long、Short 及String类型的反序列化,这些序列化器也都实现了 Deserializer 接口,与KafkaProducer中提及的Serializer接口一样,Deserializer接口也有三个方法。
1 |
|
在实际应用中,在Kafka提供的序列化器和反序列化器满足不了应用需求的前提下,推荐使用Avro、JSON、Thrift、ProtoBuf或Protostuff等通用的序列化工具来包装,以求尽可能实现得更加通用且前后兼容。使用通用的序列化工具也需要实现Serializer 和 Deserializer 接口,因为Kafka客户端的序列化和反序列化入口必须是这两个类型。
消费消息
Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。
位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个offset的概念,消费者使用offset来表示消费到分区中某个消息所在的位置。
每次调用poll()
方法时,它返回的是还没有消费过的消息集。要做到这一点,就需要记录上一次消费时的消费位移,而且这个消费位移必须持久化保存,否则消费者重启后就无法知晓之前的消费位移。此外,当新消费者加入时的再均衡动作同样需要消费位移,才能顺利把分区分配给新的消费者。
我们把消费位移存储起来的动作称为提交,消费者在消费完消息后需要执行消费位移的提交。
如图所示,x表示某一次某一次拉取操作中此分区消息的最大偏移量,假设当前消费者已经消费到位置x,我们就说该消费者的消费位移是x,图中也用了lastConsumedOffset来标识。
此外,当消费者需要提交的消费位移并不是x,而是x+1,也就是图中的position,它表示下一条需要拉取的消息位置。
KafkaConsumer 类提供了 position(TopicPartition)和 committed(TopicPartition)两个方法来分别获取上面所说的position和committed offset的值。
在 Kafka 中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit
配置,默认值为 true。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数auto.commit.interval.ms
配置,默认值为5秒,此参数生效的前提是enable.auto.commit
参数为true。
控制或关闭消费
KafkaConsumer 提供了对消费速度进行控制的方法,在有些应用场景下我们可能需要暂停某些分区的消费而先消费其他分区,当达到一定条件时再恢复这些分区的消费。KafkaConsumer中使用pause()
来实现暂停某些分区在拉取操作时返回数据,使用resume()
来恢复某些分区向客户端返回数据。
KafkaConsumer还提供了一个无参的paused()
来返回被暂停的分区集合。
指定位移消费
略
再均衡
再均衡是指分区的所属权从一个消费者转移到另一消费者的行为,它为消费组具备高可用性和伸缩性提供保障,使我们可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。不过在再均衡发生期间,消费组内的消费者是无法读取消息的。也就是说,在再均衡发生期间的这一小段时间内,消费组会变得不可用。另外,当一个分区被重新分配给另一个消费者时,消费者当前的状态也会丢失。
消费者拦截器
消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。
重要的消费者参数
fetch.min.bytes
该参数用来配置Consumer在一次拉取请求(调用poll()方法)中能从Kafka中拉取的最小数据量,默认值为1(B)。
fetch.max.bytes
该参数与fetch.max.bytes参数对应,它用来配置Consumer在一次拉取请求中从Kafka中拉取的最大数据量,默认值为 52428800(B),也就是 50MB。