1.简介 MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和
MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。
a.由于采用发布/订阅的消息模式,可以提供一对多的消息发布
b.轻量级,网络开销小
c.对负载内容会有屏蔽的消息传输
d.有三种消息发布质量(Qos):
qos=0:“至多一次”,这一级别会发生消息丢失或重复,消息发布依赖于tcp/IP网络
qos=1:“至少一次”,确保消息到达,但消息重复可能会发生
qos=2:“只有一次”,确保消息到达一次
下载页面:https://mosquitto.org/download/
下载页面:https://mqttx.app/zh
1、导入依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mQttv3</artifactId> <version>1.2.5</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency>
2、配置mqtt信息
server: port: 6001 #项目端口mqtt: url: "tcp://localhost:1883" #mqtt地址:端口 username: "username" #mqtt账号 passWord: "password" #mqtt密码
3、创建实体类接收mqtt信息
import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;@Data@Component@ConfigurationProperties(value = "mqtt")public final class MqttConfig { private String url; private String username; private String password;}
4、创建mqtt工具类
package com.example.mqtt.util;import com.example.mqtt.listener.MqttListener;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component@Slf4jpublic class MqttUtil { @Autowired MqttConfig mqttConfig; private MqttClient client; @PostConstruct//初始化注解 private MqttClient initPublish() { if (client == null) { try { client = new MqttClient(mqttConfig.getUrl(), "mqtt_client", new MemoryPersistence()); // 连接参数 MqttConnectOptions options = new MqttConnectOptions(); // 设置用户名和密码 options.setUserName(mqttConfig.getUsername()); options.setPassword(mqttConfig.getPassword().toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(60); // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.seTKEepAliveInterval(60); // 连接 client.connect(options); } catch (MqttException e) { throw new RuntimeException(e); } } return client; } public void publish(String topic, String content) { try { // 创建消息并设置 QoS //qos: 0 最多一次, 1至少一次, 2仅一次 MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(0); // 发布消息 client.publish(topic, message); log.info("topic:{}", topic); log.info("发送消息:{}", content); } catch (Exception e) { e.printStackTrace(); } } public void subscribe() { //topic 接收多个可用#代替 // 接收消息 try { client.subscribe("mqtt/#", 0, new MqttListener()); } catch (MqttException e) { throw new RuntimeException(e); } }}
5创建接收信息实现类
import com.example.mqtt.factory.MqttFactory;import com.example.mqtt.service.MqttService;import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.IMqttMessageListener;import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4jpublic class MqttListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { MqttService mqttService = MqttFactory.getMqttFactory(topic); if (mqttService != null){ mqttService.analysisMessage(new String(mqttMessage.getPayload())); }else{ log.error("接收topic异常:{}",topic); } log.info("接收topic:{}",topic); log.info("接收信息:{}",new String(mqttMessage.getPayload())); }}
6、创建抽象工厂
注:个人采用工厂设计模式,如不采用可直接在5.4里的subscribe方法实现,
如:
public void subscribe() { try { //"mqtt/test"表示只接收这个通道,其余的不接收,个人不建议,要考虑后续扩展 client.subscribe("mqtt/test", 0, new IMqttMessageListener() { @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { } }); } catch (MqttException e) { throw new RuntimeException(e); } }
创建工厂
import com.example.mqtt.service.MqttService;import com.example.mqtt.service.impl.HelloMqttServiceImpl;import com.example.mqtt.service.impl.TestMqttServiceImpl;public class MqttFactory { public static MqttService getMqttFactory(String topic){ switch (topic){ case "mqtt/test": return new TestMqttServiceImpl(); case "mqtt/hello": return new HelloMqttServiceImpl(); default : return null; } }}
7创建具体业务实现信息
controller层
import com.example.mqtt.util.MqttUtil;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.WEB.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class MqttClientController { @Autowired private MqttUtil mqttUtil; @GetMapping("/pushMqtt") public String pushMqtt(@RequestParam("message")String message){ mqttUtil.publish("mqtt/test",message); return "发送成功"; } @GetMapping("/subscribe") public String subscribe(){ mqttUtil.subscribe(); return "接收成功"; }}
service层
public interface MqttService { void analysisMessage(String message);}
serviiceImpl层
import com.example.mqtt.service.MqttService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j@Servicepublic class HelloMqttServiceImpl implements MqttService { @Override public void analysisMessage(String message) { log.info("处理topic/hello信息:{}",message); }}
import com.example.mqtt.service.MqttService;import lombok.extern.slf4j.Slf4j;import org.springframework.stereotype.Service;@Slf4j@Servicepublic class TestMqttServiceImpl implements MqttService { @Override public void analysisMessage(String message) { log.info("处理mqtt/test信息:{}",message); }}
来源地址:https://blog.csdn.net/weixin_44905972/article/details/131665070
--结束END--
本文标题: java实现mqtt协议
本文链接: https://www.lsjlt.com/news/393091.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
2024-02-29
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0