feat(iot): 重构 MQTT消息处理逻辑

- 更新 application-dev.yml 和 application-prod.yml,添加 MQTT 相关配置
- 重构 MqttConfig 类,优化 MQTT 消息处理流程
- 修改 DeviceController 中的 recordData 方法,使用分页查询
- 更新 RecordData 实体类,使用 JsonbTypeHandler 处理 content 字段
- 重构 RecordDataService接口,简化方法列表
- 新增 RecordDataServiceImpl 类,实现记录数据保存逻辑
- 删除未使用的 UploadDataSocketProcessor 类
This commit is contained in:
zhuangtianxiang 2025-03-01 12:54:21 +08:00
parent 8222b69e48
commit bf846e95a4
12 changed files with 281 additions and 461 deletions

View File

@ -158,6 +158,12 @@
<artifactId>spring-integration-mqtt</artifactId> <artifactId>spring-integration-mqtt</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.21</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId> <artifactId>spring-boot-starter-test</artifactId>

View File

@ -1,5 +1,7 @@
package com.zsc.edu.gateway.modules.iot.device.controller; package com.zsc.edu.gateway.modules.iot.device.controller;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission; import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
@ -110,7 +112,7 @@ public class DeviceController {
*/ */
@GetMapping("record/data") @GetMapping("record/data")
@PreAuthorize("hasAuthority('iot:device:query')") @PreAuthorize("hasAuthority('iot:device:query')")
public List<RecordData> recordData(String clientId) { public Page<RecordData> recordData(Page<RecordData> page, String clientId) {
return recordService.recordData(clientId); return recordService.page(page, new LambdaQueryWrapper<RecordData>().eq(RecordData::getClientId, clientId));
} }
} }

View File

