refactor(iot): 重构数据监控和告警功能

-移除 DataWarningVo 类,将告警信息整合到 RecordDataStatusVo 中
- 重构 RecordDataServiceImpl 中的数据处理和告警逻辑
- 新增 CompareParam 类,用以定义比较参数
- 更新 Event 相关的实体类和接口,增加启用状态和比较参数
- 删除 TemperatureExceededEvent 和 TemperatureExceededEventListener 类,改为在 RecordDataServiceImpl 中直接处理告警
This commit is contained in:
zhuangtianxiang 2025-03-17 19:13:59 +08:00
parent b8df5df38f
commit 247a4b58fd
24 changed files with 213 additions and 230 deletions

View File

@ -1,24 +0,0 @@
package com.zsc.edu.gateway.framework.springEvent;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
/**
* @author zhuang
*/
@Getter
@Setter
public class TemperatureExceededEvent extends ApplicationEvent {
private final RecordData recordData;
private final double reducedParameter;
private final String eventName;
public TemperatureExceededEvent(Object source, RecordData recordData, double reducedParameter, String eventName) {
super(source);
this.recordData = recordData;
this.reducedParameter = reducedParameter;
this.eventName = eventName;
}
}

View File

@ -1,43 +0,0 @@
package com.zsc.edu.gateway.framework.springEvent;
import com.zsc.edu.gateway.modules.iot.device.repo.DeviceRepository;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
import com.zsc.edu.gateway.modules.iot.tsl.entity.EventLog;
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventLogRepository;
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventRepository;
import jakarta.annotation.Resource;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* @author zhuang
*/
@Component
public class TemperatureExceededEventListener implements ApplicationListener<TemperatureExceededEvent> {
@Resource
private EventLogRepository eventLogRepository;
@Resource
private DeviceRepository deviceRepository;
@Resource
private EventRepository eventRepository;
@Override
public void onApplicationEvent(TemperatureExceededEvent event) {
RecordData recordData = event.getRecordData();
System.out.println(event.getEventName() + "数值如下:" + event.getReducedParameter());
// 创建 EventLog 对象
EventLog eventLog = new EventLog();
eventLog.setName(event.getEventName());
eventLog.setRecordId(recordData.getId());
eventLog.setTriggerTime(LocalDateTime.now());
eventLog.setIsRead(false);
// 保存 EventLog 对象到数据库
eventLogRepository.insert(eventLog);
}
}

View File

