This post is also available in English and alternative languages.RabbitMQ - MQTT 插件
emqx - MQTT 教程
rabbitmq + MQTT 实现智能家居
1. MQTT协议
MQTT(Message Queue Telemetry Transport),一种基于发布/订阅(publish / subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。
该协议将消息的发布者(publisher)与订阅者(subscriber)进行分离,因此可以在不可靠的网络环境中,为远程连接的设备提供可靠的消息服务,使用方式与传统的MQ有点类似。
TCP 协议位于传输层,MQTT 协议位于应用层,MQTT 协议构建于 TCP/IP 协议上,也就是说只要支持 TCP/IP 协议栈的地方,都可以使用 MQTT 协议。
MQTT 只专注于发消息, 因此协议的结构非常简单。
1.1. MQTT数据包
MQTT 协议的数据包(也称为 MQTT 报文)是按照特定的分层结构组织的,由以下几个部分组成:
- 固定头(Fixed header),所有数据包中都有固定头,包含类型、标志位、剩余长度。
- 可变头(Variable header),位于固定头之后,内容和长度取决于报文类型。可变头通常包含一些特定的字段,例如:
- 报文标识符(Packet Identifier):用于 QoS 1 和 QoS 2 报文中,标识唯一的报文。
- 协议名称和版本:在 CONNECT 报文中,包含协议名称(如 “MQTT”)和协议版本(如 4 或 5)。
- 主题名称(Topic Name):在 PUBLISH 报文中,表示消息的主题。
- 其他字段:如用户名、密码、Will 消息等。
- 有效载荷(Payload),报文的数据部分,其内容和长度也取决于报文类型。常见的有效载荷包括:
- CONNECT 报文:客户端 ID、Will 消息、用户名、密码等。
- PUBLISH 报文:实际的消息内容。
- SUBSCRIBE 报文:订阅的主题列表和 QoS 级别。
- UNSUBSCRIBE 报文:取消订阅的主题列表。
2. RabbitMQ
RabbitMQ 支持 AMQP、MQTT、STOMP 等协议。可以作为 MQTT 和其他协议(如 AMQP、STOMP)之间的桥梁,实现不同协议系统的互联。RabbitMQ 提供消息持久化,确保消息不丢失。支持消息确认机制,保证消息可靠传递。
其他的特点不做过多赘述。
2.1. docker 部署
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| docker run -d \ --name rabbitmq \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ -p 1883:1883 \ -p 4369:4369 \ -p 5671:5671 \ -p 5672:5672 \ -p 15671:15671 \ -p 15672:15672 \ -p 15691-15692:15691-15692 \ -p 25672:25672 \ rabbitmq:3-management \ /bin/bash -c "rabbitmq-plugins enable rabbitmq_mqtt rabbitmq_web_mqtt && rabbitmq-server"
|
3. Example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93
| @Slf4j @Configuration public class MqttProducerConfiguration {
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
@Resource private MqttConfiguration mqttConfiguration;
@Bean public MqttPahoClientFactory createMqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; }
@Bean public MqttConnectOptions getMqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(mqttConfiguration.getHost()); options.setAutomaticReconnect(mqttConfiguration.getAutomaticReconnect()); options.setCleanSession(mqttConfiguration.getCleanSession()); options.setUserName(mqttConfiguration.getUsername()); options.setPassword(mqttConfiguration.getPassword().toCharArray()); options.setConnectionTimeout(mqttConfiguration.getConnectionTimeout()); options.setKeepAliveInterval(mqttConfiguration.getKeepAliveInterval()); return options; }
@Bean(name = CHANNEL_NAME_OUT) public MessageChannel mqttOutboundChannel() { return new DirectChannel(); }
@Bean @ServiceActivator(inputChannel = CHANNEL_NAME_OUT) public MessageHandler mqttOutbound() { MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfiguration.getClientId(), createMqttClientFactory()); messageHandler.setAsync(true); messageHandler.setDefaultQos(mqttConfiguration.getQos()); messageHandler.setDefaultTopic(mqttConfiguration.getTopics()[0]); messageHandler.setConverter(new DefaultPahoMessageConverter()); messageHandler.setAsyncEvents(Boolean.TRUE); return messageHandler; }
@Bean public ApplicationListener<MqttMessageSentEvent> mqttMessageSentEventListener() { return event -> { Object topic = event.getMessage().getHeaders().get(MqttHeaders.TOPIC); Object payload = event.getMessage().getPayload(); if (Objects.nonNull(event.getCause())) { Throwable cause = event.getCause(); String reasonCode = cause instanceof MqttException ? String.valueOf(((MqttException) cause).getReasonCode()) : StringUtils.EMPTY; log.error("MQTT消息发送失败,topic:{},MQTT错误码:{},errMsg:{}", topic, reasonCode, cause.getMessage(), cause); } else { log.info("MQTT消息发送成功,topic:{},payload:{}", topic, payload); } }; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| @Data @Configuration public class MqttConfiguration {
@Value("${mqtt.host}") private String[] host;
@Value("${mqtt.clientId}") private String clientId;
@Value("${mqtt.topics}") private String[] topics;
@Value("${mqtt.username}") private String username;
@Value("${mqtt.password}") private String password;
@Value("${mqtt.connectionTimeout}") private Integer connectionTimeout;
@Value("${mqtt.keepAliveInterval}") private Integer keepAliveInterval;
@Value("${mqtt.qos}") private Integer qos;
@Value("${mqtt.cleanSession}") private Boolean cleanSession;
@Value("${mqtt.automaticReconnect}") private Boolean automaticReconnect;
@Value("${mqtt.willTopic}") private String willTopic;
@Value("${mqtt.willData}") private String willData;
@Value("${mqtt.willQos}") private Integer willQos;
@Value("${mqtt.willRetained}") private Boolean willRetained; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| mqtt: host: tcp://127.0.0.1:1883 clientId: test_mqtt/producer topics: test_mqtt_topic username: admin password: admin connectionTimeout: 30 keepAliveInterval: 60 qos: 1 cleanSession: false automaticReconnect: true willTopic: willTopic willData: "offline" willQos: 1 willRetained: false
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Slf4j @SpringBootTest @ExtendWith(SpringExtension.class) public class ApplicationSpringBootExampleTest33 { @Resource private MqttSender mqttSender; @Test public void sendToMqtt() { for (int i = 0; i < 20; i++) { try { mqttSender.sendToMqtt("test_mqtt_topic", "123456" + i); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { log.error(e.getMessage(), e); } } } }
|
4. 接收消息
4.1. MQTTX 接收
4.2. RabbitMQ Consumer 接收
有一点需要注意,当使用 RabbitMQ 的 MQTT 插件发送 MQTT 消息时,消息会被路由到 RabbitMQ 的默认交换机 amq.topic
上。
5. MQTT 主题设计
设计适用于 AWS IoT Core 的MQTT 主题
mqtt主题设计最佳实践
5.1. 点对点
一对一,Producer 和 Consumer 用同一个 MQTT 主题(Topic)作为交流渠道,需要确保 Topic 的唯一性,防止其他不相干设备订阅了 Topic。
5.2. 广播
一对多,广播模式将同一消息发送到多个设备。在广播模式中,多个 Consumer 订阅同一 MQTT 主题(Topic),Producer 将消息发布到这个主题。
5.3. 扇入
扇入模式是多对一通信模式,可以认为是广播模式的逆向。 多个设备在共享主题上发布,只有一个订阅者。