@ -6,6 +6,7 @@ import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import jakarta.validation.constraints.NotNull; import jakarta.validation.constraints.NotNull;
import org.hibernate.validator.constraints.UniqueElements;
import java.util.Map; import java.util.Map;
@ -26,6 +27,7 @@ public class DeviceServeDto {
* 客户ID * 客户ID
*/ */
@NotNull @NotNull
@UniqueElements
public String clientId; public String clientId;
/** /**

View File

@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
//import com.zsc.edu.gateway.framework.json.MapJsonTypeHandler; //import com.zsc.edu.gateway.framework.json.MapJsonTypeHandler;
import com.zsc.edu.gateway.framework.json.JsonbTypeHandler;
import lombok.*; import lombok.*;
import org.springframework.format.annotation.DateTimeFormat; import org.springframework.format.annotation.DateTimeFormat;
@ -26,7 +27,7 @@ public class RecordData {
private String attachmentId; private String attachmentId;
// @TableField(typeHandler = MapJsonTypeHandler.class) @TableField(typeHandler = JsonbTypeHandler.class)
private Map<String, Object> content; private Map<String, Object> content;
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")

View File

@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.record.service; package com.zsc.edu.gateway.modules.iot.record.service;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData; import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
@ -10,9 +11,6 @@ import java.util.List;
* @author zhuang * @author zhuang
*/ */
public interface RecordDataService extends IService<RecordData> { public interface RecordDataService extends IService<RecordData> {
RecordData create(RecordData record);
List<RecordData> page(IPage<RecordData> page, String clientId); RecordData recordData(String clientId, JSONObject data);
List<RecordData> recordData(String clientId);
} }

View File

@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.record.service.impl; package com.zsc.edu.gateway.modules.iot.record.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
@ -11,10 +12,7 @@ import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.Optional;
/** /**
* @author zhuang * @author zhuang
@ -22,33 +20,15 @@ import java.util.Optional;
@AllArgsConstructor @AllArgsConstructor
@Service @Service
public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, RecordData> implements RecordDataService { public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, RecordData> implements RecordDataService {
RedisUtils redisUtils;
@Override
public RecordData create(RecordData record) {
save(record);
return record;
}
@Override
public List<RecordData> page(IPage<RecordData> page, String clientId) {
LambdaQueryWrapper<RecordData> queryWrapper = new LambdaQueryWrapper<>();
if (clientId != null) {
queryWrapper.eq(RecordData::getClientId, clientId);
return baseMapper.selectList(page, queryWrapper);
}
return baseMapper.selectList(page, queryWrapper);
}
@Override @Override
public List<RecordData> recordData(String clientId) { public RecordData recordData(String clientId, JSONObject data) {
LocalDateTime recordTime = LocalDateTime.parse(redisUtils.get("serve:sendTime:photograph:" + clientId)); RecordData recordData = new RecordData();
List<RecordData> records = baseMapper.selectList(new LambdaQueryWrapper<RecordData>() recordData.setClientId(clientId);
.eq(RecordData::getClientId, clientId).le(RecordData::getRecordTime, recordTime)); recordData.setContent(data);
Optional<RecordData> first = records.stream().max(Comparator.comparing(RecordData::getRecordTime)); recordData.setRecordTime(LocalDateTime.now());
if (first.isPresent()) { baseMapper.insert(recordData);
String recordTimeStr = (String) first.get().getContent().get("recordTimeStr"); return recordData;
return baseMapper.findByRecordTimeStr(recordTimeStr);
} else {
return records;
}
} }
} }

View File

@ -1,303 +1,177 @@
//package com.zsc.edu.gateway.modules.mqtt.config; package com.zsc.edu.gateway.modules.mqtt.config;
//
//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo; import com.alibaba.fastjson.JSONObject;
//import jakarta.annotation.Resource; import com.alibaba.fastjson.JSONPath;
//import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.ObjectMapper;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
//import org.springframework.beans.factory.annotation.Value; import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService;
//import org.springframework.boot.configurationprocessor.json.JSONObject; import com.zsc.edu.gateway.modules.iot.record.service.impl.RecordDataServiceImpl;
//import org.springframework.context.ApplicationContext; import jakarta.annotation.Resource;
//import org.springframework.context.annotation.Bean; import lombok.extern.slf4j.Slf4j;
//import org.springframework.context.annotation.Configuration; import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.integration.annotation.ServiceActivator; import org.springframework.beans.factory.annotation.Value;
//import org.springframework.integration.channel.DirectChannel; import org.springframework.context.ApplicationContext;
//import org.springframework.integration.core.MessageProducer; import org.springframework.context.annotation.Bean;
//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.core.MessageProducer;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
//import org.springframework.messaging.MessageChannel; import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.messaging.MessageHandler; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.util.StringUtils; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
// import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import java.io.IOException; import org.springframework.messaging.MessageChannel;
//import java.text.ParseException; import org.springframework.messaging.MessageHandler;
//import java.text.SimpleDateFormat; import org.springframework.util.StringUtils;
//import java.util.Date;
//import java.util.TimeZone; import java.io.IOException;
//import java.util.UUID; import java.text.ParseException;
// import java.text.SimpleDateFormat;
//@Slf4j import java.util.Date;
//@Configuration import java.util.TimeZone;
//public class MqttConfig { import java.util.UUID;
//
// @Resource @Slf4j
// private ApplicationContext applicationContext; @Configuration
// @Resource public class MqttConfig {
// private UploadDataSocketProcessor uploadDataSocketProcessor; @Resource
// private RecordDataService recordDataService;
//
// public static final String CHANNEL_OUT = "mqttOutboundChannel";
// public static final String CHANNEL_IN = "mqttInputChannel"; public static final String CHANNEL_OUT = "mqttOutboundChannel";
// public static final String CHANNEL_IN = "mqttInputChannel";
// private static final byte[] WILL_DATA;
// private static final byte[] WILL_DATA;
// static {
// WILL_DATA = "offline".getBytes(); static {
// } WILL_DATA = "offline".getBytes();
// }
// @Value("${gatherer.version}")
// private String gatherer; @Value("${gatherer.version}")
// @Value("${mqtt.username}") private String gatherer;
// private String username; @Value("${mqtt.username}")
// @Value("${mqtt.password}") private String username;
// private String password; @Value("${mqtt.password}")
// @Value("${mqtt.host}") private String password;
// private String host; @Value("${mqtt.host}")
// @Value("${mqtt.port}") private String host;
// private String port; @Value("${mqtt.port}")
// @Value("${mqtt.topic}") private String port;
// private String topic; @Value("${mqtt.topic}")
// @Value("${mqtt.qos}") private String topic;
// private Integer qos; @Value("${mqtt.qos}")
// private Integer qos;
// /**
// * 创建MqttPahoClientFactory设置MQTT Broker连接属性如果使用ssl验证也在这里设置 /**
// * @return factory * 创建MqttPahoClientFactory设置MQTT Broker连接属性如果使用ssl验证也在这里设置
// */ *
// @Bean * @return factory
// public MqttPahoClientFactory mqttPahoClientFactory() { */
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); @Bean
// MqttConnectOptions options = new MqttConnectOptions(); public MqttPahoClientFactory mqttPahoClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// if(StringUtils.hasText(username)) { MqttConnectOptions options = new MqttConnectOptions();
// options.setUserName(username);
// } if (StringUtils.hasText(username)) {
// options.setUserName(username);
// options.setPassword(password.toCharArray()); }
//
// options.setServerURIs(new String[]{host + ":" + port}); options.setPassword(password.toCharArray());
//
// options.setConnectionTimeout(10); options.setServerURIs(new String[]{host + ":" + port});
//
// options.setKeepAliveInterval(20); options.setConnectionTimeout(10);
//
// options.setWill("willTopic", WILL_DATA, 2, false); options.setKeepAliveInterval(20);
// factory.setConnectionOptions(options);
// options.setWill("willTopic", WILL_DATA, 2, false);
// return factory; factory.setConnectionOptions(options);
// }
// return factory;
// /** }
// * 入站通道
// */ /**
// @Bean * 入站通道
// public MessageChannel mqttInputChannel() { */
// return new DirectChannel(); @Bean
// } public MessageChannel mqttInputChannel() {
// return new DirectChannel();
// /** }
// * 入站
// */ /**
// @Bean * 入站
// public MessageProducer inbound() { */
// // Paho客户端消息驱动通道适配器主要用来订阅主题 @Bean
// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( public MessageProducer inbound() {
// "clientId-" + UUID.randomUUID(), // Paho客户端消息驱动通道适配器主要用来订阅主题
// mqttPahoClientFactory(), MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// "/v1/devices/+/datas" "clientId-" + UUID.randomUUID(),
// ); mqttPahoClientFactory(),
// adapter.setCompletionTimeout(5000); "/v1/devices/+/datas"
// );
// // Paho消息转换器 adapter.setCompletionTimeout(5000);
// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
// // Paho消息转换器
// // 按字节接收消息 DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
//// defaultPahoMessageConverter.setPayloadAsBytes(true);
// adapter.setConverter(defaultPahoMessageConverter); // 按字节接收消息
// adapter.setQos(qos); // defaultPahoMessageConverter.setPayloadAsBytes(true);
// adapter.setOutputChannel(mqttInputChannel()); adapter.setConverter(defaultPahoMessageConverter);
// return adapter; adapter.setQos(qos);
// adapter.setOutputChannel(mqttInputChannel());
// } return adapter;
//
// /** }
// * ServiceActivator注解表明 当前方法用于处理MQTT消息inputChannel参数指定了用于消费的channel
// * @return /**
// */ * ServiceActivator注解表明 当前方法用于处理MQTT消息inputChannel参数指定了用于消费的channel
// @Bean *
// @ServiceActivator(inputChannel = CHANNEL_IN) * @return
// public MessageHandler handler() { */
// return message -> { @Bean
// String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); @ServiceActivator(inputChannel = CHANNEL_IN)
// String deviceCode = topic.split("/")[3]; public MessageHandler handler() {
// String deviceGatherer = deviceCode.substring(0,3); return message -> {
// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) { String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// JSONObject payload = JSONObject.parseObject(message.getPayload().toString()); String clientId = topic.split("/")[3];
// Integer deviceModel = Integer.valueOf(deviceCode.substring(3,5)); String deviceGatherer = clientId.substring(0, 3);
// log.info("获取设备编码:" + deviceCode); if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
// log.info("负载:" + payload); JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data"); log.info("获取设备编码:" + clientId);
// DeviceVo deviceVo = null; log.info("负载:" + payload);
// switch (deviceModel) { JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
// case 2: recordDataService.recordData(clientId, data);
// deviceVo = decodeSpike(data, deviceCode, deviceModel); } else {
// break; log.error("丢弃信息,主题[" + topic + "]");
// case 5: }
// deviceVo = decodeCirculation(data, deviceCode, deviceModel); };
// break; }
// default:
// break;
// } /**
// if (deviceVo == null) return; // 或者抛出异常根据你的需求处理 * 出站通道
// try { */
// uploadDataSocketProcessor.processDeviceV2(null, deviceVo); @Bean
// } catch (IOException e) { public MessageChannel mqttOutboundChannel() {
// throw new RuntimeException(e); return new DirectChannel();
// } }
// } else {
// log.error("丢弃信息,主题["+ topic +"]"); /**
// } * 出站
// }; */
// } @Bean
// @ServiceActivator(inputChannel = CHANNEL_OUT)
// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) { public MessageHandler outbound() {
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
//// "iccid": "89860818102381420337", "clientId" + UUID.randomUUID(),
//// "imsi": "460088822108587", mqttPahoClientFactory()
//// "signal": 2, );
//// "recordTime": "20241212T003246Z", messageHandler.setAsync(true);
//// "battery": 100, messageHandler.setDefaultTopic(topic);
//// "current": 0, messageHandler.setConverter(new DefaultPahoMessageConverter());
//// "warning": 0 return messageHandler;
//
// String iccid = data.getString("iccid"); }
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal"); }
// String recordTimeString = data.getString("recordTime");
// // 使用 SimpleDateFormat 解析日期字符串
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Float current = data.getFloat("current");
// Integer warning = data.getInteger("warning");
//
// Record record = new Record();
// record.setWarn(warning);
//// record.setDeviceId(deviceId);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setTemp1(current);
// record.setShakeWarn(warning & 1);
// record.setTemp1Warn(warning >> 1 & 1);
// record.setTemp2Warn(warning >> 2 & 1);
// record.setTemp3Warn(warning >> 3 & 1);
// record.setEarlyWarn(warning >> 4 & 1);
// record.setWarn(warning);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(warning);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//
// private static DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) {
//// "longitude": "31.2412402273",
//// "latitude": "121.473992206",
//// "iccid": "89860321047601121407",
//// "imsi": "460115009044492",
//// "signal": 1,
//// "recordTime": "20241205T124818Z",
//// "battery": 76,
//// "warning": 0
//
// String iccid = data.getString("iccid");
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal");
// String recordTimeString = data.getString("recordTime");
// // 使用 SimpleDateFormat 解析日期字符串
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Integer warning = data.getInteger("warning");
//
// int newWarn = 0;
// // 获取warn的最后三位
// int lastThreeBits = warning & 0b111;
// // 检查最后三位是否为011
// if (lastThreeBits == 0b011) {
// newWarn |= 1; // 设置newWarn的第一位为1
// }
// // 获取warn的第四位的值
// int fourthBit = (warning >> 3) & 1;
// // 将warn的第四位的值设置到newWarn的第八位
// if (fourthBit == 1) {
// newWarn |= (1 << 8); // 设置newWarn的第八位为1
// }
//
// Record record = new Record();
// record.setWarn(newWarn);
//// record.setDeviceId(deviceId);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setShakeWarn(newWarn & 1);
// record.setTemp1Warn(newWarn >> 1 & 1);
// record.setTemp2Warn(newWarn >> 2 & 1);
// record.setTemp3Warn(newWarn >> 3 & 1);
// record.setEarlyWarn(newWarn >> 4 & 1);
// record.setWarn(newWarn);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(newWarn);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//
//
// /**
// * 出站通道
// */
// @Bean
// public MessageChannel mqttOutboundChannel() {
// return new DirectChannel();
// }
//
// /**
// * 出站
// */
// @Bean
// @ServiceActivator(inputChannel = CHANNEL_OUT)
// public MessageHandler outbound() {
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
// "clientId" + UUID.randomUUID(),
// mqttPahoClientFactory()
// );
// messageHandler.setAsync(true);
// messageHandler.setDefaultTopic(topic);
// messageHandler.setConverter(new DefaultPahoMessageConverter());
// return messageHandler;
//
// }
//
//}