@ -58,7 +58,7 @@ public class DeviceController {
@PostMapping("batch")
@PreAuthorize("hasAuthority('iot:device:create')")
@OperationLogAnnotation(content = "'批量设备'", operationType = "新建")
public List<Device> batchCreate(@RequestBody BatchDeviceDto batchDeviceDto) {
public List<Device> createBatch(@RequestBody BatchDeviceDto batchDeviceDto) {
return service.batchCreate(batchDeviceDto);
}
@ -148,15 +148,6 @@ public class DeviceController {
return sseConfig.sendSseEvents(recordService::recordDataStatus);
}
/**
* 获取告警信息
*/
@GetMapping(value = "/sse/data/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public SseEmitter dataWarning() {
return sseConfig.sendSseEvents(recordService::dataWarning);
}
/**
* 下发命令

View File

@ -177,6 +177,9 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceRepository, Device> imp
return deviceRepo.selectPage(page, query.wrapper());
}
/**
* 设备状态
*/
@Override
public DeviceStatusVo status() {
return DeviceStatusVo.builder()
@ -193,6 +196,9 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceRepository, Device> imp
.build();
}
/**
* 发送消息
*/
@Override
public String send(Long deviceId, Integer qos, JSONObject paras) throws JSONException {
DeviceVo device = detail(deviceId);

View File

@ -1,34 +0,0 @@
package com.zsc.edu.gateway.modules.iot.record.entity;
import lombok.Data;
/**
* @author zhuang
*/
@Data
public class DataWarningVo {
/**
* 告警总数
*/
private Long warningCount;
/**
* 今日新增
*/
private Long todayWarningCount;
/**
* 一级报警数
*/
private Long firstWarningCount;
/**
* 二级报警数
*/
private Long secondWarningCount;
/**
* 三级报警数
*/
private Long thirdWarningCount;
}

View File

@ -1,10 +1,12 @@
package com.zsc.edu.gateway.modules.iot.record.entity;
import lombok.Builder;
import lombok.Data;
/**
* @author zhuang
*/
@Builder
@Data
public class RecordDataStatusVo {
/**
@ -16,4 +18,14 @@ public class RecordDataStatusVo {
* 数据点数
*/
public Long dataCount;
/**
* 告警总数
*/
private Long warningCount;
/**
* 今日新增
*/
private Long todayWarningCount;
}

View File

@ -4,12 +4,9 @@ import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import com.zsc.edu.gateway.modules.iot.record.entity.DataWarningVo;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordDataStatusVo;
import java.util.List;
/**
* @author zhuang
*/
@ -19,7 +16,5 @@ public interface RecordDataService extends IService<RecordData> {
RecordDataStatusVo recordDataStatus();
DataWarningVo dataWarning();
IPage<RecordData> query(Page<RecordData> page, String clientId);
}

View File

@ -1,6 +1,5 @@
package com.zsc.edu.gateway.modules.iot.record.service.impl;
import com.alibaba.fastjson2.JSONException;
import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -8,16 +7,14 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.zsc.edu.gateway.modules.iot.device.entity.Device;
import com.zsc.edu.gateway.modules.iot.device.repo.DeviceRepository;
import com.zsc.edu.gateway.modules.iot.record.entity.DataWarningVo;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordDataStatusVo;
import com.zsc.edu.gateway.framework.springEvent.TemperatureExceededEvent;
import com.zsc.edu.gateway.modules.iot.record.repo.RecordDataRepository;
import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService;
import com.zsc.edu.gateway.modules.iot.tsl.dto.ParamDto;
import com.zsc.edu.gateway.modules.iot.tsl.entity.Event;
import com.zsc.edu.gateway.modules.iot.tsl.entity.Param;
import com.zsc.edu.gateway.modules.iot.tsl.mapper.ParamMapper;
import com.zsc.edu.gateway.modules.iot.tsl.entity.EventLog;
import com.zsc.edu.gateway.modules.iot.tsl.entity.CompareParam;
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventLogRepository;
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventRepository;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
@ -27,8 +24,12 @@ import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;
/**
*上报数据
*
* @author zhuang
*/
@AllArgsConstructor
@ -42,7 +43,15 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
@Resource
private final EventRepository eventRepository;
@Resource
private final ParamMapper paramMapper;
private final EventLogRepository eventLogRepository;
/**
* 处理上报数据并进行监控
*
* @param clientId
* @param data
* @return recordData
*/
@Override
@Transactional
public RecordData recordData(String clientId, JSONObject data) {
@ -55,31 +64,29 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
return recordData;
}
/**
* 获取上报数据状态
* @return RecordDataStatusVo
*/
@Override
public RecordDataStatusVo recordDataStatus() {
RecordDataStatusVo recordDataStatusVo = new RecordDataStatusVo();
recordDataStatusVo.setRecordCount(baseMapper.selectCount(new LambdaQueryWrapper<RecordData>()));
recordDataStatusVo.setDataCount(baseMapper.selectCount(new LambdaQueryWrapper<RecordData>().isNotNull(RecordData::getContent)));
return recordDataStatusVo;
}
@Override
public DataWarningVo dataWarning() {
DataWarningVo dataWarningVo = new DataWarningVo();
LocalDateTime todayStart = LocalDateTime.now().withHour(0).withMinute(0).withSecond(0);
long warningCount = baseMapper.countWarnings();
long todayWarningCount = baseMapper.countTodayWarnings(todayStart);
long firstWarningCount = baseMapper.countWarningsByBit(1);
long secondWarningCount = baseMapper.countWarningsByBit(2);
long thirdWarningCount = baseMapper.countWarningsByBit(4);
dataWarningVo.setWarningCount(warningCount);
dataWarningVo.setTodayWarningCount(todayWarningCount);
dataWarningVo.setFirstWarningCount(firstWarningCount);
dataWarningVo.setSecondWarningCount(secondWarningCount);
dataWarningVo.setThirdWarningCount(thirdWarningCount);
return dataWarningVo;
return RecordDataStatusVo.builder()
.warningCount(baseMapper.countWarnings())
.todayWarningCount(baseMapper.countTodayWarnings(todayStart))
.recordCount(baseMapper.selectCount(new LambdaQueryWrapper<>()))
.dataCount(baseMapper.selectCount(new LambdaQueryWrapper<RecordData>().isNotNull(RecordData::getContent)))
.build();
}
/**
* 查询上报数据
* @param page 分页数据
* @param clientId 设备的 clientId
* @return IPage<RecordData>
*/
@Override
public IPage<RecordData> query(Page<RecordData> page, String clientId) {
LambdaQueryWrapper<RecordData> queryWrapper = new LambdaQueryWrapper<>();
@ -87,6 +94,10 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
return baseMapper.selectPage(page, queryWrapper);
}
/**
* 监控上报数据
* @param recordData 上报数据
*/
public void processRecordData(RecordData recordData) {
// 根据 clientId 查询设备信息
Device device = deviceRepository.findByClientId(recordData.getClientId());
@ -95,7 +106,7 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
}
// 获取产品下的所有事件
List<Event> events = eventRepository.selectByProductId(device.getProductId());
List<Event> events = eventRepository.selectByProductId(device.getProductId(),true);
if (events.isEmpty()) {
return; // 如果没有事件直接返回
}
@ -106,9 +117,16 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
}
}
private void processParam(RecordData recordData, Param param, Event event) {
/**
* 处理数据
*
* @param recordData 上报数据
* @param param 属性
* @param event 事件
*/
private void processParam(RecordData recordData, CompareParam param, Event event) {
// 获取参数的默认值和比较类型
Double defaultValue = param.getDefaultValue();
double defaultValue = Double.parseDouble(param.getDefaultValue());
String identifier = param.getIdentifier();
// 检查 recordData 的内容中是否包含该参数的标识符
@ -116,31 +134,26 @@ public class RecordDataServiceImpl extends ServiceImpl<RecordDataRepository, Rec
if (valueObj != null) {
// 获取参数的实际值
double value = Double.parseDouble(valueObj.toString());
Param.CompareType compareType = param.getCompareType();
CompareParam.CompareType compareType = param.getCompareType();
// 根据比较类型进行不同的处理
switch (compareType) {
case GT:
if (value > defaultValue) {
eventPublisher.publishEvent(new TemperatureExceededEvent(this, recordData, value, event.getName()));
}
break;
case LT:
if (value < defaultValue) {
eventPublisher.publishEvent(new TemperatureExceededEvent(this, recordData, value, event.getName()));
}
break;
case EQ:
if (value == defaultValue) {
eventPublisher.publishEvent(new TemperatureExceededEvent(this, recordData, value, event.getName()));
}
break;
default:
// 处理未知的比较类型
throw new JSONException("Unknown compare type: " + compareType);
}
Predicate<Double> comparison = switch (compareType) {
case GT -> v -> v > defaultValue;
case LT -> v -> v < defaultValue;
case EQ -> v -> v == defaultValue;
case GE -> v -> v >= defaultValue;
case LE -> v -> v <= defaultValue;
};
// 定义插入日志的 Consumer
Consumer<Double> logInsertion = v -> {
if (comparison.test(v)) {
eventLogRepository.insert(new EventLog(null, event.getName(), recordData.getId(), recordData, LocalDateTime.now(), null, null));
}
};
// 执行比较和日志插入
logInsertion.accept(value);
}
}
}

View File

@ -107,5 +107,16 @@ public class EventController {
return service.detail(id);
}
/**
* 更新事件状态
*
* @return 更新后的事件
*/
@PatchMapping("/toggle/{id}")
@PreAuthorize("hasAuthority('iot:event:update')")
@OperationLogAnnotation(content = "'事件启用状态'", operationType = "更新")
public Boolean updateEnabled(@PathVariable("id") Long id) {
return service.toggle(id);
}
}

