feat(iot): 添加 MQTT 消息处理功能

- 新增 MqttConfig、MqttMessageHandlerService、UploadDataSocketProcessor 等类
- 实现 MQTT 消息的接收、处理和转发逻辑
- 添加设备数据解析和上传
This commit is contained in:
zhuangtianxiang 2025-02-28 16:49:06 +08:00
parent 0ecd639619
commit 600d779fb5
14 changed files with 749 additions and 8 deletions

View File

@ -156,6 +156,15 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId> <artifactId>spring-boot-starter-aop</artifactId>
</dependency> </dependency>
<!--mqtt依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency> <dependency>

View File

@ -19,14 +19,20 @@ public class MyMetaObjectHandler implements MetaObjectHandler {
@Override @Override
public void insertFill(MetaObject metaObject) { public void insertFill(MetaObject metaObject) {
UserDetailsImpl userInfo = SecurityUtil.getUserInfo(); UserDetailsImpl userInfo = SecurityUtil.getUserInfo();
if (userInfo.getUsername() == null) { if (userInfo.getUsername() == null) {
userInfo.setUsername("system"); userInfo.setUsername("system");
} }
if (userInfo.getDeptId() == null) {
userInfo.setDeptId(2L);
}
if (userInfo.getCreateId() == null) {
userInfo.setCreateId(1L);
}
this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now()); this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now());
this.strictInsertFill(metaObject, "createBy", String.class, userInfo.getUsername()); this.strictInsertFill(metaObject, "createBy", String.class, userInfo.getUsername());
this.strictInsertFill(metaObject, "deptId", Long.class, userInfo.getDept().getId());
this.strictInsertFill(metaObject, "createId", Long.class, userInfo.getId());
} }
@Override @Override
@ -40,4 +46,5 @@ public class MyMetaObjectHandler implements MetaObjectHandler {
} }
} }

View File