View File

@ -1,49 +1,54 @@
//package com.zsc.edu.gateway.modules.mqtt.config; package com.zsc.edu.gateway.modules.mqtt.config;
//
//import org.springframework.integration.annotation.MessagingGateway; import org.springframework.integration.annotation.MessagingGateway;
//import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.integration.mqtt.support.MqttHeaders;
//import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
//import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
//
//@Component @Component
//@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT) @MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT)
//public interface MqttSender { public interface MqttSender {
//
// /** /**
// * 定义重载方法用于消息发送 * 定义重载方法用于消息发送
// * @param payload *
// */ * @param payload
// void sendMsg(String payload); */
// void sendMsg(String payload);
// /**
// * 指定topic进行消息发送 /**
// * @param topic * 指定topic进行消息发送
// * @param payload *
// */ * @param topic
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload); * @param payload
// */
// /** void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload);
// * 指定topic进行消息发送
// * @param topic /**
// * @param retain * 指定topic进行消息发送
// * @param payload *
// */ * @param topic
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload); * @param retain
// * @param payload
// */
// /** void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload);
// * 指定topic进行消息发送
// * @param topic
// * @param qos /**
// * @param payload * 指定topic进行消息发送
// */ *
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); * @param topic
// * @param qos
// /** * @param payload
// * 指定topic进行消息发送 */
// * @param topic void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
// * @param qos
// * @param payload /**
// */ * 指定topic进行消息发送
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); *
//} * @param topic
* @param qos
* @param payload
*/
void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
}

