kafka producer 生产者
2025-01-22 08:19:30    4.3k 字   
This post is also available in English and alternative languages.

可以使用 Spring 封装的 Client,或者使用 Kafka 官方的 Client 构建消息数据生产者。

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

1. 发送消息大致流程

java-producer流程(图片来自网络)

将待发送的数据封装到 ProducerRecord 中,将数据序列化后传递给 paritioner,并通过分区器确定目标分区;最终落入 kafkaProducer 内的一个缓冲池中(池中维护着还没有被推送的数据)。kafkaProducer 专门有个后台I/O线程负责推送数据。

  • producer主线程:
    • 当调用 send 方法发送数据后,producer(生产者)会将消息封装成ProducerRecord对象;
    • 序列化封装好的对象,发送给分区器(partitioner);
    • 分区器(partitioner)根据key确定消息数据到底要写入topic的哪个partition(分区),然后寻找Leader Replica(partition主副本)所在的broker;
    • 确定消息数据的目标分区后,将消息数据保存到位于producer程序中的一块内存缓冲区中。
  • sender线程:
    • 从内存缓冲区中取出准备就绪的消息数据,将发往同一分区的多条消息添加到一个批次(batch)中,统一发送给对应的broker。

以上,有几个注意点:

  • producer 中会有专门的sender线程负责向发送消息数据。
  • producer 发送消息是批量发送(通过参数设置也可以实时发送)。

1.1. 示例代码

推送数据至服务端分两种:同步、异步。KafkaProducer.send 方法是异步的。send 方法返回一个 Future 对象。如果想同步调用,接着调用 get 方法即可。


1.1.1. 同步方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ProducerMainApp {
static final Logger LOGGER = LoggerFactory.getLogger(ProducerMainApp.class);
private static final String APPLICATION_CONTEXT_FILE = "/spring/applicationContext.xml";
private KafkaProducer kafkaProducer;

public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(APPLICATION_CONTEXT_FILE);
ProducerMainApp bean = applicationContext.getBean(ProducerMainApp.class);
bean.processSynchronizationSendMsg();
}

//同步方式
private void processSynchronizationSendMsg() {
kafkaProducer = new KafkaProducer(getProducerConf());
try {
for (int i = 0; i < 6; i++) {
String message = "--- test message - " + i;
Future<RecordMetadata> testTopicFuture = kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), message));
RecordMetadata recordMetadata = testTopicFuture.get();
LOGGER.info("synchronization send message :{},offset:{}", message, recordMetadata.offset());
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
kafkaProducer.close();
}
}

private Properties getProducerConf() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//acks参数指定 必须有多少分区副本收到消息,生产者才会认为消息写入是成功的(0、1、all)
properties.setProperty("acks", "all");
return properties;
}
}

1.1.2. 异步方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ProducerMainApp {
static final Logger LOGGER = LoggerFactory.getLogger(ProducerMainApp.class);
private static final String APPLICATION_CONTEXT_FILE = "/spring/applicationContext.xml";
private KafkaProducer kafkaProducer;

public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(APPLICATION_CONTEXT_FILE);
ProducerMainApp bean = applicationContext.getBean(ProducerMainApp.class);
bean.processAsynchronousSendMsg();
}

//异步方式
private void processAsynchronousSendMsg() {
kafkaProducer = new KafkaProducer(getProducerConf());
try {
for (int i = 0; i < 6; i++) {
String message = "--- test message - " + i;
kafkaProducer.send(new ProducerRecord("test", Integer.toString(i), message), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
LOGGER.info("asynchronous send message :{},offset:{}", message, metadata.offset());
if (null != exception) {
LOGGER.error(exception.getMessage(), exception);
}
}
});
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
kafkaProducer.close();
}
}

private Properties getProducerConf() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//acks参数指定 必须有多少分区副本收到消息,生产者才会认为消息写入是成功的(0、1、all)
properties.setProperty("acks", "all");

