iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > JAVA >如何在 Java 中使用 MQTT
  • 426
分享到

如何在 Java 中使用 MQTT

java物联网iotMQTT客户端 2023-09-04 08:09:36 426人浏览 八月长安
摘要

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。

本文主要介绍如何在 Java 项目中使用 MQTT,实现客户端与服务器的连接、订阅和收发消息等功能。

引入客户端库

本文的开发环境为:

本文将使用 Eclipse Paho Java Client 作为客户端,该客户端是 Java 语言中使用最为广泛的 MQtT 客户端库。

添加以下依赖到项目 pom.xml 文件中。

<dependencies>   <dependency>       <groupId>org.eclipse.pahogroupId>       <artifactId>org.eclipse.paho.client.mqttv3artifactId>       <version>1.2.5version>   dependency>dependencies>

创建 MQTT 连接

MQTT 服务器

本文将使用 EMQX 提供的 免费公共 MQTT 服务器,该服务基于 EMQX 的 MQTT 云平台 创建。服务器接入信息如下:

  • Broker: broker.emqx.io(中国用户可以使用 broker-cn.emqx.io
  • tcp Port: 1883
  • SSL/TLS Port: 8883

普通 TCP 连接

设置 MQTT Broker 基本连接参数,用户名、密码为非必选参数。

String broker = "tcp://broker.emqx.io:1883";// TLS/SSL// String broker = "ssl://broker.emqx.io:8883";String username = "emqx";String passWord = "public";String clientid = "publish_client";

然后创建 MQTT 客户端并连接。

MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());client.connect(options);

说明

  • MqttClient: 同步调用客户端,使用阻塞方法通信。
  • MqttClientPersistence: 代表一个持久的数据存储,用于在传输过程中存储出站和入站的信息,使其能够传递到指定的 QoS。
  • MqttConnectOptions: 连接选项,用于指定连接的参数,下面列举一些常见的方法。
    • setUserName: 设置用户名
    • setPassword: 设置密码
    • setCleanSession: 设置是否清除会话
    • seTKEepAliveInterval: 设置心跳间隔
    • setConnectionTimeout: 设置连接超时时间
    • setAutomaticReconnect: 设置是否自动重连

TLS/SSL 连接

如果要使用自签名证书进行 TLS/SSL 连接,需添加 bcpkix-jdk15on 到 pom.xml 文件。

<dependency>   <groupId>org.bouncycastlegroupId>   <artifactId>bcpkix-jdk15onartifactId>   <version>1.70version>dependency>

然后使用如下代码创建 SSLUtils.java 文件。

