iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > JAVA >MQTT(二)Java整合MQTT
  • 747
分享到

MQTT(二)Java整合MQTT

java 2023-09-08 10:09:00 747人浏览 独家记忆
摘要

Java整合MQTT 上一节知道MQtT是一个通信协议,需要一个代理服务Broker;通信设备作为客户端Client,后台系统服务器也作为客户端Client。 经过了解选用EMQX作为代理服务Brok

Java整合MQTT

上一节知道MQtT是一个通信协议,需要一个代理服务Broker;通信设备作为客户端Client,后台系统服务器也作为客户端Client。

经过了解选用EMQX作为代理服务Broker(支持WEB界面查看)

后台服务使用spring Integration链接EMQX

1.EMQX

简介,EMQX 是一个开源分布式物联网 MQTT 消息服务器,它实现了 MQTT 协议的各种功能,并提供了可靠的消息传递、灵活的消息路由、可扩展的集群高可用性等特性。EMQ X 可以作为物联网应用的消息中间件,用于连接和管理大规模的物联网设备,实现设备之间的实时通信和数据传输。

语言上,EMQ X 是使用 Erlang/OTP 编程语言实现的。Erlang 是一种通用的函数式编程语言,它具有高并发、分布式和容错性的特点,非常适合构建可扩展和可靠的分布式系统。Erlang/OTP 是一个开发框架和平台,提供了一系列工具、库和标准库,用于开发并运行 Erlang 应用程序。

EMQ X 选择使用 Erlang/OTP 的主要原因是 Erlang 在处理并发和分布式通信方面具有优秀的性能和可靠性。Erlang 提供了轻量级进程(而非操作系统线程)的并发模型,每个进程具有独立的状态和消息传递机制,可以高效地处理大量的并发连接和消息处理。此外,Erlang 的可扩展性和容错性使得 EMQ X 能够在分布式环境中实现高可用性和高性能。

Erlang/OTP 还提供了许多用于网络通信、并发控制和错误处理的库和工具,这些功能对于构建物联网 MQTT 服务器非常有用。通过使用 Erlang/OTP,EMQ X 能够提供可靠的消息传递、灵活的消息路由和高可用性等关键功能,以满足物联网应用的需求

主要特点:

  1. MQTT 协议支持:EMQ X 是一个完全兼容 MQTT 协议的消息服务器,支持 MQTT 3.1.1 和 MQTT 5.0 版本。
  2. 分布式架构:EMQ X 可以在多台服务器上进行水平扩展,形成分布式集群,以处理大规模的设备连接和消息传输。
  3. 高可用性:EMQ X 支持主从复制和自动故障切换,以实现高可用性和容错性,保证设备和应用程序的持续可靠性。
  4. 灵活的消息路由:EMQ X 提供了丰富的消息路由功能,可以根据主题、内容、设备属性等灵活地进行消息过滤和转发。
  5. 安全性和权限控制:EMQ X 支持 SSL/TLS 加密和身份验证,可以对连接和消息进行安全保护,同时提供细粒度的权限控制,确保只有授权的设备和用户可以访问特定的主题和功能。
  6. 插件系统:EMQ X 提供了丰富的插件系统,可以通过插件扩展和定制功能,满足不同场景和需求的应用。
  7. 实时监控和统计:EMQ X 提供了实时监控和统计功能,可以监控设备连接数、消息发布和订阅情况等指标,帮助用户了解系统运行状态和性能。
  8. 多协议支持:除了 MQTT,EMQ X 还支持 AMQP、CoAP、websocket 等多种协议,以满足不同设备和应用的通信需求。

由于是测试关系,我选择下载windows版本

WEB登录地址:
http://localhost:18083/#/login

默认账户密码为

adminpublic

2.Spring Integration

Spring Integration 是一个基于 Spring 框架的集成框架,用于构建企业级应用程序中的消息驱动和事件驱动的系统。它提供了一组丰富的组件和工具,用于实现应用程序之间的异步消息传递、事件驱动的处理和系统集成。

Spring Integration 提供了一种声明式的方式来定义和管理消息流,它基于传统的企业集成模式(Enterprise Integration Patterns,EIP),使开发者能够通过简单的配置来实现消息的路由、转换、过滤、聚合、分割等操作。开发者可以使用 Spring Integration 提供的各种消息通道、消息适配器、消息处理器和消息端点来搭建一个灵活、可扩展和可靠的消息驱动系统。

Spring Integration 支持多种消息协议和传输方式,包括 JMS、AMQP、MQTT、Httptcp、UDP 等,可以与各种消息代理、消息队列和消息中间件进行集成。它还提供了与 Spring 框架的其他模块(如 Spring Boot、Spring mvc、Spring Batch 等)的无缝集成,使得构建复杂的企业应用程序变得更加简单和高效。