View File

@ -26,5 +26,7 @@ public class EventDto {
public Event.Type type;
public Boolean enabled;
private List<ParamDto> outputs;
}

View File

@ -1,6 +1,7 @@
package com.zsc.edu.gateway.modules.iot.tsl.dto;
import com.zsc.edu.gateway.modules.iot.tsl.entity.DataType;
import com.zsc.edu.gateway.modules.iot.tsl.entity.CompareParam;
import com.zsc.edu.gateway.modules.iot.tsl.entity.Param;
import lombok.AllArgsConstructor;
import lombok.Data;
@ -26,7 +27,7 @@ public class ParamDto {
public Param.Type type;
private Param.CompareType compareType;
private CompareParam.CompareType compareType;
private Double defaultValue;

View File

@ -0,0 +1,73 @@
package com.zsc.edu.gateway.modules.iot.tsl.entity;
import com.baomidou.mybatisplus.annotation.IEnum;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.zsc.edu.gateway.common.enums.IState;
import lombok.*;
/**
* @author lenovo
*/
@Setter
@Getter
@EqualsAndHashCode(callSuper = false)
@NoArgsConstructor
@AllArgsConstructor
@TableName("iot_param")
@JsonInclude
public class CompareParam extends Param {
/**
* 对比类型
*/
private CompareType compareType;
/**
* 默认数值
*/
private String defaultValue;
public enum CompareType implements IEnum<String>, IState<CompareType> {
/**
* 大于
*/
GT("GT", "GT"),
/**
* 小于
*/
LT("LT", "LT"),
/**
* 等于
*/
EQ("EQ", "EQ"),
/**
* 大于等于
*/
GE("GE", "GE"),
/**
* 小于等于
*/
LE("LE", "LE");
private final String value;
private final String description;
CompareType(String value, String description) {
this.value = value;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String toString() {
return this.description;
}
}
}

View File

@ -5,16 +5,12 @@ import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.zsc.edu.gateway.common.enums.IState;
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
import com.zsc.edu.gateway.modules.iot.tsl.dto.ParamDto;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.context.annotation.ImportSelector;
import java.util.List;
import java.util.Set;
/**
* @author Yao
@ -35,11 +31,17 @@ public class Event extends BaseParam {
*/
private Type type;
/**
* 是否启用服务
*/
private Boolean enabled;
/**
* 服务输出的参数
*/
@TableField(exist = false)
private List<Param> outputs;
private List<CompareParam> outputs;
public enum Type implements IEnum<String>, IState<Type> {
/**

View File

@ -45,11 +45,6 @@ public class EventLog {
*/
private LocalDateTime triggerTime;
/**
* 是否已读
*/
private Boolean isRead;
/**
* 部门ID(权限)
*/
@ -61,4 +56,5 @@ public class EventLog {
*/
@TableField(value = "create_id", fill = FieldFill.INSERT)
public Long createId;
}

View File

@ -44,15 +44,6 @@ public class Param extends BaseParam {
*/
private Long foreignId;
/**
* 对比类型
*/
private CompareType compareType;
/**
* 默认数值
*/
private Double defaultValue;
public enum Type implements IEnum<String>, IState<Type> {
/**
@ -120,35 +111,5 @@ public class Param extends BaseParam {
}
}
public enum CompareType implements IEnum<String>, IState<CompareType> {
/**
* 大于
*/
GT("GT", "GT"),
/**
* 小于
*/
LT("LT", "LT"),
/**
* 等于
*/
EQ("EQ", "EQ");
private final String value;
private final String description;
CompareType(String value, String description) {
this.value = value;
this.description = description;
}
@Override
public String getValue() {
return value;
}
@Override
public String toString() {
return this.description;
}
}
}

View File

@ -25,5 +25,5 @@ public interface EventRepository extends BaseMapper<Event> {
Event selectById(@Param("id") Long id);
List<Event> selectByProductId(@Param("productId") Long id);
List<Event> selectByProductId(@Param("productId") Long id, @Param("enabled") Boolean enabled);
}

View File

@ -18,4 +18,6 @@ public interface EventService extends IService<Event> {
Event detail(Long id);
boolean delete(Long id);
boolean toggle(Long id);
}

View File

@ -9,10 +9,7 @@ import com.zsc.edu.gateway.modules.iot.tsl.mapper.EventMapper;
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventRepository;
import com.zsc.edu.gateway.modules.iot.tsl.service.EventService;
import com.zsc.edu.gateway.modules.iot.tsl.service.ParamService;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -81,4 +78,14 @@ public class EventServiceImpl extends ServiceImpl<EventRepository, Event> implem
removeById(id);
return paramService.delete(id);
}
/**
* 切换启用状态
*/
@Override
public boolean toggle(Long id) {
Event event = detail(id);
event.setEnabled(!event.getEnabled());
return updateById(event);
}
}

View File

@ -18,7 +18,7 @@ import java.util.List;
@AllArgsConstructor
@RestController
@RequestMapping("/api/rest/log")
public class OperationController {
public class OperationLogController {
private OperationLogRepository repo;
/**

View File

@ -26,6 +26,7 @@ public enum ModuleTypeEnum implements IEnum<String>, IState<ModuleTypeEnum> {
notice("notice", "notice"),
bulletin("bulletin", "bulletin"),
attachment("attachment", "attachment"),
operationLog("operationLog", "operationLog"),
other("other", "other");
private final String code;

View File

@ -57,13 +57,19 @@ public class OperationLogAspect {
OperationLogAnnotation operationLogAnnotation = ((MethodSignature) joinPoint.getSignature()).getMethod().
getAnnotation(OperationLogAnnotation.class);
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
// 获取方法名
String methodName = joinPoint.getSignature().getName().toLowerCase();
// 根据方法名判定方法类型
String methodType = FunctionTypeEnum.getMessageByCode(methodName);
String methodType;
if (methodName.contains("update")) {
methodType = "update";
} else if (methodName.contains("delete")) {
methodType = "delete";
} else if (methodName.contains("create")) {
methodType = "create";
} else {
methodType = "other";
}
// 获取类名
String className = joinPoint.getTarget().getClass().getSimpleName().toLowerCase();

View File

@ -9,8 +9,10 @@
<result column="type" property="type"/>
<result column="identifier" property="identifier"/>
<result column="name" property="name"/>
<result column="enabled" property="enabled"/>
<result column="remark" property="remark"/>
<collection property="outputs" ofType="com.zsc.edu.gateway.modules.iot.tsl.entity.Param" autoMapping="true"
<collection property="outputs" ofType="com.zsc.edu.gateway.modules.iot.tsl.entity.CompareParam"
autoMapping="true"
columnPrefix="param_">
<id column="id" property="id"/>
<result column="data_type" property="dataType"/>
@ -34,7 +36,9 @@
ip.type as param_type,
ip.identifier as param_identifier,
ip.name as param_name,
ip.remark as param_remark
ip.remark as param_remark,
ip.compare_type as param_compare_type,
ip.default_value as param_default_value
from iot_event e
left join iot_param ip on e.id = ip.foreign_id
and ip.foreign_type = 1
@ -55,5 +59,6 @@
left join iot_param ip on e.id = ip.foreign_id
and ip.foreign_type = 1
where e.product_id = #{productId}
and e.enabled = true
</select>
</mapper>

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB