feat(iot): 实现设备和产品信息的 SSE 推送- 在 DeviceController 和 ProductController 中添加 SSE 接口

- 新增 SseConfig 类用于处理 SSE
This commit is contained in:
zhuangtianxiang 2025-03-07 17:13:15 +08:00
parent 36f4f8abde
commit 58a9f08c3b
4 changed files with 160 additions and 91 deletions

View File

@ -0,0 +1,50 @@
package com.zsc.edu.gateway.framework.message.sse;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* @author zhuang
*/
@Component
public class SseConfig {
private final ExecutorService executorService = Executors.newCachedThreadPool();
public SseEmitter sendSseEvents(Supplier<Object> dataSupplier) {
SseEmitter emitter = new SseEmitter(30000L);
AtomicBoolean isCompleted = new AtomicBoolean(false);
emitter.onCompletion(() -> isCompleted.set(true));
executorService.execute(() -> {
try {
while (!isCompleted.get()) {
Object data = dataSupplier.get();
if (data == null) {
break;
}
if (!isCompleted.get()) {
emitter.send(SseEmitter.event().data(data));
}
Thread.sleep(5000);
}
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e);
} finally {
emitter.complete();
}
});
return emitter;
}
}

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zsc.edu.gateway.framework.message.sse.SseConfig;
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
import com.zsc.edu.gateway.modules.iot.device.dto.BatchDeviceDto;
import com.zsc.edu.gateway.modules.iot.device.dto.DeviceDto;
@ -17,13 +18,16 @@ import com.zsc.edu.gateway.modules.iot.device.vo.DeviceVo;
import com.zsc.edu.gateway.modules.iot.record.entity.DataWarningVo;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordData;
import com.zsc.edu.gateway.modules.iot.record.entity.RecordDataStatusVo;
import com.zsc.edu.gateway.modules.iot.record.repo.RecordDataRepository;
import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
@ -38,6 +42,8 @@ public class DeviceController {
DeviceService service;
RecordDataService recordService;
@Resource
private SseConfig sseConfig;
/**
* 创建设备
@ -117,37 +123,38 @@ public class DeviceController {
*/
@GetMapping("record/data")
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public IPage<RecordData> recordData(Page<RecordData> page, String clientId) {
return recordService.query(page, clientId);
}
/**
* 查询设备信息数量
* 获取设备信息
*/
@GetMapping("status")
@GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public DeviceStatusVo status() {
return service.status();
public SseEmitter status() {
return sseConfig.sendSseEvents(service::status);
}
/**
* 查询设备消息信息
* 获取设备上报信息
*/
@GetMapping("/record/status")
@GetMapping(value = "/sse/record/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public RecordDataStatusVo recordDataStatus() {
return recordService.recordDataStatus();
public SseEmitter recordDataStatus() {
return sseConfig.sendSseEvents(recordService::recordDataStatus);
}
/**
* 查询设备告警信息
* 获取告警信息
*/
@GetMapping("/data/status")
@GetMapping(value = "/sse/data/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public DataWarningVo dataWarning() {
return recordService.dataWarning();
public SseEmitter dataWarning() {
return sseConfig.sendSseEvents(recordService::dataWarning);
}
/**
@ -157,4 +164,5 @@ public class DeviceController {
public String send(Long deviceId, Integer qos, @RequestBody JSONObject paras) throws JSONException {
return service.send(deviceId, qos, paras);
}
}

View File

@ -2,6 +2,7 @@ package com.zsc.edu.gateway.modules.iot.product.controller;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.zsc.edu.gateway.framework.message.sse.SseConfig;
import com.zsc.edu.gateway.framework.mybatisplus.DataPermission;
import com.zsc.edu.gateway.modules.iot.product.dto.ProductDto;
import com.zsc.edu.gateway.modules.iot.product.entity.Product;
@ -9,15 +10,20 @@ import com.zsc.edu.gateway.modules.iot.product.query.ProductQuery;
import com.zsc.edu.gateway.modules.iot.product.service.ProductService;
import com.zsc.edu.gateway.modules.iot.product.service.impl.ProductServiceImpl;
import com.zsc.edu.gateway.modules.iot.product.vo.ProductStatusVo;
import jakarta.annotation.Resource;
import lombok.AllArgsConstructor;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.web.PageableDefault;
import org.springframework.http.MediaType;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
/**
* @author zhuang
@ -29,6 +35,9 @@ public class ProductController {
private final ProductService service;
@Resource
SseConfig sseConfig;
/**
* 创建产品
*
@ -108,9 +117,11 @@ public class ProductController {
/**
* 查询产品各类信息
*/
@GetMapping("status")
@GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@DataPermission
public ProductStatusVo status() {
return service.status();
@PreAuthorize("hasAuthority('iot:product:query')")
public SseEmitter status() {
return sseConfig.sendSseEvents(service::status);
}
}

View File

@ -1,75 +1,75 @@
package com.zsc.edu.gateway.service.iot;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zsc.edu.gateway.domain.iot.DeviceBuilder;
import com.zsc.edu.gateway.exception.ConstraintException;
import com.zsc.edu.gateway.modules.iot.device.dto.DeviceDto;
import com.zsc.edu.gateway.modules.iot.device.entity.Device;
import com.zsc.edu.gateway.modules.iot.device.repo.DeviceRepository;
import com.zsc.edu.gateway.modules.iot.device.service.DeviceService;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
@SpringBootTest
public class DeviceServiceTest {
@Autowired
private DeviceService service;
@Autowired
private DeviceRepository repo;
private Device device1;
private Device device2;
@BeforeEach
void setUp() {
device1 = DeviceBuilder.aDevice().name("DEVICE_NAME_1").online(true).state(Device.Status.ACTIVATED).build();
repo.insert(device1);
device2 = DeviceBuilder.aDevice().name("DEVICE_NAME_2").online(false).state(Device.Status.ACTIVATED).build();
repo.insert(device2);
}
@AfterEach
void tearDown() {
repo.delete(new LambdaQueryWrapper<Device>()
.in(Device::getName, "DEVICE_NAME_1", "DEVICE_NAME_2", "DEVICE_3", "DEVICE_UPDATE"));
}
@Test
void list() {
LambdaQueryWrapper<Device> queryWrapper = new LambdaQueryWrapper<>();
assertEquals(2, service.list(queryWrapper.like(Device::getName, "DEVICE")).size());
assertEquals(1, service.list(queryWrapper.eq(Device::getName, device1.getName())).size());
// assertEquals(2, service.list().size());
}
@Test
void createProduct() {
DeviceDto dto = new DeviceDto("DEVICE_3", null, null, null, null, 121L, "备注");
Device device = service.create(dto);
assertNotNull(device);
List<Device> devices = service.list(new LambdaQueryWrapper<Device>().like(Device::getName, "DEVICE"));
assertEquals(3, devices.size());
// 不能创建其他已存在的同名同代码部门
DeviceDto dto2 = new DeviceDto(device2.getName(), null, null, null, null, 121L, "备注");
assertThrows(ConstraintException.class, () -> service.create(dto2));
}
@Test
void updateProduct() {
DeviceDto dto = new DeviceDto();
dto.setName("DEVICE_UPDATE");
service.update(dto, device2.getId());
Device updatedProduct = repo.selectOne(new LambdaQueryWrapper<Device>().eq(Device::getName, dto.getName()));
assertNotNull(updatedProduct);
assertEquals("DEVICE_UPDATE", updatedProduct.getName());
assertEquals(device2.getId(), updatedProduct.getId());
}
}
//package com.zsc.edu.gateway.service.iot;
//
//import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
//import com.zsc.edu.gateway.domain.iot.DeviceBuilder;
//import com.zsc.edu.gateway.exception.ConstraintException;
//import com.zsc.edu.gateway.modules.iot.device.dto.DeviceDto;
//import com.zsc.edu.gateway.modules.iot.device.entity.Device;
//import com.zsc.edu.gateway.modules.iot.device.repo.DeviceRepository;
//import com.zsc.edu.gateway.modules.iot.device.service.DeviceService;
//import org.junit.jupiter.api.AfterEach;
//import org.junit.jupiter.api.BeforeEach;
//import org.junit.jupiter.api.Test;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.boot.test.context.SpringBootTest;
//
//import java.util.List;
//
//import static org.junit.jupiter.api.Assertions.*;
//import static org.junit.jupiter.api.Assertions.assertEquals;
//
//@SpringBootTest
//public class DeviceServiceTest {
// @Autowired
// private DeviceService service;
// @Autowired
// private DeviceRepository repo;
//
// private Device device1;
// private Device device2;
//
// @BeforeEach
// void setUp() {
// device1 = DeviceBuilder.aDevice().name("DEVICE_NAME_1").online(true).state(Device.Status.ACTIVATED).build();
// repo.insert(device1);
// device2 = DeviceBuilder.aDevice().name("DEVICE_NAME_2").online(false).state(Device.Status.ACTIVATED).build();
// repo.insert(device2);
// }
//
// @AfterEach
// void tearDown() {
// repo.delete(new LambdaQueryWrapper<Device>()
// .in(Device::getName, "DEVICE_NAME_1", "DEVICE_NAME_2", "DEVICE_3", "DEVICE_UPDATE"));
// }
//
// @Test
// void list() {
// LambdaQueryWrapper<Device> queryWrapper = new LambdaQueryWrapper<>();
// assertEquals(2, service.list(queryWrapper.like(Device::getName, "DEVICE")).size());
// assertEquals(1, service.list(queryWrapper.eq(Device::getName, device1.getName())).size());
//// assertEquals(2, service.list().size());
// }
//
// @Test
// void createProduct() {
// DeviceDto dto = new DeviceDto("DEVICE_3", null, null, null, null, 121L, "备注");
// Device device = service.create(dto);
// assertNotNull(device);
// List<Device> devices = service.list(new LambdaQueryWrapper<Device>().like(Device::getName, "DEVICE"));
// assertEquals(3, devices.size());
// // 不能创建其他已存在的同名同代码部门
// DeviceDto dto2 = new DeviceDto(device2.getName(), null, null, null, null, 121L, "备注");
// assertThrows(ConstraintException.class, () -> service.create(dto2));
// }
//
// @Test
// void updateProduct() {
// DeviceDto dto = new DeviceDto();
// dto.setName("DEVICE_UPDATE");
// service.update(dto, device2.getId());
// Device updatedProduct = repo.selectOne(new LambdaQueryWrapper<Device>().eq(Device::getName, dto.getName()));
// assertNotNull(updatedProduct);
// assertEquals("DEVICE_UPDATE", updatedProduct.getName());
// assertEquals(device2.getId(), updatedProduct.getId());
// }
//}