feat(message): 实现基于 WebSocket 的设备、记录和产品状态推送- 新增 WebSocket 接口替代原有的 SSE 接口

- 实现通用的 WebSocket 处理器和配置
- 添加 WebSocket拦截器用于处理不同的路径请求
- 更新设备、记录和产品服务,支持 WebSocket 状态推送
This commit is contained in:
zhuangtianxiang 2025-03-19 15:30:00 +08:00
parent a1a089583f
commit 0644faf262
8 changed files with 182 additions and 9 deletions

View File

@ -142,6 +142,11 @@
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>3.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>

View File

@ -0,0 +1,61 @@
package com.zsc.edu.gateway.framework.message.websocket;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* @author zhuang
*/
@Component
public class GenericWebSocketHandler extends TextWebSocketHandler {
private final ExecutorService executorService = Executors.newCachedThreadPool();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
// session 中获取业务逻辑的 Supplier
Supplier<Object> dataSupplier = (Supplier<Object>) session.getAttributes().get("dataSupplier");
if (dataSupplier == null) {
session.close();
return;
}
AtomicBoolean isCompleted = new AtomicBoolean(false);
executorService.execute(() -> {
try {
while (!isCompleted.get()) {
Object data = dataSupplier.get();
if (data == null) {
break;
}
if (!isCompleted.get()) {
session.sendMessage(new TextMessage(data.toString()));
}
Thread.sleep(5000); // 每隔 5 秒发送一次数据
}
} catch (IOException | InterruptedException e) {
try {
session.close();
} catch (IOException ex) {
ex.printStackTrace();
}
} finally {
try {
session.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}

View File

@ -0,0 +1,27 @@
package com.zsc.edu.gateway.framework.message.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* @author zhuang
*/
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final GenericWebSocketHandler genericWebSocketHandler;
public WebSocketConfig(GenericWebSocketHandler genericWebSocketHandler) {
this.genericWebSocketHandler = genericWebSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(genericWebSocketHandler, "/ws/{path}")
.setAllowedOrigins("*")
.addInterceptors(new WebSocketInterceptor());
}
}

View File

@ -0,0 +1,60 @@
package com.zsc.edu.gateway.framework.message.websocket;
import com.zsc.edu.gateway.modules.iot.device.service.DeviceService;
import com.zsc.edu.gateway.modules.iot.product.service.ProductService;
import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService;
import jakarta.annotation.Resource;
import jakarta.servlet.http.HttpServletRequest;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.Map;
import java.util.function.Supplier;
/**
* @author lenovo
*/
public class WebSocketInterceptor implements HandshakeInterceptor {
@Resource
private DeviceService deviceService;
@Resource
private RecordDataService recordService;
@Resource
private ProductService productService;
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest();
// 获取路径变量
String path = servletRequest.getRequestURI();
String[] pathParts = path.split("/");
String pathVariable = pathParts[pathParts.length - 1];
// 根据路径变量设置不同的业务逻辑 Supplier
switch (pathVariable) {
case "device/status":
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(deviceService.status()));
break;
case "record/status":
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(recordService.recordDataStatus()));
break;
case "product/status":
attributes.put("dataSupplier", (Supplier<String>) () -> String.valueOf(productService.status()));
break;
default:
attributes.put("dataSupplier", (Supplier<String>) () -> "Unknown path: " + pathVariable);
break;
}
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
// 握手完成后不需要额外操作
}
}

View File

@ -141,21 +141,21 @@ public class DeviceController {
/**
* 获取设备信息
*/
@GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@GetMapping(value = "/ws/device/status")
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public SseEmitter status() {
return sseConfig.sendSseEvents(service::status);
public String status() {
return "websocket-device-status";
}
/**
* 获取设备上报信息
*/
@GetMapping(value = "/sse/record/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@GetMapping(value = "/ws/record/status")
@PreAuthorize("hasAuthority('iot:device:query')")
@DataPermission
public SseEmitter recordDataStatus() {
return sseConfig.sendSseEvents(recordService::recordDataStatus);
public String recordDataStatus() {
return "websocket-record-status";
}
@ -168,4 +168,12 @@ public class DeviceController {
return service.send(deviceId, qos, paras);
}
/**
* 切换在线状态
*/
@PatchMapping("/toggle/{id}")
@OperationLogAnnotation(content = "'设备在线状态'", operationType = "更新")
public int toggle(@PathVariable("id") Long id) {
return service.toggle(id);
}
}

View File

@ -36,4 +36,6 @@ public interface DeviceService extends IService<Device> {
String send(Long deviceId, Integer qos, JSONObject paras) throws JSONException;
List<Device> findByName(String name);
int toggle(Long id);
}

View File

@ -231,4 +231,14 @@ public class DeviceServiceImpl extends ServiceImpl<DeviceRepository, Device> imp
queryWrapper.like(Device::getName, name);
return baseMapper.selectList(queryWrapper);
}
/**
* 切换在线状态返回切换数
*/
@Override
public int toggle(Long id) {
Device device = baseMapper.selectById(id);
device.setOnline(!device.getOnline());
return baseMapper.updateById(device);
}
}

View File

@ -121,11 +121,11 @@ public class ProductController {
/**
* 查询产品各类信息
*/
@GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@GetMapping(value = "/ws/product/status")
@DataPermission
@PreAuthorize("hasAuthority('iot:product:query')")
public SseEmitter status() {
return sseConfig.sendSseEvents(service::status);
public String status() {
return "websocket-product-status";
}
}