Spring Integration 的主要目标是帮助开发者构建可扩展和可维护的集成解决方案,通过解耦和模块化的方式来处理异步消息和事件。它广泛应用于各种领域,包括企业集成、消息驱动的微服务架构、大数据处理等。

POM引入依赖

                            org.springframework.integration            spring-integration-stream                            org.springframework.integration            spring-integration-mqtt                            org.springframework.boot            spring-boot-starter-integration        

配置类MqttConfiguration

@Component@Configuration@Data@ConfigurationProperties("mqtt")public class MqttConfiguration {    @Autowired    private MqttCustomerClient mqttCustomerClient;     private String host;    private String clientId;    private String username;    private String passWord;    private String topic;    private int timeout;    private int keepAlive;     @Bean    public MqttCustomerClient getMqttCustomerClient() {        mqttCustomerClient.connect(host, clientId, username, password, timeout,keepAlive);        // 以/#结尾表示订阅所有以test开头的主题        mqttCustomerClient.subscribe("test/#");        return mqttCustomerClient;    }}

封装客户端Bean对象MqttCustomerClient

package com.wn.mqtt.util; import lombok.extern.slf4j.Slf4j;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component; @Slf4j@Componentpublic class MqttCustomerClient {    @Autowired    private PushCallback pushCallback;      private static MqttClient client;     public  static MqttClient getClient(){        return  client;    }     public static void setClient(MqttClient client){        MqttCustomerClient.client=client;    }         public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){        MqttClient client;         try {            client=new MqttClient(host,clientID,new MemoryPersistence());            MqttConnectOptions options=new MqttConnectOptions();            options.setCleanSession(true);            options.setUserName(username);            options.setPassword(password.toCharArray());            options.setConnectionTimeout(timeout);            options.seTKEepAliveInterval(keeplive);            MqttCustomerClient.setClient(client);            try {                client.setCallback(pushCallback);                client.connect(options);            }catch (Exception e){                e.printStackTrace();            }        }catch (Exception e){            e.printStackTrace();        }    }         public void pushlish(String topic,String pushMessage){        pushlish(0,false,topic,pushMessage);    }         public void pushlish(int qos,boolean retained,String topic,String pushMessage){        MqttMessage message=new MqttMessage();        message.setQos(qos);        message.setRetained(retained);        message.setPayload(pushMessage.getBytes());        MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);        if(null== mqttTopic){            log.error("topic not exist");        }        MqttDeliveryToken token;        try {            token=mqttTopic.publish(message);            token.waitForCompletion();        }catch (MqttPersistenceException e){            e.printStackTrace();        }catch (MqttException e){            e.printStackTrace();        }    }         public void subscribe(String topic){        log.error("开始订阅主题" + topic);        subscribe(topic,0);    }     public void subscribe(String topic,int qos){        try {            MqttCustomerClient.getClient().subscribe(topic,qos);        }catch (MqttException e){            e.printStackTrace();        }    }}

消息监听类PushCallback

@Componentpublic class PushCallback implements MqttCallback {     private static MqttClient client;     @Override    public void connectionLost(Throwable throwable) {        if (client == null || !client.isConnected()) {            System.out.println("连接断开,正在重连....");        }    }     @Override    public void messageArrived(String topic, MqttMessage message) throws Exception {        System.out.println("接收消息主题 : " + topic);        System.out.println("接收消息Qos : " + message.getQos());        System.out.println("接收消息内容 : " + new String(message.getPayload()));    }     @Override    public void deliveryComplete(IMqttDeliveryToken token) {        System.out.println("deliveryComplete---------" + token.isComplete());    }}

测试类