View File

@ -1,73 +0,0 @@
package com.zsc.edu.gateway.modules.mqtt.config;
import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@Slf4j
public class UploadDataSocketProcessor {
/**
* 处理设备数据并上传
*
* @param context 上下文对象这里传入的是null
* @param deviceVo 设备数据对象
* @throws IOException 如果上传过程中发生IO异常
*/
public void processDeviceV2(Object context, DeviceVo deviceVo) throws IOException {
// 这里可以添加具体的上传逻辑
// 例如将设备数据转换为JSON格式并通过网络发送到服务器
if (deviceVo == null) {
log.warn("尝试处理空的DeviceVo对象");
return;
}
// 示例将DeviceVo对象转换为JSON字符串
String deviceDataJson = convertDeviceVoToJson(deviceVo);
log.info("设备数据JSON: {}", deviceDataJson);
// 示例通过网络上传设备数据
// 注意这里需要替换为实际的上传逻辑
uploadDeviceData(deviceDataJson);
}
/**
* 将DeviceVo对象转换为JSON字符串
*
* @param deviceVo 设备数据对象
* @return JSON字符串
*/
private String convertDeviceVoToJson(DeviceVo deviceVo) {
// 这里可以使用Jackson或其他JSON库进行转换
// 示例使用Jackson库
try {
return new com.fasterxml.jackson.databind.ObjectMapper().writeValueAsString(deviceVo);
} catch (Exception e) {
log.error("无法将DeviceVo对象转换为JSON", e);
return "{}";
}
}
/**
* 上传设备数据到服务器
*
* @param deviceDataJson 设备数据的JSON字符串
* @throws IOException 如果上传过程中发生IO异常
*/
private void uploadDeviceData(String deviceDataJson) throws IOException {
// 这里可以添加具体的上传逻辑
// 示例通过HTTP POST请求上传数据
// 注意这里需要替换为实际的上传逻辑
log.info("上传设备数据: {}", deviceDataJson);
// 示例使用HttpClient发送POST请求
// HttpClient httpClient = HttpClient.newHttpClient();
// HttpRequest request = HttpRequest.newBuilder()
// .uri(URI.create("http://example.com/upload"))
// .header("Content-Type", "application/json")
// .POST(HttpRequest.BodyPublishers.ofString(deviceDataJson))
// .build();
// HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
// log.info("上传响应: {}", response.body());
}
}

