iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >SpringBoot实现MQTT消息发送和接收方式
  • 597
分享到

SpringBoot实现MQTT消息发送和接收方式

SpringBoot MQTT消息MQTT消息发送MQTT消息接收 2023-03-11 11:03:33 597人浏览 安东尼

Python 官方文档:入门教程 => 点击学习

摘要

目录spring integration交互逻辑1、Maven依赖2、yaml配置文件3、MQtt生产者消费者配置类4、消息处理类 5、mQtt发送接口 6、mq

Spring integration交互逻辑

对于发布者:

1.消息通过消息网关发送出去,由 MessageChannel 的实例 DirectChannel 处理发送的细节。

2.DirectChannel 收到消息后,内部通过 MessageHandler 的实例 MqttPahoMessageHandler 发送到指定的 Topic。

对于订阅者:

1.通过注入 MessageProducerSupport 的实例 MqttPahoMessageDrivenChannelAdapter,实现订阅 Topic 和绑定消息消费的 MessageChannel

2.同样由 MessageChannel 的实例 DirectChannel 处理消费细节。

Channel 消息后会发送给我们自定义的 MqttInboundMessageHandler 实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定

1、maven依赖

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.5.1</version>
</dependency>
 
<!-- Https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.5.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.5.5</version>
</dependency>

2、yaml配置文件

#mqtt配置
mqtt:
  username: 123
  passWord: 123
  #MQTT-服务器连接地址,如果有多个,用逗号隔开
  url: tcp://127.0.0.1:1883
  #MQTT-连接服务器默认客户端ID
  client:
    id: ${random.value}
  default:
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
    topic: topic,mqtt/test/#
    #连接超时
  completionTimeout: 3000

3、mqtt生产者消费者配置类

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;
 

@Configuration
@IntegrationComponentScan
@Slf4j
public class MqttSenderAndReceiveConfig {
 
    private static final byte[] WILL_DATA;
 
    static {
        WILL_DATA = "offline".getBytes();
    }
 
    @Autowired
    private MqttReceiveHandle mqttReceiveHandle;
 
    @Value("${mqtt.username}")
    private String username;
 
    @Value("${mqtt.password}")
    private String password;
 
    @Value("${mqtt.url}")
    private String hostUrl;
 
    @Value("${mqtt.client.id}")
    private String clientId;
 
    @Value("${mqtt.default.topic}")
    private String defaultTopic;
 
    @Value("${mqtt.completionTimeout}")
    private int completionTimeout;   //连接超时
 
    
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions1() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{hostUrl});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.seTKEepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }
 
    
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions1());
        return factory;
    }
 
    
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
 
    
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
        //设置转换器 发送bytes数据
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        return messageHandler;
    }
 
    
    @Bean
    public MessageProducer inbound() {
        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
        adapter.setCompletionTimeout(completionTimeout);
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);
        adapter.setConverter(converter);
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
 
    
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
 
    
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理接收消息
                mqttReceiveHandle.handle(message);
            }
        };
    }
}

4、消息处理类 


@Slf4j
@Component
public class MqttReceiveHandle {
 
    public void handle(Message<?> message) {
        log.info("收到订阅消息: {}", message);
        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
        log.info("消息主题:{}", topic);
        Object payLoad = message.getPayload();
        byte[] data = (byte[]) payLoad;
        Packet packet = Packet.parse(data);
        log.info("发送的Packet数据{}", JSON.tojsONString(packet));
 
    }
}

5、mqtt发送接口 

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
 

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
 
    
    void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
 
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
 
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
 
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
 
    
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
}

6、mqtt事件监听类 

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
import org.springframework.stereotype.Component;
 
@Slf4j
@Component
public class MqttListener {
    
    @EventListener(classes = MqttConnectionFailedEvent.class)
    public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
        log.info("连接失败的事件通知");
    }
 
    
    @EventListener(classes = MqttMessageSentEvent.class)
    public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
        log.info("已发送的事件通知");
    }
 
    
    @EventListener(classes = MqttMessageDeliveredEvent.class)
    public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
        log.info("已传输完成的事件通知");
    }
 
    
    @EventListener(classes = MqttSubscribedEvent.class)
    public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
        log.info("消息订阅的事件通知");
    }
}

 7、接口测试

@Resource
    private MqttGateway mqttGateway;
    
    @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
    public String sendMqtt(String sendData, String topic) {
        MqttMessage mqttMessage = new MqttMessage();
        mqttGateway.sendToMqtt(topic, sendData);
        //mqttGateway.sendToMqttObject(topic, sendData.getBytes());
        return "OK";
    }

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持编程网。

--结束END--

本文标题: SpringBoot实现MQTT消息发送和接收方式

本文链接: https://www.lsjlt.com/news/199331.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作