通过 RabbitMQ 发送 MQTT 消息
2025-01-22 08:19:30    1.9k 字   
This post is also available in English and alternative languages.

RabbitMQ - MQTT 插件
emqx - MQTT 教程
rabbitmq + MQTT 实现智能家居

1. MQTT协议

MQTT(Message Queue Telemetry Transport),一种基于发布/订阅(publish / subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。

该协议将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。

MQTT订阅

TCP 协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于 TCP/IP 协议上,也就是说只要支持 TCP/IP 协议栈的地方,都可以使用 MQTT 协议。

MQTT 只专注于发消息, 因此协议的结构非常简单。


1.1. MQTT数据包

MQTT 协议的数据包(也称为 MQTT 报文)是按照特定的分层结构组织的,由以下几个部分组成:

  • 固定头(Fixed header),所有数据包中都有固定头,包含类型、标志位、剩余长度。
  • 可变头(Variable header),位于固定头之后,内容和长度取决于报文类型。可变头通常包含一些特定的字段,例如:
    • 报文标识符(Packet Identifier):用于 QoS 1 和 QoS 2 报文中,标识唯一的报文。
    • 协议名称和版本:在 CONNECT 报文中,包含协议名称(如 “MQTT”)和协议版本(如 4 或 5)。
    • 主题名称(Topic Name):在 PUBLISH 报文中,表示消息的主题。
    • 其他字段:如用户名、密码、Will 消息等。
  • 有效载荷(Payload),报文的数据部分,其内容和长度也取决于报文类型。常见的有效载荷包括:
    • CONNECT 报文:客户端 ID、Will 消息、用户名、密码等。
    • PUBLISH 报文:实际的消息内容。
    • SUBSCRIBE 报文:订阅的主题列表和 QoS 级别。
    • UNSUBSCRIBE 报文:取消订阅的主题列表。
MQTT数据包

2. RabbitMQ

RabbitMQ 支持 AMQP、MQTT、STOMP 等协议。可以作为 MQTT 和其他协议(如 AMQP、STOMP)之间的桥梁,实现不同协议系统的互联。RabbitMQ 提供消息持久化,确保消息不丢失。支持消息确认机制,保证消息可靠传递。

其他的特点不做过多赘述。


2.1. docker 部署

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 启动 RabbitMQ,并启用 MQTT 插件
docker run -d \
--name rabbitmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-p 1883:1883 \
-p 4369:4369 \
-p 5671:5671 \
-p 5672:5672 \
-p 15671:15671 \
-p 15672:15672 \
-p 15691-15692:15691-15692 \
-p 25672:25672 \
rabbitmq:3-management \
/bin/bash -c "rabbitmq-plugins enable rabbitmq_mqtt rabbitmq_web_mqtt && rabbitmq-server"
rabbit-mqtt

3. Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
@Slf4j
@Configuration
public class MqttProducerConfiguration {

public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

@Resource
private MqttConfiguration mqttConfiguration;

/**
* 创建 MqttPahoClientFactory 设置 MQTT 的连接属性
*/
@Bean
public MqttPahoClientFactory createMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}

@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(mqttConfiguration.getHost());
options.setAutomaticReconnect(mqttConfiguration.getAutomaticReconnect());
// 设置是否清空session.
// false表示服务器会保留客户端的连接记录,true表示每次连接到服务器都以新的身份连接
options.setCleanSession(mqttConfiguration.getCleanSession());
// 设置连接的用户名
options.setUserName(mqttConfiguration.getUsername());
// 设置连接的密码
options.setPassword(mqttConfiguration.getPassword().toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(mqttConfiguration.getConnectionTimeout());
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(mqttConfiguration.getKeepAliveInterval());
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
// if (Objects.nonNull(mqttConfiguration.getWillTopic()) && Objects.nonNull(mqttConfiguration.getWillData())) {
// options.setWill(mqttConfiguration.getWillTopic(), mqttConfiguration.getWillData().getBytes(),
// mqttConfiguration.getWillQos(), mqttConfiguration.getWillRetained());
// }
return options;
}

/**
* MQTT信息通道(生产者),出站直连通道
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

/**
* MQTT消息处理器(生产者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfiguration.getClientId(), createMqttClientFactory());
// 如果设置成true,即异步,发送消息时将不会阻塞。
messageHandler.setAsync(true);
// 设置默认QoS
messageHandler.setDefaultQos(mqttConfiguration.getQos());
// 设置默认主题,取第一个
messageHandler.setDefaultTopic(mqttConfiguration.getTopics()[0]);
messageHandler.setConverter(new DefaultPahoMessageConverter());
// 配合 mqttMessageSentEventListener 方法一起
messageHandler.setAsyncEvents(Boolean.TRUE);
return messageHandler;
}

/**
* 监听 MQTT 异步事件
*/
@Bean
public ApplicationListener<MqttMessageSentEvent> mqttMessageSentEventListener() {
return event -> {
Object topic = event.getMessage().getHeaders().get(MqttHeaders.TOPIC);
Object payload = event.getMessage().getPayload();
if (Objects.nonNull(event.getCause())) {
Throwable cause = event.getCause();
// 如果是 MQTT 异常
String reasonCode = cause instanceof MqttException ? String.valueOf(((MqttException) cause).getReasonCode()) : StringUtils.EMPTY;
// 处理发送失败的情况
log.error("MQTT消息发送失败,topic:{},MQTT错误码:{},errMsg:{}", topic, reasonCode, cause.getMessage(), cause);
// 重试机制、统计失败率
} else {
// 处理发送成功的情况
log.info("MQTT消息发送成功,topic:{},payload:{}", topic, payload);
// 统计成功率
}
};
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@Data
@Configuration
public class MqttConfiguration {
/**
* host 服务器地址配置
*/
@Value("${mqtt.host}")
private String[] host;
/**
* clientId
*/
@Value("${mqtt.clientId}")
private String clientId;
/**
* 主题
*/
@Value("${mqtt.topics}")
private String[] topics;
/**
* 用户名
*/
@Value("${mqtt.username}")
private String username;
/**
* 密码
*/
@Value("${mqtt.password}")
private String password;
/**
* 连接超时时长
*/
@Value("${mqtt.connectionTimeout}")
private Integer connectionTimeout;
/**
* keep Alive时间(心跳检测)
*/
@Value("${mqtt.keepAliveInterval}")
private Integer keepAliveInterval;
/**
* 消息处理机制
*/
@Value("${mqtt.qos}")
private Integer qos;
/**
* false为建立持久会话
*/
@Value("${mqtt.cleanSession}")
private Boolean cleanSession;
/**
* 断开后重新连接
*/
@Value("${mqtt.automaticReconnect}")
private Boolean automaticReconnect;
/**
* 遗嘱消息主题
*/
@Value("${mqtt.willTopic}")
private String willTopic;
/**
* 遗嘱消息内容
*/
@Value("${mqtt.willData}")
private String willData;
/**
* 遗嘱消息的QoS
*/
@Value("${mqtt.willQos}")
private Integer willQos;
/**
* 遗嘱消息是否保留
*/
@Value("${mqtt.willRetained}")
private Boolean willRetained;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
mqtt:
# 服务器地址
host: tcp://127.0.0.1:1883
# ID唯一
clientId: test_mqtt/producer
# 主题 多个主题用逗号(,)分割 #表示这个主题下面所有,topic1,topic2,topic2/topic22/#(example中默认取第一个主题)
topics: test_mqtt_topic
# 用户名
username: admin
# 密码
password: admin
# 连接超时
connectionTimeout: 30
# 心跳检测
keepAliveInterval: 60
# 对消息处理的几种机制。
# 0 表示的是订阅者没收到消息不会再次发送,消息会丢失
# 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息
# 2 多了一次去重的动作,确保订阅者收到的消息有一次
qos: 1
# 是否清空session,设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
cleanSession: false
# 断开后重新连接
automaticReconnect: true
# 遗嘱消息主题
willTopic: willTopic
# 遗嘱消息内容
willData: "offline"
# 遗嘱消息的QoS
willQos: 1
# 遗嘱消息是否保留
willRetained: false

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Slf4j
@SpringBootTest
@ExtendWith(SpringExtension.class)
public class ApplicationSpringBootExampleTest33 {
@Resource
private MqttSender mqttSender;
@Test
public void sendToMqtt() {
for (int i = 0; i < 20; i++) {
try {
mqttSender.sendToMqtt("test_mqtt_topic", "123456" + i);
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
}

4. 接收消息

4.1. MQTTX 接收

MQTTX接收

4.2. RabbitMQ Consumer 接收

有一点需要注意,当使用 RabbitMQ 的 MQTT 插件发送 MQTT 消息时,消息会被路由到 RabbitMQ 的默认交换机 amq.topic 上。

RabbitMQ监听接收消息

5. MQTT 主题设计

设计适用于 AWS IoT Core 的MQTT 主题

mqtt主题设计最佳实践

5.1. 点对点

一对一,Producer 和 Consumer 用同一个 MQTT 主题(Topic)作为交流渠道,需要确保 Topic 的唯一性,防止其他不相干设备订阅了 Topic。


5.2. 广播

一对多,广播模式将同一消息发送到多个设备。在广播模式中,多个 Consumer 订阅同一 MQTT 主题(Topic),Producer 将消息发布到这个主题。


5.3. 扇入

扇入模式是多对一通信模式,可以认为是广播模式的逆向。 多个设备在共享主题上发布,只有一个订阅者。