物联网开发终端管理篇-java从MQTT获取设备数据,并通过Druid连接池把数据写入Mysql数据库(windows系统) 下面来给大家做个简单的数据对接,也就是通过写JAVA代码实现MQTT协议
package com.baidai;import com.fasterxml.jackson.databind.JSONnode;import com.fasterxml.jackson.databind.ObjectMapper;import org.eclipse.paho.client.mqttv3.*;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import java.sql.*;import java.text.SimpleDateFORMat;import java.util.Date;public class ClientMQTT implements MqttCallback { public static final String HOST = "tcp://127.0.0.1:1883";//(127.0.0.1也就是EMQX的ip地址) private static final String clientID = "clientXX";//(这个clientXX 可以随便写) private String TOPIC= "testtopic";//(这个testtopic 是EMQX的订阅主题,如果你对接别的数据,别人给了你主题,改这个就行) private MqttClient client; private MqttConnectOptions options; private String user = "admin";//(连接登录EMQX的账号) private String passWord = "xiaofang";//(连接登录EMQX的密码) private String driverName = "com.mysql.cj.jdbc.Driver";//(连接MySQL数据库) //private String driverName = "net.sourceforge.jtds.jdbc.Driver";//(连接SQLServer数据库) private String url = "";//根据不同数据库填写自己的数据库地址 private String userName = "";//填写自己的数据库名称 private String userPwd = "";//数据库对应密码 public void clientStart(){ try { client = new MqttClient(HOST,clientID,new MemoryPersistence()); options = new MqttConnectOptions(); options.setCleanSession(true); options.seTKEepAliveInterval(10); options.setConnectionTimeout(50); options.setUserName(user); options.setPassword(password.toCharArray()); client.setCallback(new ClientMQTT()); MqttTopic topic = client.getTopic(TOPIC); //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 options.setWill(topic,"close".getBytes(),1,true); client.connect(options); int[] Qos = {1}; String[] topic1 = {TOPIC}; client.subscribe(topic1,Qos); } catch (MqttException e) { e.printStackTrace(); } } public void connection(){ try { Class.forName(driverName); System.out.println("连接成功!!!"); } catch (ClassNotFoundException e) { e.printStackTrace(); System.out.println("驱动加载失败"); } try { //在这里可以先测试数据库能不能连接 Connection dbcon = DriverManager.getConnection(url,userName,userPwd); System.out.println("数据库连接成功!"); System.out.println("数据库连接成功!"); } catch (SQLException e) { e.printStackTrace(); System.out.println("连接失败"); } } @Override public void connectionLost(Throwable throwable) { System.out.println(throwable); //连接断掉会执行到这里 System.out.println("连接以断,请重新连接!!!"); }//接收EMQX上订阅主题的数据 @Override public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { try {//获取消息返回格式 String msg = new String(mqttMessage.getPayload()); if(msg.equals("close")){ return; } ObjectMapper objectMapper = new ObjectMapper(); jsonNode jsonNode = objectMapper.readTree(msg); String info= jsonNode.get("info").toString(); String time= jsonNode.get("time").toString().replaceAll("\"", "").replaceAll("/","");//格式化接收过来的时间 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMddhhmmss"); Date productTime = simpleDateFormat.parse(timeStamp); String infoTwo = jsonNode.get("infoTwo ").toString();//按照对方返回过来的格式接收数据 JsonNode infoList = objectMapper.readTree(info); JsonNode infoTwoList = objectMapper.readTree(infoTwo); for (JsonNode dataJsonNode : infoList) { //这里就省略了 } for (JsonNode dataJsonNode : infoTwoList) { //这里就省略了 }//连接数据库 Connection dbcon = DriverManager.getConnection(url,userName,userPwd); Statement stmt = dbcon.createStatement();//如果是SqlServe不能自动生成id,可以用这个生成一个随机id ResultSet rs = stmt.executeQuery("select REPLACE(NEWID(), '-', '') as Id"); String id=""; while(rs.next()) { id=rs.getString("Id").toString(); } String sql="insert into 表的名称 (id,name,date,number,totalNumber,nowDate)"+ "values(?,?,?,?,?,?)"; SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); PreparedStatement preparedStatement = dbcon.prepareStatement(sql);//预编译下SQL语句 preparedStatement.setString( 1,id); preparedStatement.setString( 2,"测试"); preparedStatement.setString( 3, dateFormat.format(time));//获取时间 preparedStatement.setInt( 4,number);//数量 preparedStatement.setDouble( 5,totalNumber);//总数量 preparedStatement.setString( 6,dateFormat.format(System.currentTimeMillis()));//获取当前时间 //这里是执行上面的sql语句的方法 preparedStatement.executeUpdate(); //subscribe后会执行到这里 System.out.println("订阅消息的主题是:"+s); System.out.println("消息的ID是:"+mqttMessage.getId()); System.out.println("添加成功:"+msg); System.out.println("添加成功SQl语句:"+preparedStatement); }catch (Exception e){ System.out.println("插入错误信息:"+e); } } @Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { //publish可以执行到这里 System.out.println("This is deliveryComplete method----->"+iMqttDeliveryToken.isComplete()); } public static void main(String[] args) { ClientMQTT clientMQTT = new ClientMQTT(); clientMQTT.clientStart(); //在这里可以先测试数据库能不能连接 //clientMQTT.connection(); }}
@echo off
start javaw -jar MqttDataToMySQL.jar
exit
C:\java8\jdk1.8.0.131 为JDK的安装路径
@echo off
set JAVA_HOME=C:\java8\jdk1.8.0.131
set CLASSPATH=.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOMe%\lib\tools.jar;
set Path=%JAVA_HOME%\bin;
start javaw -jar MqttDataToMySQL.jar
exit
“Exception in thread “main” java.lang.SecurityException: Invalid signature file digest for Manifest”
来源地址:https://blog.csdn.net/xiaofangzhen/article/details/129206771
--结束END--
本文标题: 物联网开发终端管理篇-java从MQTT获取设备数据,并通过Druid连接池把数据写入MySQL数据库(Windows系统)
本文链接: https://www.lsjlt.com/news/408979.html(转载时请注明来源链接)
有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341
下载Word文档到电脑,方便收藏和打印~
2024-05-13
2024-05-13
2024-05-13
2024-05-13
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-12
2024-05-12
回答
回答
回答
回答
回答
回答
回答
回答
回答
回答
0