return properties;
}
}

2. 主要参数

producer(生产者)向topic(主题)发送消息数据时,需要注意以下几个主要参数。

参数含义
bootstrap.serversbroker地址配置
格式:127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
key.serializer指定 key 的序列化
用户可以通过实现org.apache.kafka.common.serialization.StringSerializer接口自定义序列化,并且设置时需要指定全路径类名。
注意:即使发送消息数据时,即使不指定key,该参数也必须配置。
被发送到 broker 的任何消息数据都必须是字节数组;因此 producer(生产者)发送前需要将消息数据序列化,然后才能发送到 broker。
value.serializer指定 value(消息体)的序列化。
用户可以通过实现org.apache.kafka.common.serialization.StringSerializer接口自定义序列化,并且设置时需要指定全路径类名。
acks该参数指定了,必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的(用于告知 Kafka,producer(生产者)发送的消息数据怎样才算成功提交)。

0: producer(生产者)不等待返回结果,继续发送后续的消息数据。由于不接收发送结果,这种情况下失败回调就失去了作用,即用户无法通过回调感知到发送失败的消息数据。因此吞吐量高

1: 默认值,只要 Leader Replica(partition主副本)接收消息数据并写入磁盘成功,producer(生产者)就认为消息数据发送成功。

-1(all): Leader Replica(partition主副本)接收消息数据写入磁盘成功,并且ISR集合中所有 Follower Replica(partition从副本)全部同步完成(写入磁盘成功),producer(生产者)就认为消息数据发送成功。缺点是吞吐量低。优点是保证消息数据。
buffer.memoryproducer(生产者)并不是实时向 broker 发送消息数据的,而是批量发送,在发送之前,会将消息数据存储在内存缓冲区中;然后由另一个专属 sender 线程负责从内存缓冲区中取出消息数据,执行真正的发送。
这部分内存缓冲区大小就是由该参数指定的,此参数指定 producer(生产者)用于缓存消息数据的缓冲区大小,单位是字节,默认值是32MB。
如果 producer(生产者)向内存缓冲区写入消息数据的速度超过专属线程发送的速度,必然会造成内存缓冲区空间不断增大,此时 producer(生产者)会暂停向内存缓冲区写入,等待专属 sender 线程;
compression.type此参数设置 producer(生产者)是否压缩消息数据,默认是none,即不压缩消息数据;
优点是显著降低网络IO开销,提升整体吞吐量。缺点是增加 producer(生产者)所在机器的CPU开销。
retries用于设置 produer(生产者)发送消息数据失败后的重试次数,默认值0,即不进行重试。
broker 在处理写入请求时可能因为网络抖动或其他原因导致消息数据发送失败,这种异常一般是可以自行恢复的,因此 producer(生产者)内部提供了自动重试机制。
最常见的瞬时错误是 broker leader 换届选举时。
batch.sizeproducer(生产者)会将发往同一分区的多条消息数据添加到一个批次(batch)中,当 batch 满了一起发送。
若 batch 包含消息数据太少,一次发送请求能够写入的消息数据就很少,producer(生产者)的吞吐量会比较低;若 batch 包含消息数据太多,那就会给内存带来压力。
因此此参数的设置是时间与空间平衡的体现;
默认值16384,即16KB;建议根据实际情况合理设置。
producer(生产者)也不总是等到 batch 满了才发送消息,有可能 batch 还有空闲空间时 producer(生产者)就会发送该 batch。
linger.ms上面提到,producer(生产者)也不总是等到 batch 满了才发送消息,有可能 batch 还有空闲空间时 producer(生产者)就会发送该 batch;这是吞吐量与延时之间的权衡。
此参数用于控制 producer(生产者)发送消息数据的延时行为;默认值是0,表示消息需要被立即发送,无需关心 batch 是否填满。
为了减少网络耗时,需要设置这个值。需要注意,太大可能容易导致缓冲区满,阻塞消费者;太小容易频繁请求服务端。
max.request.size此参数用于控制 producer(生产者)能够发送的最大消息大小,默认 1048576 字节。
request.timeout.ms当 producer(生产者)发送消息数据给 broker 后,broker 需要在指定时间内返回处理结果给producer(生产者),这个时间范围由此参数控制,默认30s。
如果 broker 在30秒内没有将处理结果返回 producer(生产者),producer(生产者)认为该请求超时,会在回调中抛出异常;
client.id附着在每个请求的后面,用于标识请求是从什么地方发送过来的
connections.max连接空闲时间超过过久自动关闭(单位毫秒)