View File

@ -57,3 +57,14 @@ storage:
jwt: jwt:
secret: your_secret_key_here secret: your_secret_key_here
expiration: 3600 expiration: 3600
gatherer:
version: 123
mqtt:
username: you_mqtt_username
password: you_mqtt_password
host: tcp://8.134.67.99
port: 1883
topic: test/topic
qos: 0

View File

@ -39,3 +39,15 @@ spring:
storage: storage:
attachment: ./storage/attachment attachment: ./storage/attachment
temp: ./storage/temp temp: ./storage/temp
gatherer:
version: 1.0.0
mqtt:
username: you_mqtt_username
password: you_mqtt_password
host: tcp://8.134.67.99
port: 1883
topic: test/topic
qos: 0

View File

@ -862,3 +862,5 @@ VALUES ('Device1', TRUE, 2, 'HW1.0', 'FW1.0', 'FAC12345', 'CLI12345', 1, '{
}', 'admin', CURRENT_TIMESTAMP, 'admin', CURRENT_TIMESTAMP, 'Device 10 remark', 2, 10, 31.241249, 121.474001, }', 'admin', CURRENT_TIMESTAMP, 'admin', CURRENT_TIMESTAMP, 'Device 10 remark', 2, 10, 31.241249, 121.474001,
'icon10.png'); 'icon10.png');
ALTER TABLE iot_device
ADD CONSTRAINT uc_client_id UNIQUE (client_id);