iis服务器助手广告广告
返回顶部
首页 > 资讯 > 精选 >SpringBoot如何实现MQTT消息发送和接收
  • 912
分享到

SpringBoot如何实现MQTT消息发送和接收

2023-07-05 11:07:57 912人浏览 独家记忆
摘要

今天小编给大家分享一下SpringBoot如何实现MQTT消息发送和接收的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。spr

今天小编给大家分享一下SpringBoot如何实现MQTT消息发送和接收的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

spring integration交互逻辑

对于发布者:

消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。

DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MQttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel

同样由 MessageChannel 的实例 DirectChannel 处理消费细节。

Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定。

1、Maven依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration --><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-integration</artifactId>    <version>2.5.1</version></dependency> <!-- Https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream --><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-stream</artifactId>    <version>5.5.5</version></dependency><!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt --><dependency>    <groupId>org.springframework.integration</groupId>    <artifactId>spring-integration-mqtt</artifactId>    <version>5.5.5</version></dependency>

2、yaml配置文件

#mqtt配置mqtt:  username: 123  passWord: 123  #MQTT-服务器连接地址,如果有多个,用逗号隔开  url: tcp://127.0.0.1:1883  #MQTT-连接服务器默认客户端ID  client:    id: ${random.value}  default:    #MQTT-默认的消息推送主题,实际可在调用接口时指定    topic: topic,mqtt/test/#    #连接超时  completionTimeout: 3000

3、mqtt生产者消费者配置类

import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.integration.annotation.IntegrationComponentScan;import org.springframework.integration.annotation.ServiceActivator;import org.springframework.integration.channel.DirectChannel;import org.springframework.integration.core.MessageProducer;import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;import org.springframework.integration.mqtt.core.MqttPahoClientFactory;import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.MessageHandler;import org.springframework.messaging.MessagingException; import java.util.Arrays;import java.util.List; @Configuration@IntegrationComponentScan@Slf4jpublic class MqttSenderAndReceiveConfig {     private static final byte[] WILL_DATA;     static {        WILL_DATA = "offline".getBytes();    }     @Autowired    private MqttReceiveHandle mqttReceiveHandle;     @Value("${mqtt.username}")    private String username;     @Value("${mqtt.password}")    private String password;     @Value("${mqtt.url}")    private String hostUrl;     @Value("${mqtt.client.id}")    private String clientId;     @Value("${mqtt.default.topic}")    private String defaultTopic;     @Value("${mqtt.completionTimeout}")    private int completionTimeout;   //连接超时         @Bean(value = "getMqttConnectOptions")    public MqttConnectOptions getMqttConnectOptions1() {        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接        mqttConnectOptions.setCleanSession(true);        // 设置超时时间 单位为秒        mqttConnectOptions.setConnectionTimeout(10);        mqttConnectOptions.setAutomaticReconnect(true);        mqttConnectOptions.setUserName(username);        mqttConnectOptions.setPassword(password.toCharArray());        mqttConnectOptions.setServerURIs(new String[]{hostUrl});        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制        mqttConnectOptions.seTKEepAliveInterval(10);        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);        return mqttConnectOptions;    }         @Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        factory.setConnectionOptions(getMqttConnectOptions1());        return factory;    }         @Bean    public MessageChannel mqttOutboundChannel() {        return new DirectChannel();    }         @Bean    @ServiceActivator(inputChannel = "mqttOutboundChannel")    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic(defaultTopic);        messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调        //设置转换器 发送bytes数据        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();        converter.setPayloadAsBytes(true);        return messageHandler;    }         @Bean    public MessageProducer inbound() {        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));        String[] topics = new String[topicList.size()];        topicList.toArray(topics);        MqttPahoMessageDrivenChannelAdapter adapter =                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);        adapter.setCompletionTimeout(completionTimeout);        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();        converter.setPayloadAsBytes(true);        adapter.setConverter(converter);        adapter.setQos(2);        adapter.setOutputChannel(mqttInputChannel());        return adapter;    }         @Bean    public MessageChannel mqttInputChannel() {        return new DirectChannel();    }         @Bean    @ServiceActivator(inputChannel = "mqttInputChannel")    public MessageHandler handler() {        return new MessageHandler() {            @Override            public void handleMessage(Message<?> message) throws MessagingException {                //处理接收消息                mqttReceiveHandle.handle(message);            }        };    }}

4、消息处理类 

@Slf4j@Componentpublic class MqttReceiveHandle {     public void handle(Message<?> message) {        log.info("收到订阅消息: {}", message);        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();        log.info("消息主题:{}", topic);        Object payLoad = message.getPayload();        byte[] data = (byte[]) payLoad;        Packet packet = Packet.parse(data);        log.info("发送的Packet数据{}", JSON.tojsONString(packet));     }}

5、mqtt发送接口 

import org.springframework.integration.annotation.MessagingGateway;import org.springframework.integration.mqtt.support.MqttHeaders;import org.springframework.messaging.handler.annotation.Header; @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {         void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);         void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);         void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);         void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);         void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);}

6、mqtt事件监听类 

import lombok.extern.slf4j.Slf4j;import org.springframework.context.event.EventListener;import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;import org.springframework.integration.mqtt.event.MqttMessageSentEvent;import org.springframework.integration.mqtt.event.MqttSubscribedEvent;import org.springframework.stereotype.Component; @Slf4j@Componentpublic class MqttListener {        @EventListener(classes = MqttConnectionFailedEvent.class)    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {        log.info("连接失败的事件通知");    }         @EventListener(classes = MqttMessageSentEvent.class)    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {        log.info("已发送的事件通知");    }         @EventListener(classes = MqttMessageDeliveredEvent.class)    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {        log.info("已传输完成的事件通知");    }         @EventListener(classes = MqttSubscribedEvent.class)    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {        log.info("消息订阅的事件通知");    }}

 7、接口测试

@Resource    private MqttGateway mqttGateway;        @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)    public String sendMqtt(String sendData, String topic) {        MqttMessage mqttMessage = new MqttMessage();        mqttGateway.sendToMqtt(topic, sendData);        //mqttGateway.sendToMqttObject(topic, sendData.getBytes());        return "OK";    }

以上就是“SpringBoot如何实现MQTT消息发送和接收”这篇文章的所有内容,感谢各位的阅读!相信大家阅读完这篇文章都有很大的收获,小编每天都会为大家更新不同的知识,如果还想学习更多的知识,请关注编程网精选频道。

--结束END--

本文标题: SpringBoot如何实现MQTT消息发送和接收

本文链接: https://www.lsjlt.com/news/351360.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作