更多配置,请查阅Kafka官方文档。


3. 分区

所谓分区策略是决定 producer(生产者)将消息数据发送到哪个 partition(分区)的算法;kafka内置了默认的分区策略,同时也支持自定义分区策略。

默认分区策略是轮询方式(Round-robin),轮询策略有非常优秀的负载均衡表现,它总是能保证消息最大限度地被平均分配到所有分区上,故默认情况下它是最合理的分区策略,也是最常用的分区策略之一。

分区功能很重要,在我实际业务场景中 就运用了这个功能。
上游系统通过 kafka 推送广告物料数据给我们广告投放平台,上游分8个 partition,下游有八台机器分别消费这8个 partition。
广告物料数据根据 广告计划ID 进行分区,同一个 广告计划ID 的数据都在同一个 partition 里面,也就是说,同一个 广告计划ID 的物料数据,只会被同一台机器消费,不会被多台机器消费(除非不可抗拒因素:宕机)。


3.1. 为什么需要分区?

一方面,为了提供高负载,实现系统高伸缩性。

和 elasticsearch 分片设计类似,每个分区(主分区)布置在不同节点,每个分区(主分区)都可以独立对外提供服务,以提高系统吞吐量。

另一方面,如果不分区,物料数据会被随机发送到多个 partition(不受控);如果下游某个机器消费速度快,会导致广告物料数据错乱。

本来应该先下线、更新、开启,由于 partition 不同,消费不一致,最后变成了 更新、开启、下线。

这也是 Kafka 的一个特色。

ProducerRecord(String topic, K key, V value)

通过设置 key,可以指定消息写到主题的哪个分区。拥有相同 key 的消息,被写到同一个分区。

换句话说,相同 key 的消息被写到同一个分区,如果一个进程只从一个主题的分区读取数据,那么具有相同 key 的数据,都会被该进程读取。


3.2. key-默认分区器

如果 key 设置为 null,则使用 默认分区器默认分区器使用轮询算法将数据均衡的分布到各个分区上。

如果 key 不为空,并且使用 默认分区器,kafka 会对 key 值进行 hash(kakfa自带的hash算法),根据散列值将数据映射到特定分区中。

kafka 在映射分区时,会使用主题所有的分区,而不仅仅是可用分区。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。

举个栗子:

1
2
3
4
5
6
7
8
某topic有5个分区(A、B、C、D、E)。

在分区数量保持不变的情况下,可以保证 key=045189 的记录总是被写到分区A中。
consumer 从A分区读取数据时,对该类数据进行了各种针对处理。

一旦主题增加了新的分区,就无陆保证 key=045189 的记录总是被写到分区A;

旧数据仍然留在分区A,但新的数据可能被写到其他分区上 。

了解 elasticsearch 的童鞋可以发现,和 elasticsearch 中路由分片类似。


3.3. key-自定义分区

自定义分区 需要实现 Partitioner 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class MyPartitioner implements Partitioner {
public MyPartitioner() {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int result;
int valueIndex = Integer.parseInt(String.valueOf(key));
if (valueIndex % 2 == 0) {
result = 0;
} else {
result = 1;
}
System.out.println("key:" + key + ",result:" + result);
return result;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
System.out.println("configs:" + configs);
}
}

