diff --git a/pom.xml b/pom.xml
index 1a2d496..cbe0096 100644
--- a/pom.xml
+++ b/pom.xml
@@ -158,6 +158,12 @@
spring-integration-mqtt
+
+ com.alibaba
+ fastjson
+ 2.0.21
+
+
org.springframework.boot
spring-boot-starter-test
diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java
index fa546d8..a06707f 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java
@@ -1,5 +1,7 @@
package com.zsc.edu.gateway.modules.iot.device.controller;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
@@ -110,7 +112,7 @@ public class DeviceController {
*/
@GetMapping("record/data")
@PreAuthorize("hasAuthority('iot:device:query')")
- public List recordData(String clientId) {
- return recordService.recordData(clientId);
+ public Page recordData(Page page, String clientId) {
+ return recordService.page(page, new LambdaQueryWrapper().eq(RecordData::getClientId, clientId));
}
}
diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/dto/DeviceServeDto.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/dto/DeviceServeDto.java
index 2705390..e85cffb 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/dto/DeviceServeDto.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/dto/DeviceServeDto.java
@@ -6,6 +6,7 @@ import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import jakarta.validation.constraints.NotNull;
+import org.hibernate.validator.constraints.UniqueElements;
import java.util.Map;
@@ -26,6 +27,7 @@ public class DeviceServeDto {
* 客户ID
*/
@NotNull
+ @UniqueElements
public String clientId;
/**
diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/record/entity/RecordData.java b/src/main/java/com/zsc/edu/gateway/modules/iot/record/entity/RecordData.java
index db5b850..53d5a92 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/iot/record/entity/RecordData.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/iot/record/entity/RecordData.java
@@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
//import com.zsc.edu.gateway.framework.json.MapJsonTypeHandler;
+import com.zsc.edu.gateway.framework.json.JsonbTypeHandler;
import lombok.*;
import org.springframework.format.annotation.DateTimeFormat;
@@ -26,7 +27,7 @@ public class RecordData {
private String attachmentId;
- // @TableField(typeHandler = MapJsonTypeHandler.class)
+ @TableField(typeHandler = JsonbTypeHandler.class)
private Map content;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/RecordDataService.java b/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/RecordDataService.java
index 441d7ec..b3ee38c 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/RecordDataService.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/RecordDataService.java
@@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.record.service;
+import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
@@ -10,9 +11,6 @@ import java.util.List;
* @author zhuang
*/
public interface RecordDataService extends IService {
- RecordData create(RecordData record);
- List page(IPage page, String clientId);
-
- List recordData(String clientId);
+ RecordData recordData(String clientId, JSONObject data);
}
\ No newline at end of file
diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/impl/RecordDataServiceImpl.java b/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/impl/RecordDataServiceImpl.java
index 942e6fa..b48a7ad 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/impl/RecordDataServiceImpl.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/iot/record/service/impl/RecordDataServiceImpl.java
@@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.record.service.impl;
+import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@@ -11,10 +12,7 @@ import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
-import java.time.format.DateTimeParseException;
-import java.util.Comparator;
import java.util.List;
-import java.util.Optional;
/**
* @author zhuang
@@ -22,33 +20,15 @@ import java.util.Optional;
@AllArgsConstructor
@Service
public class RecordDataServiceImpl extends ServiceImpl implements RecordDataService {
- RedisUtils redisUtils;
- @Override
- public RecordData create(RecordData record) {
- save(record);
- return record;
- }
- @Override
- public List page(IPage page, String clientId) {
- LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>();
- if (clientId != null) {
- queryWrapper.eq(RecordData::getClientId, clientId);
- return baseMapper.selectList(page, queryWrapper);
- }
- return baseMapper.selectList(page, queryWrapper);
- }
@Override
- public List recordData(String clientId) {
- LocalDateTime recordTime = LocalDateTime.parse(redisUtils.get("serve:sendTime:photograph:" + clientId));
- List records = baseMapper.selectList(new LambdaQueryWrapper()
- .eq(RecordData::getClientId, clientId).le(RecordData::getRecordTime, recordTime));
- Optional first = records.stream().max(Comparator.comparing(RecordData::getRecordTime));
- if (first.isPresent()) {
- String recordTimeStr = (String) first.get().getContent().get("recordTimeStr");
- return baseMapper.findByRecordTimeStr(recordTimeStr);
- } else {
- return records;
- }
+ public RecordData recordData(String clientId, JSONObject data) {
+ RecordData recordData = new RecordData();
+ recordData.setClientId(clientId);
+ recordData.setContent(data);
+ recordData.setRecordTime(LocalDateTime.now());
+ baseMapper.insert(recordData);
+ return recordData;
}
+
}
diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java
index cd999a2..e0642b1 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java
@@ -1,303 +1,177 @@
-//package com.zsc.edu.gateway.modules.mqtt.config;
-//
-//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
-//import jakarta.annotation.Resource;
-//import lombok.extern.slf4j.Slf4j;
-//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-//import org.springframework.beans.factory.annotation.Value;
-//import org.springframework.boot.configurationprocessor.json.JSONObject;
-//import org.springframework.context.ApplicationContext;
-//import org.springframework.context.annotation.Bean;
-//import org.springframework.context.annotation.Configuration;
-//import org.springframework.integration.annotation.ServiceActivator;
-//import org.springframework.integration.channel.DirectChannel;
-//import org.springframework.integration.core.MessageProducer;
-//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
-//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
-//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
-//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
-//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
-//import org.springframework.messaging.MessageChannel;
-//import org.springframework.messaging.MessageHandler;
-//import org.springframework.util.StringUtils;
-//
-//import java.io.IOException;
-//import java.text.ParseException;
-//import java.text.SimpleDateFormat;
-//import java.util.Date;
-//import java.util.TimeZone;
-//import java.util.UUID;
-//
-//@Slf4j
-//@Configuration
-//public class MqttConfig {
-//
-// @Resource
-// private ApplicationContext applicationContext;
-// @Resource
-// private UploadDataSocketProcessor uploadDataSocketProcessor;
-//
-//
-// public static final String CHANNEL_OUT = "mqttOutboundChannel";
-// public static final String CHANNEL_IN = "mqttInputChannel";
-//
-// private static final byte[] WILL_DATA;
-//
-// static {
-// WILL_DATA = "offline".getBytes();
-// }
-//
-// @Value("${gatherer.version}")
-// private String gatherer;
-// @Value("${mqtt.username}")
-// private String username;
-// @Value("${mqtt.password}")
-// private String password;
-// @Value("${mqtt.host}")
-// private String host;
-// @Value("${mqtt.port}")
-// private String port;
-// @Value("${mqtt.topic}")
-// private String topic;
-// @Value("${mqtt.qos}")
-// private Integer qos;
-//
-// /**
-// * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用ssl验证也在这里设置
-// * @return factory
-// */
-// @Bean
-// public MqttPahoClientFactory mqttPahoClientFactory() {
-// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
-// MqttConnectOptions options = new MqttConnectOptions();
-//
-// if(StringUtils.hasText(username)) {
-// options.setUserName(username);
-// }
-//
-// options.setPassword(password.toCharArray());
-//
-// options.setServerURIs(new String[]{host + ":" + port});
-//
-// options.setConnectionTimeout(10);
-//
-// options.setKeepAliveInterval(20);
-//
-// options.setWill("willTopic", WILL_DATA, 2, false);
-// factory.setConnectionOptions(options);
-//
-// return factory;
-// }
-//
-// /**
-// * 入站通道
-// */
-// @Bean
-// public MessageChannel mqttInputChannel() {
-// return new DirectChannel();
-// }
-//
-// /**
-// * 入站
-// */
-// @Bean
-// public MessageProducer inbound() {
-// // Paho客户端消息驱动通道适配器,主要用来订阅主题
-// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
-// "clientId-" + UUID.randomUUID(),
-// mqttPahoClientFactory(),
-// "/v1/devices/+/datas"
-// );
-// adapter.setCompletionTimeout(5000);
-//
-// // Paho消息转换器
-// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
-//
-// // 按字节接收消息
-//// defaultPahoMessageConverter.setPayloadAsBytes(true);
-// adapter.setConverter(defaultPahoMessageConverter);
-// adapter.setQos(qos);
-// adapter.setOutputChannel(mqttInputChannel());
-// return adapter;
-//
-// }
-//
-// /**
-// * ServiceActivator注解表明: 当前方法用于处理MQTT消息,inputChannel参数指定了用于消费的channel
-// * @return
-// */
-// @Bean
-// @ServiceActivator(inputChannel = CHANNEL_IN)
-// public MessageHandler handler() {
-// return message -> {
-// String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
-// String deviceCode = topic.split("/")[3];
-// String deviceGatherer = deviceCode.substring(0,3);
-// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
-// JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
-// Integer deviceModel = Integer.valueOf(deviceCode.substring(3,5));
-// log.info("获取设备编码:" + deviceCode);
-// log.info("负载:" + payload);
-// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
-// DeviceVo deviceVo = null;
-// switch (deviceModel) {
-// case 2:
-// deviceVo = decodeSpike(data, deviceCode, deviceModel);
-// break;
-// case 5:
-// deviceVo = decodeCirculation(data, deviceCode, deviceModel);
-// break;
-// default:
-// break;
-// }
-// if (deviceVo == null) return; // 或者抛出异常,根据你的需求处理
-// try {
-// uploadDataSocketProcessor.processDeviceV2(null, deviceVo);
-// } catch (IOException e) {
-// throw new RuntimeException(e);
-// }
-// } else {
-// log.error("丢弃信息,主题["+ topic +"]");
-// }
-// };
-// }
-//
-// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) {
-//
-//// "iccid": "89860818102381420337",
-//// "imsi": "460088822108587",
-//// "signal": 2,
-//// "recordTime": "20241212T003246Z",
-//// "battery": 100,
-//// "current": 0,
-//// "warning": 0
-//
-// String iccid = data.getString("iccid");
-// String imsi = data.getString("imsi");
-// Integer signal = data.getInteger("signal");
-// String recordTimeString = data.getString("recordTime");
-// // 使用 SimpleDateFormat 解析日期字符串
-// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
-// Date recordTime;
-// try {
-// recordTime = dateFormat.parse(recordTimeString);
-// } catch (ParseException e) {
-// log.error("无法解析日期字符串: " + recordTimeString, e);
-// return null;
-// }
-// Integer battery = data.getInteger("battery");
-// Float current = data.getFloat("current");
-// Integer warning = data.getInteger("warning");
-//
-// Record record = new Record();
-// record.setWarn(warning);
-//// record.setDeviceId(deviceId);
-// record.setDeviceSignal(signal);
-// record.setBattery(battery);
-// record.setTemp1(current);
-// record.setShakeWarn(warning & 1);
-// record.setTemp1Warn(warning >> 1 & 1);
-// record.setTemp2Warn(warning >> 2 & 1);
-// record.setTemp3Warn(warning >> 3 & 1);
-// record.setEarlyWarn(warning >> 4 & 1);
-// record.setWarn(warning);
-// record.setRecordTime(recordTime);
-// DeviceVo deviceVo = new DeviceVo();
-// deviceVo.setClientId(deviceCode);
-// deviceVo.setSimCode(iccid);
-// deviceVo.setImeiCode(imsi);
-// deviceVo.setDeviceModel(deviceModel);
-// deviceVo.setWarn(warning);
-// deviceVo.setRecordList(Lists.newArrayList(record));
-// return deviceVo;
-// }
-//
-// private static DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) {
-//// "longitude": "31.2412402273",
-//// "latitude": "121.473992206",
-//// "iccid": "89860321047601121407",
-//// "imsi": "460115009044492",
-//// "signal": 1,
-//// "recordTime": "20241205T124818Z",
-//// "battery": 76,
-//// "warning": 0
-//
-// String iccid = data.getString("iccid");
-// String imsi = data.getString("imsi");
-// Integer signal = data.getInteger("signal");
-// String recordTimeString = data.getString("recordTime");
-// // 使用 SimpleDateFormat 解析日期字符串
-// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
-// Date recordTime;
-// try {
-// recordTime = dateFormat.parse(recordTimeString);
-// } catch (ParseException e) {
-// log.error("无法解析日期字符串: " + recordTimeString, e);
-// return null;
-// }
-// Integer battery = data.getInteger("battery");
-// Integer warning = data.getInteger("warning");
-//
-// int newWarn = 0;
-// // 获取warn的最后三位
-// int lastThreeBits = warning & 0b111;
-// // 检查最后三位是否为011
-// if (lastThreeBits == 0b011) {
-// newWarn |= 1; // 设置newWarn的第一位为1
-// }
-// // 获取warn的第四位的值
-// int fourthBit = (warning >> 3) & 1;
-// // 将warn的第四位的值设置到newWarn的第八位
-// if (fourthBit == 1) {
-// newWarn |= (1 << 8); // 设置newWarn的第八位为1
-// }
-//
-// Record record = new Record();
-// record.setWarn(newWarn);
-//// record.setDeviceId(deviceId);
-// record.setDeviceSignal(signal);
-// record.setBattery(battery);
-// record.setShakeWarn(newWarn & 1);
-// record.setTemp1Warn(newWarn >> 1 & 1);
-// record.setTemp2Warn(newWarn >> 2 & 1);
-// record.setTemp3Warn(newWarn >> 3 & 1);
-// record.setEarlyWarn(newWarn >> 4 & 1);
-// record.setWarn(newWarn);
-// record.setRecordTime(recordTime);
-// DeviceVo deviceVo = new DeviceVo();
-// deviceVo.setClientId(deviceCode);
-// deviceVo.setSimCode(iccid);
-// deviceVo.setImeiCode(imsi);
-// deviceVo.setDeviceModel(deviceModel);
-// deviceVo.setWarn(newWarn);
-// deviceVo.setRecordList(Lists.newArrayList(record));
-// return deviceVo;
-// }
-//
-//
-// /**
-// * 出站通道
-// */
-// @Bean
-// public MessageChannel mqttOutboundChannel() {
-// return new DirectChannel();
-// }
-//
-// /**
-// * 出站
-// */
-// @Bean
-// @ServiceActivator(inputChannel = CHANNEL_OUT)
-// public MessageHandler outbound() {
-// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
-// "clientId" + UUID.randomUUID(),
-// mqttPahoClientFactory()
-// );
-// messageHandler.setAsync(true);
-// messageHandler.setDefaultTopic(topic);
-// messageHandler.setConverter(new DefaultPahoMessageConverter());
-// return messageHandler;
-//
-// }
-//
-//}
+package com.zsc.edu.gateway.modules.mqtt.config;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSONPath;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
+import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService;
+import com.zsc.edu.gateway.modules.iot.record.service.impl.RecordDataServiceImpl;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.integration.annotation.ServiceActivator;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.core.MessageProducer;
+import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
+import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
+import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
+import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
+import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.messaging.MessageHandler;
+import org.springframework.util.StringUtils;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.UUID;
+
+@Slf4j
+@Configuration
+public class MqttConfig {
+ @Resource
+ private RecordDataService recordDataService;
+
+
+ public static final String CHANNEL_OUT = "mqttOutboundChannel";
+ public static final String CHANNEL_IN = "mqttInputChannel";
+
+ private static final byte[] WILL_DATA;
+
+ static {
+ WILL_DATA = "offline".getBytes();
+ }
+
+ @Value("${gatherer.version}")
+ private String gatherer;
+ @Value("${mqtt.username}")
+ private String username;
+ @Value("${mqtt.password}")
+ private String password;
+ @Value("${mqtt.host}")
+ private String host;
+ @Value("${mqtt.port}")
+ private String port;
+ @Value("${mqtt.topic}")
+ private String topic;
+ @Value("${mqtt.qos}")
+ private Integer qos;
+
+ /**
+ * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用ssl验证也在这里设置
+ *
+ * @return factory
+ */
+ @Bean
+ public MqttPahoClientFactory mqttPahoClientFactory() {
+ DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
+ MqttConnectOptions options = new MqttConnectOptions();
+
+ if (StringUtils.hasText(username)) {
+ options.setUserName(username);
+ }
+
+ options.setPassword(password.toCharArray());
+
+ options.setServerURIs(new String[]{host + ":" + port});
+
+ options.setConnectionTimeout(10);
+
+ options.setKeepAliveInterval(20);
+
+ options.setWill("willTopic", WILL_DATA, 2, false);
+ factory.setConnectionOptions(options);
+
+ return factory;
+ }
+
+ /**
+ * 入站通道
+ */
+ @Bean
+ public MessageChannel mqttInputChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * 入站
+ */
+ @Bean
+ public MessageProducer inbound() {
+ // Paho客户端消息驱动通道适配器,主要用来订阅主题
+ MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
+ "clientId-" + UUID.randomUUID(),
+ mqttPahoClientFactory(),
+ "/v1/devices/+/datas"
+ );
+ adapter.setCompletionTimeout(5000);
+
+ // Paho消息转换器
+ DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
+
+ // 按字节接收消息
+// defaultPahoMessageConverter.setPayloadAsBytes(true);
+ adapter.setConverter(defaultPahoMessageConverter);
+ adapter.setQos(qos);
+ adapter.setOutputChannel(mqttInputChannel());
+ return adapter;
+
+ }
+
+ /**
+ * ServiceActivator注解表明: 当前方法用于处理MQTT消息,inputChannel参数指定了用于消费的channel
+ *
+ * @return
+ */
+ @Bean
+ @ServiceActivator(inputChannel = CHANNEL_IN)
+ public MessageHandler handler() {
+ return message -> {
+ String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
+ String clientId = topic.split("/")[3];
+ String deviceGatherer = clientId.substring(0, 3);
+ if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
+ JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
+ log.info("获取设备编码:" + clientId);
+ log.info("负载:" + payload);
+ JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
+ recordDataService.recordData(clientId, data);
+ } else {
+ log.error("丢弃信息,主题[" + topic + "]");
+ }
+ };
+ }
+
+
+ /**
+ * 出站通道
+ */
+ @Bean
+ public MessageChannel mqttOutboundChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * 出站
+ */
+ @Bean
+ @ServiceActivator(inputChannel = CHANNEL_OUT)
+ public MessageHandler outbound() {
+ MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
+ "clientId" + UUID.randomUUID(),
+ mqttPahoClientFactory()
+ );
+ messageHandler.setAsync(true);
+ messageHandler.setDefaultTopic(topic);
+ messageHandler.setConverter(new DefaultPahoMessageConverter());
+ return messageHandler;
+
+ }
+
+}
diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java
index a39e237..e6d7456 100644
--- a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java
+++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java
@@ -1,49 +1,54 @@
-//package com.zsc.edu.gateway.modules.mqtt.config;
-//
-//import org.springframework.integration.annotation.MessagingGateway;
-//import org.springframework.integration.mqtt.support.MqttHeaders;
-//import org.springframework.messaging.handler.annotation.Header;
-//import org.springframework.stereotype.Component;
-//
-//@Component
-//@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT)
-//public interface MqttSender {
-//
-// /**
-// * 定义重载方法,用于消息发送
-// * @param payload
-// */
-// void sendMsg(String payload);
-//
-// /**
-// * 指定topic进行消息发送
-// * @param topic
-// * @param payload
-// */
-// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload);
-//
-// /**
-// * 指定topic进行消息发送
-// * @param topic
-// * @param retain
-// * @param payload
-// */
-// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload);
-//
-//
-// /**
-// * 指定topic进行消息发送
-// * @param topic
-// * @param qos
-// * @param payload
-// */
-// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
-//
-// /**
-// * 指定topic进行消息发送
-// * @param topic
-// * @param qos
-// * @param payload
-// */
-// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
-//}
+package com.zsc.edu.gateway.modules.mqtt.config;
+
+import org.springframework.integration.annotation.MessagingGateway;
+import org.springframework.integration.mqtt.support.MqttHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+@Component
+@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT)
+public interface MqttSender {
+
+ /**
+ * 定义重载方法,用于消息发送
+ *
+ * @param payload
+ */
+ void sendMsg(String payload);
+
+ /**
+ * 指定topic进行消息发送
+ *
+ * @param topic
+ * @param payload
+ */
+ void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload);
+
+ /**
+ * 指定topic进行消息发送
+ *
+ * @param topic
+ * @param retain
+ * @param payload
+ */
+ void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload);
+
+
+ /**
+ * 指定topic进行消息发送
+ *
+ * @param topic
+ * @param qos
+ * @param payload
+ */
+ void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
+
+ /**
+ * 指定topic进行消息发送
+ *
+ * @param topic
+ * @param qos
+ * @param payload
+ */
+ void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
+}
diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java
deleted file mode 100644
index 0168e74..0000000
--- a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java
+++ /dev/null
@@ -1,73 +0,0 @@
-package com.zsc.edu.gateway.modules.mqtt.config;
-
-import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-
-@Slf4j
-public class UploadDataSocketProcessor {
-
- /**
- * 处理设备数据并上传
- *
- * @param context 上下文对象,这里传入的是null
- * @param deviceVo 设备数据对象
- * @throws IOException 如果上传过程中发生IO异常
- */
- public void processDeviceV2(Object context, DeviceVo deviceVo) throws IOException {
- // 这里可以添加具体的上传逻辑
- // 例如,将设备数据转换为JSON格式并通过网络发送到服务器
- if (deviceVo == null) {
- log.warn("尝试处理空的DeviceVo对象");
- return;
- }
-
- // 示例:将DeviceVo对象转换为JSON字符串
- String deviceDataJson = convertDeviceVoToJson(deviceVo);
- log.info("设备数据JSON: {}", deviceDataJson);
-
- // 示例:通过网络上传设备数据
- // 注意:这里需要替换为实际的上传逻辑
- uploadDeviceData(deviceDataJson);
- }
-
- /**
- * 将DeviceVo对象转换为JSON字符串
- *
- * @param deviceVo 设备数据对象
- * @return JSON字符串
- */
- private String convertDeviceVoToJson(DeviceVo deviceVo) {
- // 这里可以使用Jackson或其他JSON库进行转换
- // 示例:使用Jackson库
- try {
- return new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(deviceVo);
- } catch (Exception e) {
- log.error("无法将DeviceVo对象转换为JSON", e);
- return "{}";
- }
- }
-
- /**
- * 上传设备数据到服务器
- *
- * @param deviceDataJson 设备数据的JSON字符串
- * @throws IOException 如果上传过程中发生IO异常
- */
- private void uploadDeviceData(String deviceDataJson) throws IOException {
- // 这里可以添加具体的上传逻辑
- // 示例:通过HTTP POST请求上传数据
- // 注意:这里需要替换为实际的上传逻辑
- log.info("上传设备数据: {}", deviceDataJson);
- // 示例:使用HttpClient发送POST请求
- // HttpClient httpClient = HttpClient.newHttpClient();
- // HttpRequest request = HttpRequest.newBuilder()
- // .uri(URI.create("http://example.com/upload"))
- // .header("Content-Type", "application/json")
- // .POST(HttpRequest.BodyPublishers.ofString(deviceDataJson))
- // .build();
- // HttpResponse response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
- // log.info("上传响应: {}", response.body());
- }
-}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 95c0800..4653eb7 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -57,3 +57,14 @@ storage:
jwt:
secret: your_secret_key_here
expiration: 3600
+
+gatherer:
+ version: 123
+
+mqtt:
+ username: you_mqtt_username
+ password: you_mqtt_password
+ host: tcp://8.134.67.99
+ port: 1883
+ topic: test/topic
+ qos: 0
diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml
index 865a751..8922597 100644
--- a/src/main/resources/application-prod.yml
+++ b/src/main/resources/application-prod.yml
@@ -39,3 +39,15 @@ spring:
storage:
attachment: ./storage/attachment
temp: ./storage/temp
+
+gatherer:
+ version: 1.0.0
+
+mqtt:
+ username: you_mqtt_username
+ password: you_mqtt_password
+ host: tcp://8.134.67.99
+ port: 1883
+ topic: test/topic
+ qos: 0
+
diff --git a/src/main/resources/db/gateway.sql b/src/main/resources/db/gateway.sql
index 1c92a36..8d41168 100644
--- a/src/main/resources/db/gateway.sql
+++ b/src/main/resources/db/gateway.sql
@@ -862,3 +862,5 @@ VALUES ('Device1', TRUE, 2, 'HW1.0', 'FW1.0', 'FAC12345', 'CLI12345', 1, '{
}', 'admin', CURRENT_TIMESTAMP, 'admin', CURRENT_TIMESTAMP, 'Device 10 remark', 2, 10, 31.241249, 121.474001,
'icon10.png');
+ALTER TABLE iot_device
+ ADD CONSTRAINT uc_client_id UNIQUE (client_id);
\ No newline at end of file