1. Kafka Consumber 消费者
kafka Consumer 消费者从属于 消费者群组(Consumer Group) 。
一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。
官网解释:
1 | Consumers label themselves with a consumer group name, and each record published to a topic is delivered to one consumer instance within each subscribing consumer group. |
增加群组中的消费者是横向伸缩消费能力的主要方式。
kafka消费者经常会有一些高延迟的业务操作,例如:写数据库、HDFS、业务处理 等 耗时较多,消费者无法跟上数据生成速度,所以可以增加更多消费者,分担负载。
注意:消费者数量不应该大于topic分区数,多余的消费者只会被闲置。
1.1. 为什么需要 consumer
同分区有点像,也是为了高伸缩性,还为了高容错。
一旦某个 consumer 挂了,consumer group 会立即将 奔溃consumer 负责的分区转交给其他 consumer 处理。
从而保证整个 consumer group 持续工作,不会丢失数据,这个过程称为 “重平衡”。
1.1.1. 示例代码
1 | public class ConsumerMainApp { |
消息轮询是 Consumer API 的核心,如上面示例代码中’while(true)'无限循环。它通过持续轮询向Kafka请求数据。
通过 poll(long timeOut)方法获取数据。
在第一次调用poll(long timeOut)方法时,它会负责查找 GroupCoordinator,然后加入群组,接手分配的分区。
如果发生了再均衡,整个过程也是在轮询期间进行。心跳也在轮询中发送。
1.2. 参数配置
参数 | 含义 |
---|---|
bootstrap.servers | broker地址配置 格式:127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092 |
key.deserializer | key-反序列化类型 |
value.deserializer | value-反序列化类型 |
group.id | 指定consumer属于哪个群组 |
enable.auto.commit | consumer是否自动提交offset位移。 设置为true,则consumer在后台自动提交位移。 否则,需要手动提交位移 |
auto.commit.interval.ms | Consumer的offset自动commit时的周期 |
auto.offset.reset | 当Kafka中没有初始offset偏移量或服务器上不再存在当前偏移量时,采取的策略。 earliest:从最早开始拉取数据。 latest:从当前最新的开始拉取 即当前offset 之后的第一个开始拉取。 |
partition.assignment.strategy | |
rebalance.backoff.ms | |
rebalance.max.retries |
更多配置,请查阅Kafka官方文档。
1.3. offset 位移
每个 consumer(消费者) 都会为它消费的 Partition(分区) 维护属于自己的位置信息来记录当前消费了多少条数据。kafka 中术语叫 offset(位移)。
consumer消费者会定期向Kafka集群提交自己消费数据的进度,这个操作称为:offset commit(位移提交)。
位移提交不仅表示consumer消费者的消费速度,同时也决定了consumer消费语义保证。
注意:
1 | 旧版本consumer会定期将位移信息提交到zookeeper下的固定觉点。 |
如上图,某个topic的分区,producer 向其中写了12条数据,producer-offset最新、最大值也就是12。
1 | Consumer-Group-A 和 Consumer-Group-B 两个 消费者实例 去消费这个分区。 |
consumer 读取记录后,消费者会以线性的方式增加偏移量。
由于 consumer offset 保存在消费者实例中,因此 consumer 可以选择从offset的内任何位置消费数据。
例如:consumer 实例重启后,重置到一个旧的偏移量,重新处理之前的数据。
1.3.1. offset commit(位移提交)
consumer 消费者需要定期向Kafka集群汇报自己消费数据的进度。这个过程被称为:位移提交。
offset 对 consumer 非常重要,因为它是实现消息交付语义的保证。
1.3.2. 旧版本
旧版本中,consumer 把位移提交至zookeeper上。
路径:/consumer/<group.id>/offset/<topic>/<partitionId>
1.3.3. 自动提交
配置 | 含义 |
---|---|
auto.commit.enable | 如果为true,自动定期向zookeeper提交offset 默认:true |
auto.commit.interval.ms | 自动提交offset给zookeeper的间隔时间 默认:60秒(60*1000) |
1.3.3.1. 手动提交
配置 auto.commit.enable 为false。
需要自己承担提交 offset 的职责。
1.3.4. 新版本
因为 zookeeper 不适合高并发读写,新版本中,consumer 把位移提交至Kafka一个内部topic中 _consumer_offset。
由此,新版本 consumer 不再依赖 zookeeper。(旧版本 consumer 配置需要用户指定zookeeper配置)
kafka会自动创建_consumer_offset topic,默认是50个分区,3个副本。
因为 Consumer 能够同时消费多个分区的数据,所以 位移提交 是在分区粒度上进行的。Consumer需要为分配给它的每个分区提交各自的位移数据。
1.3.4.1. 自动提交
默认(不设置情况),consumer 是自动提交位移的,提交间隔时间是5秒。
consumer 程序配置参数
配置 | 含义 |
---|---|
enable.auto.commit | consumer是否后台自动提交 默认值:true |
auto.commit.interval.ms | 自动提交时间间隔 默认值:5000 时间单位:毫秒 |
1.3.4.2. 手动提交
配置 enable.auto.commit 设置为false。
需要自己承担提交 offset 的职责。
1.4. 重平衡、再均衡
当一个消费者 被关闭 或 宕机 离开群组,原本由它读取的分区将由群组里的其他消费者读取;当主题发生变化,比如 添加了新的分区,会发生分区重新分配。
分区的所有权从一个消费者转移到另一个消费者,这种行为被称为 再均衡。
再均衡 为消费者群组提供了高可用和伸缩性。
不过正常情况下,并不希望发生这种情况;再均衡期间,消费者无法消费数据,造成整个群组一段时间的不可用。
而且,当分区重新分配给另一个消费者时,消费者当前的读取状态会丢失,需要去刷新缓存,在重新恢复状态之前会拖慢系统。
某topic下有100个分区,consumer group下有20个consumer实例。
正常情况,kafka会为每个consumer 分配5个分区。这个分配过程就是 rebalance。
rebalance 期间,所有 consumer 实例不能消费任何消息。
1.4.1. 旧版本
依赖zookeeper进行reblance。
1.4.2. 新版本
新版本中,使用 组协调协议(group coordination protocol)。
1.4.3. reblance 触发条件
consumer group 成员变更。新成员加入、成员离开(主动离开、奔溃)
consumer group 订阅topic的分区数量发生变更。
consumer group 订阅的topic数量发生变更。
1.4.4. rebalance 策略
分配策略决定了 订阅topic的每个分区会被分配给哪个 consumer。
1.4.4.1. range 策略
range策略主要是基于范围的思想。它将单个topic中所有的分区按照顺序排列。
然后把这些分区划分成固定大小的组合,并分配给每个 Consumer。
① 获取Topic下所有有效分区平铺 如:P0、P1、P2、P3、P4、P5、P6、P7、P8
② 消费者按照字典排序 如:c-1、c-2、c-3
③ 分区数 除以 消费者数量,得到 n
④ 分区数 对 消费者数量 取余,得到 m
⑤ 前m个消费者 分配 n+1个 分区,剩余的消费者分配 n个 分区。
1.4.4.2. round-robin 策略
round-robin策略,会 轮询 所有topic的所有分区,挨个分配给各个 consumer。
1.4.4.3. sticky 策略
待定
1.4.5. coordination 协调者
coordination:协调者,专门为 Consumer Group 服务,负责为 Consumer Group 执行 Reblance,以及提供位移管理和组成员管理(新成员到达、成员离开时,重新分配分区)等。
每一个 Consumer Group 都有各自对应的 coordination(协调者)。
上面说 consumer-offset-commit(位移提交) 时提到过,新版本kafka中,创建了一个_consumer_offset topic来保存 consumer 提交的offset数据。
coordination(协调者) 就是 Consumer Group 对应_consumer_offset topic中某分区的主分区(hash),所在的Broker。
各个 Consumer Group 如何定位到各自的 coordinator?
① 确定_consumer_offset(位移主题)的哪个分区 来保存Consumer Group数据
1 | partitionId = Math.abs(groupId.hashCode() % offsetsTopicPartitionCount |
② 找出主分区所在的Broker,该Broker即为对应的 coordination。
1.5. 注意
kafka 允许一个 consumer group 消费多个topic主题,但不建议这么做。
一个 consumer group 消费多个topic主题,其中任意一个 topic 的 consumer 重新上下线,会造成 所有 consumer 产生 reblance。
建议,一个 consumer group 对应一个topic。