public class ProducerMainApp {
public static void main(String[] args) {
ProducerMainApp bean = new ProducerMainApp();
bean.testSendKafkaMessage();
}
private void testSendKafkaMessage() {
KafkaProducer kafkaProducer = new KafkaProducer(getProducerConf());
try {
for (int i = 0; i < 6; i++) {
String message = "test-" + i;
Future<RecordMetadata> testTopicFuture = kafkaProducer.send(new ProducerRecord("testTopic", Integer.toString(i), message));
RecordMetadata recordMetadata = testTopicFuture.get();
System.out.println("send message:" + message + ",offset:" + recordMetadata.offset());
}
} catch (Exception e) {
e.printStackTrace();
}finally {
kafkaProducer.close();
}
}

private Properties getProducerConf() {
Properties properties = new Properties();
//指定broker地址
properties.setProperty("bootstrap.servers", "127.0.0.1:9092,127.0.0.2:9092");
//broker消息的key和value序列化
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//acks 参数指定了 必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的 。(0、1、all)
properties.put("acks", "all");
//自定义分区
properties.put("partitioner.class","org.kafka.producer.partitioner.MyPartitioner");
return properties;
}
}

3.4. 位移(producer-offset)

kafka-producer

生产者根据分区器,将数据写入对应的分区。分区中的每条数据都会分配一个 id 来表示顺序,即 offset。

每个分区中都有一个 offset。offset 是分区中每条数据的唯一标识。这个 offset 就是 producer(生产者) offset。


4. 压缩

在 Kafka中,压缩可能发生在两个地方:producer(生产者)端和 broker 端。

  • producer(生产者)端

    配置compression.type参数即表示启用指定类型的压缩算法;

    例如使用 spring-kafka,在application.yml配置中添加compression-type: gzip,表明该 producer(生产者)的压缩算法使用的是GZIP;

    producer(生产者)启动后生产的每个消息集合都是经GZIP压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。

  • broker端

    大部分情况下 broker 从 producer(生产者)端接收到消息数据后仅仅是原封不动地保存而不会对其进行任何修改;

    但总有例外,例如下面几种情况就可能让 broker 重新压缩消息。

    1. broker 端指定了和 producer(生产者)端不同的压缩算法

      例如,producer(生产者)使用GZIP压缩,broker 使用 snappy 压缩;这种情况下 broker 接收到 producer(生产者)发送的GZIP 压缩消息数据后,只能先解压缩然后使用 Snappy 再重新压缩一遍。

      broker 端的配置同名compression.type,它的默认值是 producer,意味着默认采用 producer 的压缩方式;

    2. broker 端发生消息格式转换

      为了兼容老版本的消费者程序,broker 端会对新版本消息数据执行向老版本格式的转换;这个过程中会涉及消息数据的解压缩和重新压缩;

      一般情况下这种消息格式转换对性能是有很大影响的,除了这里的压缩之外,它还让 Kafka 丧失了引以为豪的Zero Copy特性。


5. 解压缩

理想完美的状态下来说,解压缩发生在消费者程序中;也就是说 producer(生产者)发送压缩消息数据到 broker 后,broker 照单全收并原样保存起来。当 consumer(消费者)程序请求这部分消息时,broker 依然原样发送出去,当消息数据到达 consumer(消费者)端后,由 consumer(消费者)自行解压缩还原成之前的消息。

Kafka 会将启用了哪种压缩算法封装进消息集合中,这样当consumer(消费者)读取到消息集合时,它自然就知道了这些消息使用的是哪种压缩算法。

producer(生产者)端压缩、broker 端保持、consumer(消费者)端解压缩;

特别要注意的是,除了在 consumer(消费者)端解压缩,broker 端也会进行解压缩;
区别于上面提到为了兼容老版本而进行消息格式转换的解压缩场景不同,每个压缩过的消息集合在 Broker 端写入时都要发生解压缩操作,目的就是为了对消息执行各种验证


6. Reference

  • 《Kafka 核心技术与实战 - 极客时间》