@ -37,8 +37,10 @@ public class UserDetailsImpl implements UserDetails {
public Set<Authority> authorities; public Set<Authority> authorities;
public Set<String> permissions; public Set<String> permissions;
public Set<Long> dataScopeDeptIds; public Set<Long> dataScopeDeptIds;
public Long deptId;
public Long createId;
public UserDetailsImpl(Long id, String username, String password, String name, Boolean enableState, Dept dept, Set<Long> dataScopeDeptIds, Role role, Set<Authority> authorities, Set<String> permissions, List<Role> roles) { public UserDetailsImpl(Long id, String username, String password, String name, Boolean enableState, Dept dept, Set<Long> dataScopeDeptIds, Role role, Set<Authority> authorities, Set<String> permissions, List<Role> roles, Long deptId, Long createId) {
this.id = id; this.id = id;
this.username = username; this.username = username;
this.password = password; this.password = password;
@ -50,6 +52,8 @@ public class UserDetailsImpl implements UserDetails {
this.authorities = authorities; this.authorities = authorities;
this.permissions = permissions; this.permissions = permissions;
this.roles = roles; this.roles = roles;
this.deptId = deptId;
this.createId = createId;
} }
public static UserDetailsImpl from(User user, Set<String> permissions) { public static UserDetailsImpl from(User user, Set<String> permissions) {
@ -64,7 +68,9 @@ public class UserDetailsImpl implements UserDetails {
user.role, user.role,
user.role.authorities, user.role.authorities,
permissions, permissions,
user.roles user.roles,
user.deptId,
user.createId
); );
} }

View File

@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.device.entity; package com.zsc.edu.gateway.modules.iot.device.entity;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IEnum; import com.baomidou.mybatisplus.annotation.IEnum;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@ -91,6 +92,11 @@ public class Device extends BaseEntity {
@TableField(exist = false) @TableField(exist = false)
public Product product; public Product product;
@TableField(value = "dept_id", fill = FieldFill.INSERT)
public Long deptId;
//TODO 经纬度 Location
public enum Status implements IEnum<Integer>, IState<Status> { public enum Status implements IEnum<Integer>, IState<Status> {
UNACTIVATED(0, "未激活"), UNACTIVATED(0, "未激活"),

View File

@ -170,6 +170,9 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceRepository, Device> imp
.filter(Objects::nonNull) .filter(Objects::nonNull)
.distinct() .distinct()
.collect(Collectors.toList()); .collect(Collectors.toList());
if (productIds.isEmpty()) {
return devicePage;
}
LambdaQueryWrapper<Product> productQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<Product> productQueryWrapper = new LambdaQueryWrapper<>();
productQueryWrapper.in(Product::getId, productIds); productQueryWrapper.in(Product::getId, productIds);
List<Product> products = productRepo.selectList(productQueryWrapper); List<Product> products = productRepo.selectList(productQueryWrapper);

View File

@ -1,5 +1,6 @@
package com.zsc.edu.gateway.modules.iot.product.entity; package com.zsc.edu.gateway.modules.iot.product.entity;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.IEnum; import com.baomidou.mybatisplus.annotation.IEnum;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
@ -32,6 +33,7 @@ public class Product extends BaseEntity {
/** /**
* 部门ID * 部门ID
*/ */
@TableField(value = "dept_id", fill = FieldFill.INSERT)
private Long deptId; private Long deptId;
/** /**
* 产品类型 * 产品类型

View File

@ -1,8 +1,6 @@
package com.zsc.edu.gateway.modules.iot.tsl.entity; package com.zsc.edu.gateway.modules.iot.tsl.entity;
import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.*;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -43,6 +41,13 @@ public class BaseParam implements Serializable {
/** /**
* 部门ID(权限) * 部门ID(权限)
*/ */
@TableField(value = "dept_id", fill = FieldFill.INSERT)
public Long deptId; public Long deptId;
/**
* 创建人ID
*/
@TableField(value = "create_id", fill = FieldFill.INSERT)
public Long createId;
} }

View File

@ -17,7 +17,7 @@ import java.util.Set;
/** /**
* @author Yao * @author Yao
* @desciption 物模型服务 * @desciption 物模型事件
*/ */
@Setter @Setter
@Getter @Getter

View File

@ -0,0 +1,302 @@
//package com.zsc.edu.gateway.modules.mqtt.config;
//
//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
//import jakarta.annotation.Resource;
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.boot.configurationprocessor.json.JSONObject;
//import org.springframework.context.ApplicationContext;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.core.MessageProducer;
//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//import org.springframework.util.StringUtils;
//
//import java.io.IOException;
//import java.text.ParseException;
//import java.text.SimpleDateFormat;
//import java.util.Date;
//import java.util.UUID;
//
//@Slf4j
//@Configuration
//public class MqttBConfig {
//
// @Resource
// private ApplicationContext applicationContext;
// @Resource
// private UploadDataSocketProcessor uploadDataSocketProcessor;
//
//
// public static final String CHANNEL_OUT = "mqttOutboundChannel";
// public static final String CHANNEL_IN = "mqttInputChannel";
//
// private static final byte[] WILL_DATA;
//
// static {
// WILL_DATA = "offline".getBytes();
// }
//
// @Value("${gatherer.version}")
// private String gatherer;
// @Value("${mqtt.username}")
// private String username;
// @Value("${mqtt.password}")
// private String password;
// @Value("${mqtt.host}")
// private String host;
// @Value("${mqtt.port}")
// private String port;
// @Value("${mqtt.topic}")
// private String topic;
// @Value("${mqtt.qos}")
// private Integer qos;
//
// /**
// * 创建MqttPahoClientFactory设置MQTT Broker连接属性如果使用ssl验证也在这里设置
// * @return factory
// */
// @Bean
// public MqttPahoClientFactory mqttPahoClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// MqttConnectOptions options = new MqttConnectOptions();
//
// if(StringUtils.hasText(username)) {
// options.setUserName(username);
// }
//
// options.setPassword(password.toCharArray());
//
// options.setServerURIs(new String[]{host + ":" + port});
//
// options.setConnectionTimeout(10);
//
// options.setKeepAliveInterval(20);
//
// options.setWill("willTopic", WILL_DATA, 2, false);
// factory.setConnectionOptions(options);
//
// return factory;
// }
//
// /**
// * 入站通道
// */
// @Bean
// public MessageChannel mqttInputChannel() {
// return new DirectChannel();
// }
//
// /**
// * 入站
// */
// @Bean
// public MessageProducer inbound() {
// // Paho客户端消息驱动通道适配器主要用来订阅主题
// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// "clientId-" + UUID.randomUUID(),
// mqttPahoClientFactory(),
// "/v1/devices/+/datas"
// );
// adapter.setCompletionTimeout(5000);
//
// // Paho消息转换器
// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
//
// // 按字节接收消息
//// defaultPahoMessageConverter.setPayloadAsBytes(true);
// adapter.setConverter(defaultPahoMessageConverter);
// adapter.setQos(qos);
// adapter.setOutputChannel(mqttInputChannel());
// return adapter;
//
// }
//
// /**
// * ServiceActivator注解表明 当前方法用于处理MQTT消息inputChannel参数指定了用于消费的channel
// * @return
// */
// @Bean
// @ServiceActivator(inputChannel = CHANNEL_IN)
// public MessageHandler handler() {
// return message -> {
// String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// String deviceCode = topic.split("/")[3];
// String deviceGatherer = deviceCode.substring(0,3);
// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
// JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
// Integer deviceModel = Integer.valueOf(deviceCode.substring(3,5));
// log.info("获取设备编码:" + deviceCode);
// log.info("负载:" + payload);
// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
// DeviceVo deviceVo = null;
// switch (deviceModel) {
// case 2:
// deviceVo = decodeSpike(data, deviceCode, deviceModel);
// break;
// case 5:
// deviceVo = decodeCirculation(data, deviceCode, deviceModel);
// break;
// default:
// break;
// }
// if (deviceVo == null) return; // 或者抛出异常根据你的需求处理
// try {
// uploadDataSocketProcessor.processDeviceV2(null, deviceVo);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// } else {
// log.error("丢弃信息,主题["+ topic +"]");
// }
// };
// }
//
// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) {
//
//// "iccid": "89860818102381420337",
//// "imsi": "460088822108587",
//// "signal": 2,
//// "recordTime": "20241212T003246Z",
//// "battery": 100,
//// "current": 0,
//// "warning": 0
//
// String iccid = data.getString("iccid");
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal");
// String recordTimeString = data.getString("recordTime");
// // 使用 SimpleDateFormat 解析日期字符串
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Float current = data.getFloat("current");
// Integer warning = data.getInteger("warning");
//
// Record record = new Record();
// record.setWarn(warning);
//// record.setDeviceId(deviceId);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setTemp1(current);
// record.setShakeWarn(warning & 1);
// record.setTemp1Warn(warning >> 1 & 1);
// record.setTemp2Warn(warning >> 2 & 1);
// record.setTemp3Warn(warning >> 3 & 1);
// record.setEarlyWarn(warning >> 4 & 1);
// record.setWarn(warning);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(warning);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//
// private static DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) {
//// "longitude": "31.2412402273",
//// "latitude": "121.473992206",
//// "iccid": "89860321047601121407",
//// "imsi": "460115009044492",
//// "signal": 1,
//// "recordTime": "20241205T124818Z",
//// "battery": 76,
//// "warning": 0
//
// String iccid = data.getString("iccid");
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal");
// String recordTimeString = data.getString("recordTime");
// // 使用 SimpleDateFormat 解析日期字符串
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Integer warning = data.getInteger("warning");
//
// int newWarn = 0;
// // 获取warn的最后三位
// int lastThreeBits = warning & 0b111;
// // 检查最后三位是否为011
// if (lastThreeBits == 0b011) {
// newWarn |= 1; // 设置newWarn的第一位为1
// }
// // 获取warn的第四位的值
// int fourthBit = (warning >> 3) & 1;
// // 将warn的第四位的值设置到newWarn的第八位
// if (fourthBit == 1) {
// newWarn |= (1 << 8); // 设置newWarn的第八位为1
// }
//
// Record record = new Record();
// record.setWarn(newWarn);
//// record.setDeviceId(deviceId);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setShakeWarn(newWarn & 1);
// record.setTemp1Warn(newWarn >> 1 & 1);
// record.setTemp2Warn(newWarn >> 2 & 1);
// record.setTemp3Warn(newWarn >> 3 & 1);
// record.setEarlyWarn(newWarn >> 4 & 1);
// record.setWarn(newWarn);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(newWarn);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//
//
// /**
// * 出站通道
// */
// @Bean
// public MessageChannel mqttOutboundChannel() {
// return new DirectChannel();
// }
//
// /**
// * 出站
// */
// @Bean
// @ServiceActivator(inputChannel = CHANNEL_OUT)
// public MessageHandler outbound() {
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
// "clientId" + UUID.randomUUID(),
// mqttPahoClientFactory()
// );
// messageHandler.setAsync(true);
// messageHandler.setDefaultTopic(topic);
// messageHandler.setConverter(new DefaultPahoMessageConverter());
// return messageHandler;
//
// }
//
//}

View File

@ -0,0 +1,131 @@
//package com.zsc.edu.gateway.modules.mqtt.config;
//
//import lombok.extern.slf4j.Slf4j;
//import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.context.annotation.Bean;
//import org.springframework.context.annotation.Configuration;
//import org.springframework.integration.annotation.ServiceActivator;
//import org.springframework.integration.channel.DirectChannel;
//import org.springframework.integration.core.MessageProducer;
//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
//import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
//import org.springframework.messaging.MessageChannel;
//import org.springframework.messaging.MessageHandler;
//import org.springframework.util.StringUtils;
//
//import java.util.UUID;
//
//@Slf4j
//@Configuration
//public class MqttConfig {
//
// public static final String CHANNEL_OUT = "mqttOutboundChannel";
// public static final String CHANNEL_IN = "mqttInputChannel";
// private static final byte[] WILL_DATA;
// static {
// WILL_DATA = "offline".getBytes();
// }
//
// @Value("${gatherer.version}")
// private String gatherer;
// @Value("${mqtt.username}")
// private String username;
// @Value("${mqtt.password}")
// private String password;
// @Value("${mqtt.host}")
// private String host;
// @Value("${mqtt.port}")
// private String port;
// @Value("${mqtt.topic}")
// private String topic;
// @Value("${mqtt.qos}")
// private Integer qos;
//
// /**
// * 创建MqttPahoClientFactory设置MQTT Broker连接属性如果使用ssl验证也在这里设置
// * @return factory
// */
// @Bean
// public MqttPahoClientFactory mqttPahoClientFactory() {
// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
// MqttConnectOptions options = new MqttConnectOptions();
//
// if(StringUtils.hasText(username)) {
// options.setUserName(username);
// }
//
// options.setPassword(password.toCharArray());
//
// options.setServerURIs(new String[]{host + ":" + port});
//
// options.setConnectionTimeout(10);
//
// options.setKeepAliveInterval(20);
//
// options.setWill("willTopic", WILL_DATA, 2, false);
// factory.setConnectionOptions(options);
//
// return factory;
// }
//
// /**
// * 入站通道
// */
// @Bean
// public MessageChannel mqttInputChannel() {
// return new DirectChannel();
// }
//
// /**
// * 入站
// */
// @Bean
// public MessageProducer inbound() {
// // Paho客户端消息驱动通道适配器主要用来订阅主题
// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
// "clientId-" + UUID.randomUUID(),
// mqttPahoClientFactory(),
// "/v1/devices/+/datas"
// );
// adapter.setCompletionTimeout(5000);
//
// // Paho消息转换器
// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter();
//
// // 按字节接收消息
//// defaultPahoMessageConverter.setPayloadAsBytes(true);
// adapter.setConverter(defaultPahoMessageConverter);
// adapter.setQos(qos);
// adapter.setOutputChannel(mqttInputChannel());
// return adapter;
//
// }
//
//// @Bean
//// @ServiceActivator(inputChannel = CHANNEL_IN)
//// public MessageHandler handler(MqttMessageHandlerService mqttMessageHandlerService) {
//// return mqttMessageHandlerService::handleMessage;
//// }
//
// @Bean
// public MessageChannel mqttOutboundChannel() {
// return new DirectChannel();
// }
//
// @Bean
// @ServiceActivator(inputChannel = CHANNEL_OUT)
// public MessageHandler outbound(MqttPahoClientFactory mqttPahoClientFactory) {
// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
// "clientId" + UUID.randomUUID(),
// mqttPahoClientFactory
// );
// messageHandler.setAsync(true);
// messageHandler.setDefaultTopic(topic);
// messageHandler.setConverter(new DefaultPahoMessageConverter());
// return messageHandler;
// }
//}

View File

@ -0,0 +1,147 @@
//package com.zsc.edu.gateway.modules.mqtt.config;
//
//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
//import jakarta.annotation.Resource;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.boot.configurationprocessor.json.JSONObject;
//import org.springframework.messaging.Message;
//import org.springframework.stereotype.Service;
//
//import java.io.IOException;
//import java.text.ParseException;
//import java.text.SimpleDateFormat;
//import java.util.Date;
//
//@Slf4j
//@Service
//public class MqttMessageHandlerService {
//
// @Value("${gatherer.version}")
// private String gatherer;
//
// @Resource
// private final UploadDataSocketProcessor uploadDataSocketProcessor;
//
// public MqttMessageHandlerService(UploadDataSocketProcessor uploadDataSocketProcessor) {
// this.uploadDataSocketProcessor = uploadDataSocketProcessor;
// }
//
// public void handleMessage(Message<?> message) {
// String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// String deviceCode = topic.split("/")[3];
// String deviceGatherer = deviceCode.substring(0, 3);
// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) {
// JSONObject payload = JSONObject.parseObject(message.getPayload().toString());
// Integer deviceModel = Integer.valueOf(deviceCode.substring(3, 5));
// log.info("获取设备编码:" + deviceCode);
// log.info("负载:" + payload);
// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data");
// DeviceVo deviceVo = null;
// switch (deviceModel) {
// case 2:
// deviceVo = decodeSpike(data, deviceCode, deviceModel);
// break;
// case 5:
// deviceVo = decodeCirculation(data, deviceCode, deviceModel);
// break;
// default:
// break;
// }
// if (deviceVo == null) return; // 或者抛出异常根据你的需求处理
// try {
// uploadDataSocketProcessor.processDeviceV2(null, deviceVo);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
// } else {
// log.error("丢弃信息,主题[" + topic + "]");
// }
// }
//
// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) {
// String iccid = data.getString("iccid");
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal");
// String recordTimeString = data.getString("recordTime");
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Float current = data.getFloat("current");
// Integer warning = data.getInteger("warning");
//
// Record record = new Record();
// record.setWarn(warning);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setTemp1(current);
// record.setShakeWarn(warning & 1);
// record.setTemp1Warn(warning >> 1 & 1);
// record.setTemp2Warn(warning >> 2 & 1);
// record.setTemp3Warn(warning >> 3 & 1);
// record.setEarlyWarn(warning >> 4 & 1);
// record.setWarn(warning);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(warning);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//
// private DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) {
// String iccid = data.getString("iccid");
// String imsi = data.getString("imsi");
// Integer signal = data.getInteger("signal");
// String recordTimeString = data.getString("recordTime");
// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'");
// Date recordTime;
// try {
// recordTime = dateFormat.parse(recordTimeString);
// } catch (ParseException e) {
// log.error("无法解析日期字符串: " + recordTimeString, e);
// return null;
// }
// Integer battery = data.getInteger("battery");
// Integer warning = data.getInteger("warning");
//
// int newWarn = 0;
// int lastThreeBits = warning & 0b111;
// if (lastThreeBits == 0b011) {
// newWarn |= 1;
// }
// int fourthBit = (warning >> 3) & 1;
// if (fourthBit == 1) {
// newWarn |= (1 << 8);
// }
//
// Record record = new Record();
// record.setWarn(newWarn);
// record.setDeviceSignal(signal);
// record.setBattery(battery);
// record.setShakeWarn(newWarn & 1);
// record.setTemp1Warn(newWarn >> 1 & 1);
// record.setTemp2Warn(newWarn >> 2 & 1);
// record.setTemp3Warn(newWarn >> 3 & 1);
// record.setEarlyWarn(newWarn >> 4 & 1);
// record.setWarn(newWarn);
// record.setRecordTime(recordTime);
// DeviceVo deviceVo = new DeviceVo();
// deviceVo.setClientId(deviceCode);
// deviceVo.setSimCode(iccid);
// deviceVo.setImeiCode(imsi);
// deviceVo.setDeviceModel(deviceModel);
// deviceVo.setWarn(newWarn);
// deviceVo.setRecordList(Lists.newArrayList(record));
// return deviceVo;
// }
//}

View File

@ -0,0 +1,49 @@
//package com.zsc.edu.gateway.modules.mqtt.config;
//
//import org.springframework.integration.annotation.MessagingGateway;
//import org.springframework.integration.mqtt.support.MqttHeaders;
//import org.springframework.messaging.handler.annotation.Header;
//import org.springframework.stereotype.Component;
//
//@Component
//@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT)
//public interface MqttSender {
//
// /**
// * 定义重载方法用于消息发送
// * @param payload
// */
// void sendMsg(String payload);
//
// /**
// * 指定topic进行消息发送
// * @param topic
// * @param payload
// */
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload);
//
// /**
// * 指定topic进行消息发送
// * @param topic
// * @param retain
// * @param payload
// */
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload);
//
//
// /**
// * 指定topic进行消息发送
// * @param topic
// * @param qos
// * @param payload
// */
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
//
// /**
// * 指定topic进行消息发送
// * @param topic
// * @param qos
// * @param payload
// */
// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload);
//}

View File

@ -0,0 +1,73 @@
//package com.zsc.edu.gateway.modules.mqtt.config;
//
//import com.fasterxml.jackson.core.JsonProcessingException;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.stereotype.Component;
//
//import java.io.IOException;
//import java.io.OutputStream;
//import java.net.Socket;
//
//@Slf4j
//@Component
//public class UploadDataSocketProcessor {
//
// /**
// * 处理设备数据并上传
// * @param socket 可能需要一个socket连接这里假设为null
// * @param deviceVo 设备数据对象
// * @throws IOException 如果上传过程中发生IO异常
// */
// public void processDeviceV2(Object socket, DeviceVo deviceVo) throws IOException {
// // 这里可以添加具体的上传逻辑
// // 例如将deviceVo对象转换为JSON字符串并发送到服务器
// try {
// // 假设有一个方法将DeviceVo对象转换为JSON字符串
// String deviceDataJson = convertDeviceVoToJson(deviceVo);
// // 假设有一个方法将JSON字符串发送到服务器
// sendToDeviceServer(deviceDataJson);
// } catch (Exception e) {
// log.error("处理设备数据时发生错误: ", e);
// throw new IOException("处理设备数据时发生错误", e);
// }
// }
//
// /**
// * 将DeviceVo对象转换为JSON字符串
// * @param deviceVo 设备数据对象
// * @return JSON字符串
// */
// private String convertDeviceVoToJson(DeviceVo deviceVo) {
// // 这里可以使用Jackson或其他JSON库来实现转换
// // 例如使用Jackson库
// ObjectMapper objectMapper = new ObjectMapper();
// try {
// return objectMapper.writeValueAsString(deviceVo);
// } catch (JsonProcessingException e) {
// log.error("转换DeviceVo对象为JSON字符串时发生错误: ", e);
// throw new RuntimeException("转换DeviceVo对象为JSON字符串时发生错误", e);
// }
// }
//
// /**
// * 将JSON字符串发送到设备服务器
// * @param jsonData JSON字符串
// * @throws IOException 如果发送过程中发生IO异常
// */
// private void sendToDeviceServer(String jsonData) throws IOException {
// // 这里可以添加具体的发送逻辑
// // 例如使用Socket或HTTP请求发送数据
// // 为了示例这里假设使用Socket发送数据
// try (Socket socket = new Socket("serverAddress" , 12345)) {
// OutputStream outputStream = socket.getOutputStream();
// outputStream.write(jsonData.getBytes());
// outputStream.flush();
// } catch (IOException e) {
// log.error("发送数据到设备服务器时发生错误: ", e);
// throw e;
// }
// }
//}
//

View File

@ -31,6 +31,7 @@ public class BaseEntity implements Serializable {
/** /**
* 创建者ID * 创建者ID
*/ */
@TableField(value = "create_id", fill = FieldFill.INSERT)
public Long createId; public Long createId;
/** /**