From 600d779fb5ec9fa82a82354f87cab39e003b4838 Mon Sep 17 00:00:00 2001 From: zhuangtianxiang <2913129173@qq.com> Date: Fri, 28 Feb 2025 16:49:06 +0800 Subject: [PATCH] =?UTF-8?q?feat(iot):=20=E6=B7=BB=E5=8A=A0=20MQTT=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 MqttConfig、MqttMessageHandlerService、UploadDataSocketProcessor 等类 - 实现 MQTT 消息的接收、处理和转发逻辑 - 添加设备数据解析和上传 --- pom.xml | 9 + .../mybatisplus/MyMetaObjectHandler.java | 11 +- .../framework/security/UserDetailsImpl.java | 10 +- .../modules/iot/device/entity/Device.java | 6 + .../service/impl/DeviceServiceImpl.java | 3 + .../modules/iot/product/entity/Product.java | 2 + .../modules/iot/tsl/entity/BaseParam.java | 11 +- .../gateway/modules/iot/tsl/entity/Event.java | 2 +- .../modules/mqtt/config/MqttBConfig.java | 302 ++++++++++++++++++ .../modules/mqtt/config/MqttConfig.java | 131 ++++++++ .../config/MqttMessageHandlerService.java | 147 +++++++++ .../modules/mqtt/config/MqttSender.java | 49 +++ .../config/UploadDataSocketProcessor.java | 73 +++++ .../modules/system/entity/BaseEntity.java | 1 + 14 files changed, 749 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttBConfig.java create mode 100644 src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java create mode 100644 src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttMessageHandlerService.java create mode 100644 src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java create mode 100644 src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java diff --git a/pom.xml b/pom.xml index 2838b29..2a64ee0 100644 --- a/pom.xml +++ b/pom.xml @@ -156,6 +156,15 @@ org.springframework.boot spring-boot-starter-aop + + + org.springframework.boot + spring-boot-starter-integration + + + org.springframework.integration + spring-integration-mqtt + diff --git a/src/main/java/com/zsc/edu/gateway/framework/mybatisplus/MyMetaObjectHandler.java b/src/main/java/com/zsc/edu/gateway/framework/mybatisplus/MyMetaObjectHandler.java index b119c60..70b1512 100644 --- a/src/main/java/com/zsc/edu/gateway/framework/mybatisplus/MyMetaObjectHandler.java +++ b/src/main/java/com/zsc/edu/gateway/framework/mybatisplus/MyMetaObjectHandler.java @@ -19,14 +19,20 @@ public class MyMetaObjectHandler implements MetaObjectHandler { @Override public void insertFill(MetaObject metaObject) { - UserDetailsImpl userInfo = SecurityUtil.getUserInfo(); if (userInfo.getUsername() == null) { userInfo.setUsername("system"); } + if (userInfo.getDeptId() == null) { + userInfo.setDeptId(2L); + } + if (userInfo.getCreateId() == null) { + userInfo.setCreateId(1L); + } this.strictInsertFill(metaObject, "createTime", LocalDateTime.class, LocalDateTime.now()); this.strictInsertFill(metaObject, "createBy", String.class, userInfo.getUsername()); - + this.strictInsertFill(metaObject, "deptId", Long.class, userInfo.getDept().getId()); + this.strictInsertFill(metaObject, "createId", Long.class, userInfo.getId()); } @Override @@ -40,4 +46,5 @@ public class MyMetaObjectHandler implements MetaObjectHandler { } + } \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/security/UserDetailsImpl.java b/src/main/java/com/zsc/edu/gateway/framework/security/UserDetailsImpl.java index 467e338..2dd80de 100644 --- a/src/main/java/com/zsc/edu/gateway/framework/security/UserDetailsImpl.java +++ b/src/main/java/com/zsc/edu/gateway/framework/security/UserDetailsImpl.java @@ -37,8 +37,10 @@ public class UserDetailsImpl implements UserDetails { public Set authorities; public Set permissions; public Set dataScopeDeptIds; + public Long deptId; + public Long createId; - public UserDetailsImpl(Long id, String username, String password, String name, Boolean enableState, Dept dept, Set dataScopeDeptIds, Role role, Set authorities, Set permissions, List roles) { + public UserDetailsImpl(Long id, String username, String password, String name, Boolean enableState, Dept dept, Set dataScopeDeptIds, Role role, Set authorities, Set permissions, List roles, Long deptId, Long createId) { this.id = id; this.username = username; this.password = password; @@ -50,6 +52,8 @@ public class UserDetailsImpl implements UserDetails { this.authorities = authorities; this.permissions = permissions; this.roles = roles; + this.deptId = deptId; + this.createId = createId; } public static UserDetailsImpl from(User user, Set permissions) { @@ -64,7 +68,9 @@ public class UserDetailsImpl implements UserDetails { user.role, user.role.authorities, permissions, - user.roles + user.roles, + user.deptId, + user.createId ); } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/entity/Device.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/entity/Device.java index 372fbda..0af6d66 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/entity/Device.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/entity/Device.java @@ -1,5 +1,6 @@ package com.zsc.edu.gateway.modules.iot.device.entity; +import com.baomidou.mybatisplus.annotation.FieldFill; import com.baomidou.mybatisplus.annotation.IEnum; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; @@ -91,6 +92,11 @@ public class Device extends BaseEntity { @TableField(exist = false) public Product product; + @TableField(value = "dept_id", fill = FieldFill.INSERT) + public Long deptId; + + //TODO 经纬度 Location + public enum Status implements IEnum, IState { UNACTIVATED(0, "未激活"), diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java index 96b0339..414df78 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java @@ -170,6 +170,9 @@ public class DeviceServiceImpl extends ServiceImpl imp .filter(Objects::nonNull) .distinct() .collect(Collectors.toList()); + if (productIds.isEmpty()) { + return devicePage; + } LambdaQueryWrapper productQueryWrapper = new LambdaQueryWrapper<>(); productQueryWrapper.in(Product::getId, productIds); List products = productRepo.selectList(productQueryWrapper); diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/product/entity/Product.java b/src/main/java/com/zsc/edu/gateway/modules/iot/product/entity/Product.java index 49e5e6e..100c9f6 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/product/entity/Product.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/product/entity/Product.java @@ -1,5 +1,6 @@ package com.zsc.edu.gateway.modules.iot.product.entity; +import com.baomidou.mybatisplus.annotation.FieldFill; import com.baomidou.mybatisplus.annotation.IEnum; import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableName; @@ -32,6 +33,7 @@ public class Product extends BaseEntity { /** * 部门ID */ + @TableField(value = "dept_id", fill = FieldFill.INSERT) private Long deptId; /** * 产品类型 diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/BaseParam.java b/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/BaseParam.java index 4b57744..923f3d8 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/BaseParam.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/BaseParam.java @@ -1,8 +1,6 @@ package com.zsc.edu.gateway.modules.iot.tsl.entity; -import com.baomidou.mybatisplus.annotation.IdType; -import com.baomidou.mybatisplus.annotation.TableId; -import com.baomidou.mybatisplus.annotation.TableName; +import com.baomidou.mybatisplus.annotation.*; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -43,6 +41,13 @@ public class BaseParam implements Serializable { /** * 部门ID(权限) */ + @TableField(value = "dept_id", fill = FieldFill.INSERT) public Long deptId; + /** + * 创建人ID + */ + @TableField(value = "create_id", fill = FieldFill.INSERT) + public Long createId; + } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/Event.java b/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/Event.java index 17ba52d..33b889a 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/Event.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/tsl/entity/Event.java @@ -17,7 +17,7 @@ import java.util.Set; /** * @author Yao - * @desciption 物模型服务 + * @desciption 物模型事件 */ @Setter @Getter diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttBConfig.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttBConfig.java new file mode 100644 index 0000000..60adafe --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttBConfig.java @@ -0,0 +1,302 @@ +//package com.zsc.edu.gateway.modules.mqtt.config; +// +//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo; +//import jakarta.annotation.Resource; +//import lombok.extern.slf4j.Slf4j; +//import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.boot.configurationprocessor.json.JSONObject; +//import org.springframework.context.ApplicationContext; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.integration.annotation.ServiceActivator; +//import org.springframework.integration.channel.DirectChannel; +//import org.springframework.integration.core.MessageProducer; +//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +//import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +//import org.springframework.messaging.MessageChannel; +//import org.springframework.messaging.MessageHandler; +//import org.springframework.util.StringUtils; +// +//import java.io.IOException; +//import java.text.ParseException; +//import java.text.SimpleDateFormat; +//import java.util.Date; +//import java.util.UUID; +// +//@Slf4j +//@Configuration +//public class MqttBConfig { +// +// @Resource +// private ApplicationContext applicationContext; +// @Resource +// private UploadDataSocketProcessor uploadDataSocketProcessor; +// +// +// public static final String CHANNEL_OUT = "mqttOutboundChannel"; +// public static final String CHANNEL_IN = "mqttInputChannel"; +// +// private static final byte[] WILL_DATA; +// +// static { +// WILL_DATA = "offline".getBytes(); +// } +// +// @Value("${gatherer.version}") +// private String gatherer; +// @Value("${mqtt.username}") +// private String username; +// @Value("${mqtt.password}") +// private String password; +// @Value("${mqtt.host}") +// private String host; +// @Value("${mqtt.port}") +// private String port; +// @Value("${mqtt.topic}") +// private String topic; +// @Value("${mqtt.qos}") +// private Integer qos; +// +// /** +// * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用ssl验证也在这里设置 +// * @return factory +// */ +// @Bean +// public MqttPahoClientFactory mqttPahoClientFactory() { +// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); +// MqttConnectOptions options = new MqttConnectOptions(); +// +// if(StringUtils.hasText(username)) { +// options.setUserName(username); +// } +// +// options.setPassword(password.toCharArray()); +// +// options.setServerURIs(new String[]{host + ":" + port}); +// +// options.setConnectionTimeout(10); +// +// options.setKeepAliveInterval(20); +// +// options.setWill("willTopic", WILL_DATA, 2, false); +// factory.setConnectionOptions(options); +// +// return factory; +// } +// +// /** +// * 入站通道 +// */ +// @Bean +// public MessageChannel mqttInputChannel() { +// return new DirectChannel(); +// } +// +// /** +// * 入站 +// */ +// @Bean +// public MessageProducer inbound() { +// // Paho客户端消息驱动通道适配器,主要用来订阅主题 +// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( +// "clientId-" + UUID.randomUUID(), +// mqttPahoClientFactory(), +// "/v1/devices/+/datas" +// ); +// adapter.setCompletionTimeout(5000); +// +// // Paho消息转换器 +// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); +// +// // 按字节接收消息 +//// defaultPahoMessageConverter.setPayloadAsBytes(true); +// adapter.setConverter(defaultPahoMessageConverter); +// adapter.setQos(qos); +// adapter.setOutputChannel(mqttInputChannel()); +// return adapter; +// +// } +// +// /** +// * ServiceActivator注解表明: 当前方法用于处理MQTT消息,inputChannel参数指定了用于消费的channel +// * @return +// */ +// @Bean +// @ServiceActivator(inputChannel = CHANNEL_IN) +// public MessageHandler handler() { +// return message -> { +// String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); +// String deviceCode = topic.split("/")[3]; +// String deviceGatherer = deviceCode.substring(0,3); +// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) { +// JSONObject payload = JSONObject.parseObject(message.getPayload().toString()); +// Integer deviceModel = Integer.valueOf(deviceCode.substring(3,5)); +// log.info("获取设备编码:" + deviceCode); +// log.info("负载:" + payload); +// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data"); +// DeviceVo deviceVo = null; +// switch (deviceModel) { +// case 2: +// deviceVo = decodeSpike(data, deviceCode, deviceModel); +// break; +// case 5: +// deviceVo = decodeCirculation(data, deviceCode, deviceModel); +// break; +// default: +// break; +// } +// if (deviceVo == null) return; // 或者抛出异常,根据你的需求处理 +// try { +// uploadDataSocketProcessor.processDeviceV2(null, deviceVo); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// } else { +// log.error("丢弃信息,主题["+ topic +"]"); +// } +// }; +// } +// +// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) { +// +//// "iccid": "89860818102381420337", +//// "imsi": "460088822108587", +//// "signal": 2, +//// "recordTime": "20241212T003246Z", +//// "battery": 100, +//// "current": 0, +//// "warning": 0 +// +// String iccid = data.getString("iccid"); +// String imsi = data.getString("imsi"); +// Integer signal = data.getInteger("signal"); +// String recordTimeString = data.getString("recordTime"); +// // 使用 SimpleDateFormat 解析日期字符串 +// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'"); +// Date recordTime; +// try { +// recordTime = dateFormat.parse(recordTimeString); +// } catch (ParseException e) { +// log.error("无法解析日期字符串: " + recordTimeString, e); +// return null; +// } +// Integer battery = data.getInteger("battery"); +// Float current = data.getFloat("current"); +// Integer warning = data.getInteger("warning"); +// +// Record record = new Record(); +// record.setWarn(warning); +//// record.setDeviceId(deviceId); +// record.setDeviceSignal(signal); +// record.setBattery(battery); +// record.setTemp1(current); +// record.setShakeWarn(warning & 1); +// record.setTemp1Warn(warning >> 1 & 1); +// record.setTemp2Warn(warning >> 2 & 1); +// record.setTemp3Warn(warning >> 3 & 1); +// record.setEarlyWarn(warning >> 4 & 1); +// record.setWarn(warning); +// record.setRecordTime(recordTime); +// DeviceVo deviceVo = new DeviceVo(); +// deviceVo.setClientId(deviceCode); +// deviceVo.setSimCode(iccid); +// deviceVo.setImeiCode(imsi); +// deviceVo.setDeviceModel(deviceModel); +// deviceVo.setWarn(warning); +// deviceVo.setRecordList(Lists.newArrayList(record)); +// return deviceVo; +// } +// +// private static DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) { +//// "longitude": "31.2412402273", +//// "latitude": "121.473992206", +//// "iccid": "89860321047601121407", +//// "imsi": "460115009044492", +//// "signal": 1, +//// "recordTime": "20241205T124818Z", +//// "battery": 76, +//// "warning": 0 +// +// String iccid = data.getString("iccid"); +// String imsi = data.getString("imsi"); +// Integer signal = data.getInteger("signal"); +// String recordTimeString = data.getString("recordTime"); +// // 使用 SimpleDateFormat 解析日期字符串 +// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'"); +// Date recordTime; +// try { +// recordTime = dateFormat.parse(recordTimeString); +// } catch (ParseException e) { +// log.error("无法解析日期字符串: " + recordTimeString, e); +// return null; +// } +// Integer battery = data.getInteger("battery"); +// Integer warning = data.getInteger("warning"); +// +// int newWarn = 0; +// // 获取warn的最后三位 +// int lastThreeBits = warning & 0b111; +// // 检查最后三位是否为011 +// if (lastThreeBits == 0b011) { +// newWarn |= 1; // 设置newWarn的第一位为1 +// } +// // 获取warn的第四位的值 +// int fourthBit = (warning >> 3) & 1; +// // 将warn的第四位的值设置到newWarn的第八位 +// if (fourthBit == 1) { +// newWarn |= (1 << 8); // 设置newWarn的第八位为1 +// } +// +// Record record = new Record(); +// record.setWarn(newWarn); +//// record.setDeviceId(deviceId); +// record.setDeviceSignal(signal); +// record.setBattery(battery); +// record.setShakeWarn(newWarn & 1); +// record.setTemp1Warn(newWarn >> 1 & 1); +// record.setTemp2Warn(newWarn >> 2 & 1); +// record.setTemp3Warn(newWarn >> 3 & 1); +// record.setEarlyWarn(newWarn >> 4 & 1); +// record.setWarn(newWarn); +// record.setRecordTime(recordTime); +// DeviceVo deviceVo = new DeviceVo(); +// deviceVo.setClientId(deviceCode); +// deviceVo.setSimCode(iccid); +// deviceVo.setImeiCode(imsi); +// deviceVo.setDeviceModel(deviceModel); +// deviceVo.setWarn(newWarn); +// deviceVo.setRecordList(Lists.newArrayList(record)); +// return deviceVo; +// } +// +// +// /** +// * 出站通道 +// */ +// @Bean +// public MessageChannel mqttOutboundChannel() { +// return new DirectChannel(); +// } +// +// /** +// * 出站 +// */ +// @Bean +// @ServiceActivator(inputChannel = CHANNEL_OUT) +// public MessageHandler outbound() { +// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( +// "clientId" + UUID.randomUUID(), +// mqttPahoClientFactory() +// ); +// messageHandler.setAsync(true); +// messageHandler.setDefaultTopic(topic); +// messageHandler.setConverter(new DefaultPahoMessageConverter()); +// return messageHandler; +// +// } +// +//} diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java new file mode 100644 index 0000000..129f6a2 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttConfig.java @@ -0,0 +1,131 @@ +//package com.zsc.edu.gateway.modules.mqtt.config; +// +//import lombok.extern.slf4j.Slf4j; +//import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.context.annotation.Bean; +//import org.springframework.context.annotation.Configuration; +//import org.springframework.integration.annotation.ServiceActivator; +//import org.springframework.integration.channel.DirectChannel; +//import org.springframework.integration.core.MessageProducer; +//import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +//import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +//import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +//import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +//import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; +//import org.springframework.messaging.MessageChannel; +//import org.springframework.messaging.MessageHandler; +//import org.springframework.util.StringUtils; +// +//import java.util.UUID; +// +//@Slf4j +//@Configuration +//public class MqttConfig { +// +// public static final String CHANNEL_OUT = "mqttOutboundChannel"; +// public static final String CHANNEL_IN = "mqttInputChannel"; +// private static final byte[] WILL_DATA; +// static { +// WILL_DATA = "offline".getBytes(); +// } +// +// @Value("${gatherer.version}") +// private String gatherer; +// @Value("${mqtt.username}") +// private String username; +// @Value("${mqtt.password}") +// private String password; +// @Value("${mqtt.host}") +// private String host; +// @Value("${mqtt.port}") +// private String port; +// @Value("${mqtt.topic}") +// private String topic; +// @Value("${mqtt.qos}") +// private Integer qos; +// +// /** +// * 创建MqttPahoClientFactory,设置MQTT Broker连接属性,如果使用ssl验证也在这里设置 +// * @return factory +// */ +// @Bean +// public MqttPahoClientFactory mqttPahoClientFactory() { +// DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); +// MqttConnectOptions options = new MqttConnectOptions(); +// +// if(StringUtils.hasText(username)) { +// options.setUserName(username); +// } +// +// options.setPassword(password.toCharArray()); +// +// options.setServerURIs(new String[]{host + ":" + port}); +// +// options.setConnectionTimeout(10); +// +// options.setKeepAliveInterval(20); +// +// options.setWill("willTopic", WILL_DATA, 2, false); +// factory.setConnectionOptions(options); +// +// return factory; +// } +// +// /** +// * 入站通道 +// */ +// @Bean +// public MessageChannel mqttInputChannel() { +// return new DirectChannel(); +// } +// +// /** +// * 入站 +// */ +// @Bean +// public MessageProducer inbound() { +// // Paho客户端消息驱动通道适配器,主要用来订阅主题 +// MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( +// "clientId-" + UUID.randomUUID(), +// mqttPahoClientFactory(), +// "/v1/devices/+/datas" +// ); +// adapter.setCompletionTimeout(5000); +// +// // Paho消息转换器 +// DefaultPahoMessageConverter defaultPahoMessageConverter = new DefaultPahoMessageConverter(); +// +// // 按字节接收消息 +//// defaultPahoMessageConverter.setPayloadAsBytes(true); +// adapter.setConverter(defaultPahoMessageConverter); +// adapter.setQos(qos); +// adapter.setOutputChannel(mqttInputChannel()); +// return adapter; +// +// } +// +//// @Bean +//// @ServiceActivator(inputChannel = CHANNEL_IN) +//// public MessageHandler handler(MqttMessageHandlerService mqttMessageHandlerService) { +//// return mqttMessageHandlerService::handleMessage; +//// } +// +// @Bean +// public MessageChannel mqttOutboundChannel() { +// return new DirectChannel(); +// } +// +// @Bean +// @ServiceActivator(inputChannel = CHANNEL_OUT) +// public MessageHandler outbound(MqttPahoClientFactory mqttPahoClientFactory) { +// MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler( +// "clientId" + UUID.randomUUID(), +// mqttPahoClientFactory +// ); +// messageHandler.setAsync(true); +// messageHandler.setDefaultTopic(topic); +// messageHandler.setConverter(new DefaultPahoMessageConverter()); +// return messageHandler; +// } +//} diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttMessageHandlerService.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttMessageHandlerService.java new file mode 100644 index 0000000..0413f65 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttMessageHandlerService.java @@ -0,0 +1,147 @@ +//package com.zsc.edu.gateway.modules.mqtt.config; +// +//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo; +//import jakarta.annotation.Resource; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.beans.factory.annotation.Value; +//import org.springframework.boot.configurationprocessor.json.JSONObject; +//import org.springframework.messaging.Message; +//import org.springframework.stereotype.Service; +// +//import java.io.IOException; +//import java.text.ParseException; +//import java.text.SimpleDateFormat; +//import java.util.Date; +// +//@Slf4j +//@Service +//public class MqttMessageHandlerService { +// +// @Value("${gatherer.version}") +// private String gatherer; +// +// @Resource +// private final UploadDataSocketProcessor uploadDataSocketProcessor; +// +// public MqttMessageHandlerService(UploadDataSocketProcessor uploadDataSocketProcessor) { +// this.uploadDataSocketProcessor = uploadDataSocketProcessor; +// } +// +// public void handleMessage(Message message) { +// String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); +// String deviceCode = topic.split("/")[3]; +// String deviceGatherer = deviceCode.substring(0, 3); +// if (topic.matches("/v1/devices/\\d{12}/datas") && deviceGatherer.equals(gatherer)) { +// JSONObject payload = JSONObject.parseObject(message.getPayload().toString()); +// Integer deviceModel = Integer.valueOf(deviceCode.substring(3, 5)); +// log.info("获取设备编码:" + deviceCode); +// log.info("负载:" + payload); +// JSONObject data = (JSONObject) JSONPath.eval(payload, "$.devices[0].services[0].data"); +// DeviceVo deviceVo = null; +// switch (deviceModel) { +// case 2: +// deviceVo = decodeSpike(data, deviceCode, deviceModel); +// break; +// case 5: +// deviceVo = decodeCirculation(data, deviceCode, deviceModel); +// break; +// default: +// break; +// } +// if (deviceVo == null) return; // 或者抛出异常,根据你的需求处理 +// try { +// uploadDataSocketProcessor.processDeviceV2(null, deviceVo); +// } catch (IOException e) { +// throw new RuntimeException(e); +// } +// } else { +// log.error("丢弃信息,主题[" + topic + "]"); +// } +// } +// +// private DeviceVo decodeCirculation(JSONObject data, String deviceCode, Integer deviceModel) { +// String iccid = data.getString("iccid"); +// String imsi = data.getString("imsi"); +// Integer signal = data.getInteger("signal"); +// String recordTimeString = data.getString("recordTime"); +// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'"); +// Date recordTime; +// try { +// recordTime = dateFormat.parse(recordTimeString); +// } catch (ParseException e) { +// log.error("无法解析日期字符串: " + recordTimeString, e); +// return null; +// } +// Integer battery = data.getInteger("battery"); +// Float current = data.getFloat("current"); +// Integer warning = data.getInteger("warning"); +// +// Record record = new Record(); +// record.setWarn(warning); +// record.setDeviceSignal(signal); +// record.setBattery(battery); +// record.setTemp1(current); +// record.setShakeWarn(warning & 1); +// record.setTemp1Warn(warning >> 1 & 1); +// record.setTemp2Warn(warning >> 2 & 1); +// record.setTemp3Warn(warning >> 3 & 1); +// record.setEarlyWarn(warning >> 4 & 1); +// record.setWarn(warning); +// record.setRecordTime(recordTime); +// DeviceVo deviceVo = new DeviceVo(); +// deviceVo.setClientId(deviceCode); +// deviceVo.setSimCode(iccid); +// deviceVo.setImeiCode(imsi); +// deviceVo.setDeviceModel(deviceModel); +// deviceVo.setWarn(warning); +// deviceVo.setRecordList(Lists.newArrayList(record)); +// return deviceVo; +// } +// +// private DeviceVo decodeSpike(JSONObject data, String deviceCode, Integer deviceModel) { +// String iccid = data.getString("iccid"); +// String imsi = data.getString("imsi"); +// Integer signal = data.getInteger("signal"); +// String recordTimeString = data.getString("recordTime"); +// SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss'Z'"); +// Date recordTime; +// try { +// recordTime = dateFormat.parse(recordTimeString); +// } catch (ParseException e) { +// log.error("无法解析日期字符串: " + recordTimeString, e); +// return null; +// } +// Integer battery = data.getInteger("battery"); +// Integer warning = data.getInteger("warning"); +// +// int newWarn = 0; +// int lastThreeBits = warning & 0b111; +// if (lastThreeBits == 0b011) { +// newWarn |= 1; +// } +// int fourthBit = (warning >> 3) & 1; +// if (fourthBit == 1) { +// newWarn |= (1 << 8); +// } +// +// Record record = new Record(); +// record.setWarn(newWarn); +// record.setDeviceSignal(signal); +// record.setBattery(battery); +// record.setShakeWarn(newWarn & 1); +// record.setTemp1Warn(newWarn >> 1 & 1); +// record.setTemp2Warn(newWarn >> 2 & 1); +// record.setTemp3Warn(newWarn >> 3 & 1); +// record.setEarlyWarn(newWarn >> 4 & 1); +// record.setWarn(newWarn); +// record.setRecordTime(recordTime); +// DeviceVo deviceVo = new DeviceVo(); +// deviceVo.setClientId(deviceCode); +// deviceVo.setSimCode(iccid); +// deviceVo.setImeiCode(imsi); +// deviceVo.setDeviceModel(deviceModel); +// deviceVo.setWarn(newWarn); +// deviceVo.setRecordList(Lists.newArrayList(record)); +// return deviceVo; +// } +//} diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java new file mode 100644 index 0000000..a39e237 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/MqttSender.java @@ -0,0 +1,49 @@ +//package com.zsc.edu.gateway.modules.mqtt.config; +// +//import org.springframework.integration.annotation.MessagingGateway; +//import org.springframework.integration.mqtt.support.MqttHeaders; +//import org.springframework.messaging.handler.annotation.Header; +//import org.springframework.stereotype.Component; +// +//@Component +//@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_OUT) +//public interface MqttSender { +// +// /** +// * 定义重载方法,用于消息发送 +// * @param payload +// */ +// void sendMsg(String payload); +// +// /** +// * 指定topic进行消息发送 +// * @param topic +// * @param payload +// */ +// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, String payload); +// +// /** +// * 指定topic进行消息发送 +// * @param topic +// * @param retain +// * @param payload +// */ +// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.RETAINED) boolean retain, String payload); +// +// +// /** +// * 指定topic进行消息发送 +// * @param topic +// * @param qos +// * @param payload +// */ +// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); +// +// /** +// * 指定topic进行消息发送 +// * @param topic +// * @param qos +// * @param payload +// */ +// void sendMsg(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, byte[] payload); +//} diff --git a/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java new file mode 100644 index 0000000..4cc0bb9 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/modules/mqtt/config/UploadDataSocketProcessor.java @@ -0,0 +1,73 @@ +//package com.zsc.edu.gateway.modules.mqtt.config; +// +//import com.fasterxml.jackson.core.JsonProcessingException; +//import com.fasterxml.jackson.databind.ObjectMapper; +//import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo; +//import lombok.extern.slf4j.Slf4j; +//import org.springframework.stereotype.Component; +// +//import java.io.IOException; +//import java.io.OutputStream; +//import java.net.Socket; +// +//@Slf4j +//@Component +//public class UploadDataSocketProcessor { +// +// /** +// * 处理设备数据并上传 +// * @param socket 可能需要一个socket连接,这里假设为null +// * @param deviceVo 设备数据对象 +// * @throws IOException 如果上传过程中发生IO异常 +// */ +// public void processDeviceV2(Object socket, DeviceVo deviceVo) throws IOException { +// // 这里可以添加具体的上传逻辑 +// // 例如,将deviceVo对象转换为JSON字符串并发送到服务器 +// try { +// // 假设有一个方法将DeviceVo对象转换为JSON字符串 +// String deviceDataJson = convertDeviceVoToJson(deviceVo); +// // 假设有一个方法将JSON字符串发送到服务器 +// sendToDeviceServer(deviceDataJson); +// } catch (Exception e) { +// log.error("处理设备数据时发生错误: ", e); +// throw new IOException("处理设备数据时发生错误", e); +// } +// } +// +// /** +// * 将DeviceVo对象转换为JSON字符串 +// * @param deviceVo 设备数据对象 +// * @return JSON字符串 +// */ +// private String convertDeviceVoToJson(DeviceVo deviceVo) { +// // 这里可以使用Jackson或其他JSON库来实现转换 +// // 例如,使用Jackson库 +// ObjectMapper objectMapper = new ObjectMapper(); +// try { +// return objectMapper.writeValueAsString(deviceVo); +// } catch (JsonProcessingException e) { +// log.error("转换DeviceVo对象为JSON字符串时发生错误: ", e); +// throw new RuntimeException("转换DeviceVo对象为JSON字符串时发生错误", e); +// } +// } +// +// /** +// * 将JSON字符串发送到设备服务器 +// * @param jsonData JSON字符串 +// * @throws IOException 如果发送过程中发生IO异常 +// */ +// private void sendToDeviceServer(String jsonData) throws IOException { +// // 这里可以添加具体的发送逻辑 +// // 例如,使用Socket或HTTP请求发送数据 +// // 为了示例,这里假设使用Socket发送数据 +// try (Socket socket = new Socket("serverAddress" , 12345)) { +// OutputStream outputStream = socket.getOutputStream(); +// outputStream.write(jsonData.getBytes()); +// outputStream.flush(); +// } catch (IOException e) { +// log.error("发送数据到设备服务器时发生错误: ", e); +// throw e; +// } +// } +//} +// diff --git a/src/main/java/com/zsc/edu/gateway/modules/system/entity/BaseEntity.java b/src/main/java/com/zsc/edu/gateway/modules/system/entity/BaseEntity.java index 44748cc..446b1af 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/system/entity/BaseEntity.java +++ b/src/main/java/com/zsc/edu/gateway/modules/system/entity/BaseEntity.java @@ -31,6 +31,7 @@ public class BaseEntity implements Serializable { /** * 创建者ID */ + @TableField(value = "create_id", fill = FieldFill.INSERT) public Long createId; /**