package io.emqx.mqtt;import org.bouncycastle.jce.provider.BouncyCastleProvider;import org.bouncycastle.openssl.PEMKeyPair;import org.bouncycastle.openssl.PEMParser;import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter;import javax.net.ssl.KeyManagerFactory;import javax.net.ssl.SSLContext;import javax.net.ssl.SSLSocketFactory;import javax.net.ssl.TrustManagerFactory;import java.io.BufferedInputStream;import java.io.FileInputStream;import java.io.FileReader;import java.security.KeyPair;import java.security.KeyStore;import java.security.Security;import java.security.cert.CertificateFactory;import java.security.cert.X509Certificate;public class SSLUtils {   public static SSLSocketFactory getSocketFactory(final String caCrtFile,                       final String crtFile, final String keyFile, final String password)           throws Exception {       Security.addProvider(new BouncyCastleProvider());       // load CA certificate       X509Certificate caCert = null;       FileInputStream fis = new FileInputStream(caCrtFile);       BufferedInputStream bis = new BufferedInputStream(fis);       CertificateFactory cf = CertificateFactory.getInstance("X.509");       while (bis.available() > 0) {           caCert = (X509Certificate) cf.generateCertificate(bis);      }       // load client certificate       bis = new BufferedInputStream(new FileInputStream(crtFile));       X509Certificate cert = null;       while (bis.available() > 0) {           cert = (X509Certificate) cf.generateCertificate(bis);      }       // load client private key       PEMParser pemParser = new PEMParser(new FileReader(keyFile));       Object object = pemParser.readObject();       JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC");       KeyPair key = converter.getKeyPair((PEMKeyPair) object);       pemParser.close();       // CA certificate is used to authenticate server       KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());       caKs.load(null, null);       caKs.setCertificateEntry("ca-certificate", caCert);       TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509");       tmf.init(caKs);       // client key and certificates are sent to server so it can authenticate       KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());       ks.load(null, null);       ks.setCertificateEntry("certificate", cert);       ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),               new java.security.cert.Certificate[]{cert});       KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory              .getDefaultAlGorithm());       kmf.init(ks, password.toCharArray());       // finally, create SSL socket factory       SSLContext context = SSLContext.getInstance("TLSv1.2");       context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);       return context.getSocketFactory();  }}

参照如下设置 options

// 设置 SSL/TLS 连接地址String broker = "ssl://broker.emqx.io:8883";// 设置 socket factoryString caFilePath = "/cacert.pem";String clientCrtFilePath = "/client.pem";String clientKeyFilePath = "/client.key";SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "");options.setSocketFactory(socketFactory);

发布 MQTT 消息

创建一个发布客户端类 PublishSample,该类将发布一条 Hello MQTT 消息至主题 mqtt/test

package io.emqx.mqtt;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class PublishSample {   public static void main(String[] args) {       String broker = "tcp://broker.emqx.io:1883";       String topic = "mqtt/test";       String username = "emqx";       String password = "public";       String clientid = "publish_client";       String content = "Hello MQTT";       int qos = 0;       try {           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());           // 连接参数           MqttConnectOptions options = new MqttConnectOptions();           // 设置用户名和密码           options.setUserName(username);           options.setPassword(password.toCharArray());           options.setConnectionTimeout(60);      options.setKeepAliveInterval(60);           // 连接           client.connect(options);           // 创建消息并设置 QoS           MqttMessage message = new MqttMessage(content.getBytes());           message.setQos(qos);           // 发布消息           client.publish(topic, message);           System.out.println("Message published");           System.out.println("topic: " + topic);           System.out.println("message content: " + content);           // 关闭连接           client.disconnect();           // 关闭客户端           client.close();      } catch (MqttException e) {           throw new RuntimeException(e);      }  }}

订阅 MQTT 主题

创建一个订阅客户端类 SubscribeSample,该类将订阅主题 mqtt/test

package io.emqx.mqtt;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class SubscribeSample {   public static void main(String[] args) {       String broker = "tcp://broker.emqx.io:1883";       String topic = "mqtt/test";       String username = "emqx";       String password = "public";       String clientid = "subscribe_client";       int qos = 0;       try {           MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());           // 连接参数           MqttConnectOptions options = new MqttConnectOptions();           options.setUserName(username);           options.setPassword(password.toCharArray());           options.setConnectionTimeout(60);      options.setKeepAliveInterval(60);           // 设置回调           client.setCallback(new MqttCallback() {               public void connectionLost(Throwable cause) {                   System.out.println("connectionLost: " + cause.getMessage());              }               public void messageArrived(String topic, MqttMessage message) {                   System.out.println("topic: " + topic);                   System.out.println("Qos: " + message.getQos());                   System.out.println("message content: " + new String(message.getPayload()));              }               public void deliveryComplete(IMqttDeliveryToken token) {                   System.out.println("deliveryComplete---------" + token.isComplete());              }          });           client.connect(options);           client.subscribe(topic, qos);      } catch (Exception e) {           e.printStackTrace();      }  }}

MqttCallback 说明:

  • connectionLost(Throwable cause): 连接丢失时被调用
  • messageArrived(String topic, MqttMessage message): 接收到消息时被调用
  • deliveryComplete(IMqttDeliveryToken token): 消息发送完成时被调用

测试

接下来运行 SubscribeSample,订阅 mqtt/test 主题。 然后运行 PublishSample,发布消息到 mqtt/test 主题。 我们将会看到发布端成功发布消息,同时订阅端接收到消息。

在这里插入图片描述

至此,我们完成了在 Java 中使用 Paho Java Client 来作为 MQTT 客户端连接到 公共 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅。

完整代码请见:https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Java

版权声明: 本文为 EMQ 原创,转载请注明出处。

原文链接:https://www.emqx.com/zh/blog/how-to-use-mqtt-in-java

来源地址:https://blog.csdn.net/emqx_broker/article/details/126605202

--结束END--

本文标题: 如何在 Java 中使用 MQTT

本文链接: https://www.lsjlt.com/news/393176.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • 如何在 Java 中使用 MQTT
    MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。...
    99+
    2023-09-04
    java 物联网 iot MQTT 客户端
  • 如何在Spring Boot中使用MQTT
    目录为什么选择MQTT MQTT, 启动! 使用方式 Client模式 创建工厂类 创建工具类 Spring Integration 总结 为什么选择MQTT MQTT的定义相信很...
    99+
    2024-04-02
  • 如何在uniapp项目中使用mqtt
    目录一、uniapp插件市场的参考插件二、具体引入过程1.安装mqtt和uuid2.页面引入mqtt并调用3.运行结果由于要取一些实时数据并在手机app上展示,就想到用mqtt进行即...
    99+
    2024-04-02
  • 在Django中如何使用MQTT的方法
    这篇文章主要介绍了在Django中如何使用MQTT的方法,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。安装MQTTPython环境下安装MQTT也很简单,需要注意的就是不要输...
    99+
    2023-06-15
  • 如何使用Spring integration在Springboot中集成Mqtt详解
    目录前言关于Spring IntergrationSpring Intergration核心组件Message(消息)Message Channel(消息管道)Message End...
    99+
    2023-02-24
    spring integration集成mqtt 集成mqtt mqtt通信
  • 怎么在Spring Boot中使用MQTT
    这篇文章给大家分享的是有关怎么在Spring Boot中使用MQTT的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。为什么选择MQTTMQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用...
    99+
    2023-06-14
  • 在Django中使用MQTT的方法
    安装MQTT Python环境下安装MQTT也很简单,需要注意的就是不要输错命令 在Python3环境下安装Python MQTT的命令是: pip3 install pah...
    99+
    2024-04-02
  • 怎么在uniapp项目中使用mqtt
    这篇文章将为大家详细讲解有关怎么在uniapp项目中使用mqtt,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、uniapp插件市场的参考插件  https://ext.dcloud.net.cn/pl...
    99+
    2023-06-15
  • stack如何在java中使用
    stack如何在java中使用?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。java中stack类继承于vector,其特性为后进先出(lastinfirstout).入栈...
    99+
    2023-05-30
    java stack
  • synchronized如何在Java中使用
    synchronized如何在Java中使用 ?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。synchronized原理在java中,每一个对象有且仅有一个同步...
    99+
    2023-05-31
    java synchronized ava
  • 如何在java中使用Comparator
    这篇文章给大家介绍如何在java中使用Comparator,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。java基本数据类型有哪些Java的基本数据类型分为:1、整数类型,用来表示整数的数据类型。2、浮点类型,用来表示...
    99+
    2023-06-14
  • 如何在java中使用SelectableChannel
    如何在java中使用SelectableChannel?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Java有哪些集合类Java中的集合主要分为四类:1、Lis...
    99+
    2023-06-14
  • 如何在java中使用bean
    如何在java中使用bean?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Java可以用来干什么Java主要应用于:1. web开发;2. Android开发;3. 客户端...
    99+
    2023-06-14
  • Stream如何在java中使用
    Stream如何在java中使用?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。Java可以用来干什么Java主要应用于:1. web开发;2. Android开发;3. 客...
    99+
    2023-06-14
  • ThreadLocal如何在Java中使用
    本篇文章给大家分享的是有关ThreadLocal如何在Java中使用,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。1. 应用场景ThreadLocal 的常见应用场景有两种:多...
    99+
    2023-06-15
  • JSONP如何在java中使用
    这篇文章给大家介绍JSONP如何在java中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。 json和JSONP这二者在开发中还是很常见的,此处JSON暂且不说。一个众所周知的问题,Ajax直接请求普通文...
    99+
    2023-05-31
    java jsonp ava
  • Selector如何在java中使用
    Selector如何在java中使用?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。java基本数据类型有哪些Java的基本数据类型分为:1、整数类型,用来表示整数的数据类型...
    99+
    2023-06-14
  • 如何在java中使用stringbuffer
    本篇文章给大家分享的是有关如何在java中使用stringbuffer,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。Java有哪些集合类Java中的集合主要分为四类:1、Lis...
    99+
    2023-06-14
  • 如何在java中使用BlockingQueue
    今天就跟大家聊聊有关如何在java中使用BlockingQueue,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。Java的特点有哪些Java的特点有哪些1.Java语言作为静态面向对...
    99+
    2023-06-14
  • Jwt如何在Java中使用
    这篇文章给大家介绍Jwt如何在Java中使用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。Java的优点是什么1. 简单,只需理解基本的概念,就可以编写适合于各种情况的应用程序;2. 面向对象;3. 分布性,Java是...
    99+
    2023-06-14
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作