refactor(iot): 重构设备和产品模块的 WebSocket 消息发送逻辑
- 移除了 Device 和 Product 控制器中的字符串返回值,改为返回服务层对象- 优化了 Event 和 Param服务的实现,使用流式处理提高效率 - 删除了未使用的 SseConfig 类和 WebSocketServer 类 - 调整了 WebSocketConfig 和 WebSocketInterceptor 中的路径配置 - 移除了 Device 和 DeviceVo 中的冗余字段
This commit is contained in:
parent
4a1abef98a
commit
182d4c7961
@ -1,48 +0,0 @@
|
|||||||
package com.zsc.edu.gateway.framework.message.sse;
|
|
||||||
|
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.function.Supplier;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author zhuang
|
|
||||||
*/
|
|
||||||
@Component
|
|
||||||
public class SseConfig {
|
|
||||||
private final ExecutorService executorService = Executors.newCachedThreadPool();
|
|
||||||
|
|
||||||
public SseEmitter sendSseEvents(Supplier<Object> dataSupplier) {
|
|
||||||
SseEmitter emitter = new SseEmitter(30000L);
|
|
||||||
|
|
||||||
AtomicBoolean isCompleted = new AtomicBoolean(false);
|
|
||||||
emitter.onCompletion(() -> isCompleted.set(true));
|
|
||||||
|
|
||||||
executorService.execute(() -> {
|
|
||||||
try {
|
|
||||||
while (!isCompleted.get()) {
|
|
||||||
Object data = dataSupplier.get();
|
|
||||||
if (data == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (!isCompleted.get()) {
|
|
||||||
emitter.send(SseEmitter.event().data(data));
|
|
||||||
}
|
|
||||||
Thread.sleep(5000);
|
|
||||||
}
|
|
||||||
} catch (IOException | InterruptedException e) {
|
|
||||||
emitter.completeWithError(e);
|
|
||||||
} finally {
|
|
||||||
emitter.complete();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return emitter;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
@ -20,7 +20,7 @@ public class WebSocketConfig implements WebSocketConfigurer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
|
||||||
registry.addHandler(genericWebSocketHandler, "/ws/{path}")
|
registry.addHandler(genericWebSocketHandler, "/api/rest/device/ws/device/status", "/api/rest/device/ws/record/status", "/api/rest/product/ws/product/status")
|
||||||
.setAllowedOrigins("*")
|
.setAllowedOrigins("*")
|
||||||
.addInterceptors(new WebSocketInterceptor());
|
.addInterceptors(new WebSocketInterceptor());
|
||||||
}
|
}
|
||||||
|
@ -36,13 +36,13 @@ public class WebSocketInterceptor implements HandshakeInterceptor {
|
|||||||
|
|
||||||
// 根据路径变量设置不同的业务逻辑 Supplier
|
// 根据路径变量设置不同的业务逻辑 Supplier
|
||||||
switch (pathVariable) {
|
switch (pathVariable) {
|
||||||
case "device/status":
|
case "/api/rest/device/ws/device/status":
|
||||||
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(deviceService.status()));
|
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(deviceService.status()));
|
||||||
break;
|
break;
|
||||||
case "record/status":
|
case "/api/rest/device/ws/record/status":
|
||||||
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(recordService.recordDataStatus()));
|
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(recordService.recordDataStatus()));
|
||||||
break;
|
break;
|
||||||
case "product/status":
|
case "/api/rest/product/ws/product/status":
|
||||||
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(productService.status()));
|
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(productService.status()));
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -1,4 +0,0 @@
|
|||||||
package com.zsc.edu.gateway.framework.message.websocket;
|
|
||||||
|
|
||||||
public class WebSocketServer {
|
|
||||||
}
|
|
@ -144,8 +144,8 @@ public class DeviceController {
|
|||||||
@GetMapping(value = "/ws/device/status")
|
@GetMapping(value = "/ws/device/status")
|
||||||
@PreAuthorize("hasAuthority('iot:device:query')")
|
@PreAuthorize("hasAuthority('iot:device:query')")
|
||||||
@DataPermission
|
@DataPermission
|
||||||
public String status() {
|
public Object status() {
|
||||||
return "websocket-device-status";
|
return service.status();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -154,8 +154,8 @@ public class DeviceController {
|
|||||||
@GetMapping(value = "/ws/record/status")
|
@GetMapping(value = "/ws/record/status")
|
||||||
@PreAuthorize("hasAuthority('iot:device:query')")
|
@PreAuthorize("hasAuthority('iot:device:query')")
|
||||||
@DataPermission
|
@DataPermission
|
||||||
public String recordDataStatus() {
|
public Object recordDataStatus() {
|
||||||
return "websocket-record-status";
|
return recordService.recordDataStatus();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -4,9 +4,10 @@ import com.baomidou.mybatisplus.annotation.FieldFill;
|
|||||||
import com.baomidou.mybatisplus.annotation.IEnum;
|
import com.baomidou.mybatisplus.annotation.IEnum;
|
||||||
import com.baomidou.mybatisplus.annotation.TableField;
|
import com.baomidou.mybatisplus.annotation.TableField;
|
||||||
import com.baomidou.mybatisplus.annotation.TableName;
|
import com.baomidou.mybatisplus.annotation.TableName;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
|
||||||
import com.zsc.edu.gateway.common.enums.IState;
|
import com.zsc.edu.gateway.common.enums.IState;
|
||||||
import com.zsc.edu.gateway.framework.json.JsonbTypeHandler;
|
import com.zsc.edu.gateway.framework.json.JsonbTypeHandler;
|
||||||
import com.zsc.edu.gateway.modules.attachment.entity.Attachment;
|
|
||||||
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
||||||
import com.zsc.edu.gateway.modules.system.entity.BaseEntity;
|
import com.zsc.edu.gateway.modules.system.entity.BaseEntity;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
@ -24,6 +25,7 @@ import java.util.Map;
|
|||||||
@Getter
|
@Getter
|
||||||
@NoArgsConstructor
|
@NoArgsConstructor
|
||||||
@AllArgsConstructor
|
@AllArgsConstructor
|
||||||
|
@JsonInclude
|
||||||
@TableName("iot_device")
|
@TableName("iot_device")
|
||||||
public class Device extends BaseEntity {
|
public class Device extends BaseEntity {
|
||||||
|
|
||||||
@ -114,22 +116,12 @@ public class Device extends BaseEntity {
|
|||||||
*/
|
*/
|
||||||
public String iconId;
|
public String iconId;
|
||||||
|
|
||||||
/**
|
|
||||||
* 设备图标附件
|
|
||||||
*/
|
|
||||||
@TableField(exist = false)
|
|
||||||
public Attachment icon;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备预览图附件ID
|
* 设备预览图附件ID
|
||||||
*/
|
*/
|
||||||
public String previewId;
|
public String previewId;
|
||||||
|
|
||||||
/**
|
|
||||||
* 设备预览图附件
|
|
||||||
*/
|
|
||||||
@TableField(exist = false)
|
|
||||||
public Attachment preview;
|
|
||||||
|
|
||||||
public enum Status implements IEnum<Integer>, IState<Status> {
|
public enum Status implements IEnum<Integer>, IState<Status> {
|
||||||
UNACTIVATED(0, "未激活"),
|
UNACTIVATED(0, "未激活"),
|
||||||
|
@ -227,12 +227,10 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceRepository, Device> imp
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<Device> findByName(String name) {
|
public List<Device> findByName(String name) {
|
||||||
if (name == null) {
|
|
||||||
return baseMapper.selectList(new QueryWrapper<>());
|
|
||||||
}
|
|
||||||
return baseMapper.selectListByName(name);
|
return baseMapper.selectListByName(name);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 切换在线状态返回切换数
|
* 切换在线状态返回切换数
|
||||||
*/
|
*/
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package com.zsc.edu.gateway.modules.iot.device.vo;
|
package com.zsc.edu.gateway.modules.iot.device.vo;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.annotation.TableField;
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.zsc.edu.gateway.framework.json.JsonbTypeHandler;
|
|
||||||
import com.zsc.edu.gateway.modules.attachment.entity.Attachment;
|
import com.zsc.edu.gateway.modules.attachment.entity.Attachment;
|
||||||
import com.zsc.edu.gateway.modules.iot.device.entity.Device;
|
import com.zsc.edu.gateway.modules.iot.device.entity.Device;
|
||||||
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
||||||
@ -117,6 +115,7 @@ public class DeviceVo {
|
|||||||
*/
|
*/
|
||||||
public Attachment icon;
|
public Attachment icon;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 设备预览图附件ID
|
* 设备预览图附件ID
|
||||||
*/
|
*/
|
||||||
|
@ -1,6 +1,5 @@
|
|||||||
package com.zsc.edu.gateway.modules.iot.product.controller;
|
package com.zsc.edu.gateway.modules.iot.product.controller;
|
||||||
|
|
||||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
|
||||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||||
import com.zsc.edu.gateway.framework.message.sse.SseConfig;
|
import com.zsc.edu.gateway.framework.message.sse.SseConfig;
|
||||||
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
|
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
|
||||||
@ -8,23 +7,13 @@ import com.zsc.edu.gateway.modules.iot.product.dto.ProductDto;
|
|||||||
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
|
||||||
import com.zsc.edu.gateway.modules.iot.product.query.ProductQuery;
|
import com.zsc.edu.gateway.modules.iot.product.query.ProductQuery;
|
||||||
import com.zsc.edu.gateway.modules.iot.product.service.ProductService;
|
import com.zsc.edu.gateway.modules.iot.product.service.ProductService;
|
||||||
import com.zsc.edu.gateway.modules.iot.product.service.impl.ProductServiceImpl;
|
|
||||||
import com.zsc.edu.gateway.modules.iot.product.vo.ProductStatusVo;
|
|
||||||
import com.zsc.edu.gateway.modules.operationLog.entity.OperationLogAnnotation;
|
import com.zsc.edu.gateway.modules.operationLog.entity.OperationLogAnnotation;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import org.springframework.data.domain.Pageable;
|
|
||||||
import org.springframework.data.domain.Sort;
|
|
||||||
import org.springframework.data.web.PageableDefault;
|
|
||||||
import org.springframework.http.MediaType;
|
|
||||||
import org.springframework.security.access.prepost.PreAuthorize;
|
import org.springframework.security.access.prepost.PreAuthorize;
|
||||||
import org.springframework.web.bind.annotation.*;
|
import org.springframework.web.bind.annotation.*;
|
||||||
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collection;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author zhuang
|
* @author zhuang
|
||||||
@ -124,8 +113,8 @@ public class ProductController {
|
|||||||
@GetMapping(value = "/ws/product/status")
|
@GetMapping(value = "/ws/product/status")
|
||||||
@DataPermission
|
@DataPermission
|
||||||
@PreAuthorize("hasAuthority('iot:product:query')")
|
@PreAuthorize("hasAuthority('iot:product:query')")
|
||||||
public String status() {
|
public Object status() {
|
||||||
return "websocket-product-status";
|
return service.status();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -14,8 +14,6 @@ public interface ParamService extends IService<Param> {
|
|||||||
|
|
||||||
Boolean create(List<ParamDto> params, Long id, Param.ForeignType foreignType);
|
Boolean create(List<ParamDto> params, Long id, Param.ForeignType foreignType);
|
||||||
|
|
||||||
Boolean createCompareParam(List<ParamDto> params, Long id, Param.ForeignType foreignType);
|
|
||||||
|
|
||||||
Boolean update(List<ParamDto> paramDto, Long id);
|
Boolean update(List<ParamDto> paramDto, Long id);
|
||||||
|
|
||||||
Boolean delete(Long id);
|
Boolean delete(Long id);
|
||||||
|
@ -3,9 +3,12 @@ package com.zsc.edu.gateway.modules.iot.tsl.service.impl;
|
|||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
import com.zsc.edu.gateway.exception.ConstraintException;
|
import com.zsc.edu.gateway.exception.ConstraintException;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.dto.EventDto;
|
import com.zsc.edu.gateway.modules.iot.tsl.dto.EventDto;
|
||||||
|
import com.zsc.edu.gateway.modules.iot.tsl.entity.CompareParam;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.entity.Event;
|
import com.zsc.edu.gateway.modules.iot.tsl.entity.Event;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.entity.Param;
|
import com.zsc.edu.gateway.modules.iot.tsl.entity.Param;
|
||||||
|
import com.zsc.edu.gateway.modules.iot.tsl.mapper.CompareParamMapper;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.mapper.EventMapper;
|
import com.zsc.edu.gateway.modules.iot.tsl.mapper.EventMapper;
|
||||||
|
import com.zsc.edu.gateway.modules.iot.tsl.repo.CompareParamRepository;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventRepository;
|
import com.zsc.edu.gateway.modules.iot.tsl.repo.EventRepository;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.service.EventService;
|
import com.zsc.edu.gateway.modules.iot.tsl.service.EventService;
|
||||||
import com.zsc.edu.gateway.modules.iot.tsl.service.ParamService;
|
import com.zsc.edu.gateway.modules.iot.tsl.service.ParamService;
|
||||||
@ -13,6 +16,9 @@ import lombok.AllArgsConstructor;
|
|||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author Yao
|
* @author Yao
|
||||||
*/
|
*/
|
||||||
@ -21,6 +27,8 @@ import org.springframework.transaction.annotation.Transactional;
|
|||||||
public class EventServiceImpl extends ServiceImpl<EventRepository, Event> implements EventService {
|
public class EventServiceImpl extends ServiceImpl<EventRepository, Event> implements EventService {
|
||||||
private final EventMapper mapper;
|
private final EventMapper mapper;
|
||||||
private final ParamService paramService;
|
private final ParamService paramService;
|
||||||
|
private final CompareParamRepository compareParamRepository;
|
||||||
|
private final CompareParamMapper compareParamMapper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 新建物模型事件
|
* 新建物模型事件
|
||||||
@ -36,7 +44,15 @@ public class EventServiceImpl extends ServiceImpl<EventRepository, Event> implem
|
|||||||
}
|
}
|
||||||
Event event = mapper.toEntity(dto);
|
Event event = mapper.toEntity(dto);
|
||||||
save(event);
|
save(event);
|
||||||
paramService.createCompareParam(dto.getOutputs(), event.getId(), Param.ForeignType.EVENT);
|
List<CompareParam> paramsToInsert = dto.getOutputs().stream()
|
||||||
|
.map(output -> {
|
||||||
|
CompareParam param = compareParamMapper.toEntity(output);
|
||||||
|
param.setForeignId(event.getId());
|
||||||
|
param.setForeignType(Param.ForeignType.EVENT);
|
||||||
|
return param;
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
compareParamRepository.insert(paramsToInsert);
|
||||||
return event;
|
return event;
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
|
@ -44,20 +44,6 @@ public class ParamServiceImpl extends ServiceImpl<ParamRepository, Param> implem
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
@Transactional
|
|
||||||
public Boolean createCompareParam(List<ParamDto> params, Long id, Param.ForeignType foreignType) {
|
|
||||||
List<CompareParam> paramsToInsert = params.stream()
|
|
||||||
.map(dto -> {
|
|
||||||
CompareParam param = compareParamMapper.toEntity(dto);
|
|
||||||
param.setForeignId(id);
|
|
||||||
param.setForeignType(foreignType);
|
|
||||||
return param;
|
|
||||||
})
|
|
||||||
.collect(Collectors.toList());
|
|
||||||
compareParamRepository.insert(paramsToInsert);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@Transactional
|
@Transactional
|
||||||
|
@ -99,8 +99,6 @@
|
|||||||
ip.name as param_name,
|
ip.name as param_name,
|
||||||
ip.remark as param_remark,
|
ip.remark as param_remark,
|
||||||
ai.id as icon_id,
|
ai.id as icon_id,
|
||||||
ai.file_name as icon_file_name,
|
|
||||||
ap.file_name as preview_file_name,
|
|
||||||
ap.id as preview_id
|
ap.id as preview_id
|
||||||
from iot_device d
|
from iot_device d
|
||||||
left join iot_product p on d.product_id = p.id
|
left join iot_product p on d.product_id = p.id
|
||||||
|
Loading…
Reference in New Issue
Block a user