@Autowiredprivate MqttCustomerClient mqttCustomerClient;@Testvoid pushlish() {    for (int i = 0; i < 10; i++) {        mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);        try {            Thread.*sleep*(3000);        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

EMQX WEB监控界面接收消息数据

在这里插入图片描述

来源地址:https://blog.csdn.net/qq_42583549/article/details/131383562

--结束END--

本文标题: MQTT(二)Java整合MQTT

本文链接: https://www.lsjlt.com/news/399738.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • MQTT(二)Java整合MQTT
    Java整合MQTT 上一节知道MQTT是一个通信协议,需要一个代理服务Broker;通信设备作为客户端Client,后台系统服务器也作为客户端Client。 经过了解选用EMQX作为代理服务Brok...
    99+
    2023-09-08
    java
  • springboot如何整合mqtt
    这篇“springboot如何整合mqtt”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“springboot如何整合mqtt...
    99+
    2023-07-05
  • SpringBoot整合MQTT小结汇总
    目录前言:一、什么是mqtt二、主要思想发布/订阅模式三、MQTT重要概念3.1MQTTClient3.2MQTTBroker3.3MQTTConnection3.4MQTT主要参数...
    99+
    2024-04-02
  • Springboot整合mqtt服务的示例代码
    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <g...
    99+
    2024-04-02
  • springboot整合netty-mqtt-client实现Mqtt消息的订阅和发布示例
    目录1.添加依赖2.源码3.运行测试1.添加依赖 <dependency> <groupId>org.jetlinks</groupId>...
    99+
    2024-04-02
  • Laravel如何整合Workerman命令行监听MQTT
    这篇文章主要介绍“Laravel如何整合Workerman命令行监听MQTT”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“Laravel如何整合Workerman命令行监听MQTT”文章能帮助大家解...
    99+
    2023-07-04
  • java实现mqtt协议
    1.简介 MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和...
    99+
    2023-09-04
    java
  • ThinkPHP6.0 workerman/mqtt 与phpMQTT配合使用
    第一步:下载phpMQTT扩展下载地址,然后放在了扩展文件夹中 第二步:下载  workerman/mqtt 官方地址  通过composer进行安装 composer require workerman/mqtt 因为我是之前有用wo...
    99+
    2023-09-04
    PHP MQTT workerman linux Mosquitto
  • SpringBoot整合MQTT并实现异步线程调用的问题
    目录为什么选择MQTT使用背景代码实现基础代码异步线程处理实现为什么选择MQTT MQTT的定义相信很多人都能讲的头头是道,本文章也不讨论什么高大上的东西,旨在用最简单直观的方式让每...
    99+
    2024-04-02
  • 如何在 Java 中使用 MQTT
    MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。它凭借简单易实现、支持 QoS、报文小等特点,占据了物联网协议的半壁江山。...
    99+
    2023-09-04
    java 物联网 iot MQTT 客户端
  • 使用springboot怎么结合mqtt进行开发
    这篇文章给大家介绍使用springboot怎么结合mqtt进行开发,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/...
    99+
    2023-05-31
    springboot mqtt
  • MQTT 常用客户端库介绍 (全面涵盖c,c++,java,c#,python)
    MQTT(Message Queuing Telemetry Transport)是一种轻量级的通信协议,被广泛应用于物联网和分布式系统中。它以其简单、可靠和高效的特性而备受推崇,成为连接设备和应用程序的首选协议。MQTT的重要性不言而...
    99+
    2023-09-08
    java 开发语言 mqtt c++ c语言
  • flex3整合java
    Flex3利用LCDS整合java开发: 1.       环境搭建: a)         下载flex3工具: Adobe AIR SDK Flex 3 SDK* Flex Builde...
    99+
    2023-01-31
    java
  • java整合WebSocket
    WebSocket 一、WebSocket介绍1、简介2、优势3、服务端注解 二、springboot整合1、引入依赖2、配置3、业务代码>>群聊>>单人聊天 三、部署webso...
    99+
    2023-10-03
    java websocket 开发语言
  • Java SpringBoot整合SpringCloud
    目录1. SpringCloud特点2. 分布式系统的三个指标CAP3. Eureka4. SpringCloud Demo4.1 registry4.2 api4.3 provid...
    99+
    2024-04-02
  • Java 整合 Modbus TCP
    Modbus协议 1.概述 概念 Modbus是一种串行通信协议,是Modicon公司(现在的施耐德电气 Schneider Electric)于1979年为使用可编程逻辑控制器(PLC)通信而发表。Modbus已经成为工业领域通信协议的业...
    99+
    2023-08-30
    java tcp/ip 物联网
  • Java之Spring整合Junit
    目录1 测试类中的问题和解决思路1.1 问题1.2 解决思路分析2 配置步骤2.1 第一步:拷贝整合 junit 的必备 jar 包到 lib 目录2.2 第二步:使用@RunWit...
    99+
    2023-05-14
    Java Spring整合Junit Spring整合Junit
  • Java SpringBoot如何整合ActiveMQ
    Java SpringBoot如何整合ActiveMQ,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。一、 如果要想在项目之中去使用 ActiveMQ 组件,则应...
    99+
    2023-06-05
  • Java ClickHouse整合—官方教程
    一、开发环境 OpenJDK版本 >= 17ClickHouse:20.7+  1、支持的数据类型 FormatSupportCommentAggregatedFunction❌limited to groupBitmap, and kno...
    99+
    2023-10-22
    clickhouse
  • Java基础之SpringBoot整合knife4j
    插件的特点 1、非常简洁清爽的UI设计,接口的快速搜索。 2、支持个性化设置,个性化设置包含: 请求参数缓存 动态请求参数 RequestMapping接口过滤 ...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作