From 0644faf2626e8d65f86b714f62e0fabc60fbb1d0 Mon Sep 17 00:00:00 2001 From: zhuangtianxiang <2913129173@qq.com> Date: Wed, 19 Mar 2025 15:30:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(message):=20=E5=AE=9E=E7=8E=B0=E5=9F=BA?= =?UTF-8?q?=E4=BA=8E=20WebSocket=20=E7=9A=84=E8=AE=BE=E5=A4=87=E3=80=81?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=92=8C=E4=BA=A7=E5=93=81=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=8E=A8=E9=80=81-=20=E6=96=B0=E5=A2=9E=20WebSocket=20?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E6=9B=BF=E4=BB=A3=E5=8E=9F=E6=9C=89=E7=9A=84?= =?UTF-8?q?=20SSE=20=E6=8E=A5=E5=8F=A3=20-=20=E5=AE=9E=E7=8E=B0=E9=80=9A?= =?UTF-8?q?=E7=94=A8=E7=9A=84=20WebSocket=20=E5=A4=84=E7=90=86=E5=99=A8?= =?UTF-8?q?=E5=92=8C=E9=85=8D=E7=BD=AE=20-=20=E6=B7=BB=E5=8A=A0=20WebSocke?= =?UTF-8?q?t=E6=8B=A6=E6=88=AA=E5=99=A8=E7=94=A8=E4=BA=8E=E5=A4=84?= =?UTF-8?q?=E7=90=86=E4=B8=8D=E5=90=8C=E7=9A=84=E8=B7=AF=E5=BE=84=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=20-=20=E6=9B=B4=E6=96=B0=E8=AE=BE=E5=A4=87=E3=80=81?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=E5=92=8C=E4=BA=A7=E5=93=81=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=20WebSocket=20=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=8E=A8=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 ++ .../websocket/GenericWebSocketHandler.java | 61 +++++++++++++++++++ .../message/websocket/WebSocketConfig.java | 27 ++++++++ .../websocket/WebSocketInterceptor.java | 60 ++++++++++++++++++ .../device/controller/DeviceController.java | 20 ++++-- .../iot/device/service/DeviceService.java | 2 + .../service/impl/DeviceServiceImpl.java | 10 +++ .../product/controller/ProductController.java | 6 +- 8 files changed, 182 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java create mode 100644 src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java create mode 100644 src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java diff --git a/pom.xml b/pom.xml index cbe0096..0ae4ba0 100644 --- a/pom.xml +++ b/pom.xml @@ -142,6 +142,11 @@ 3.0.0 + + org.springframework.boot + spring-boot-starter-websocket + 3.4.0 + org.slf4j slf4j-api diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java new file mode 100644 index 0000000..382776d --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java @@ -0,0 +1,61 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * @author zhuang + */ +@Component +public class GenericWebSocketHandler extends TextWebSocketHandler { + + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + // 从 session 中获取业务逻辑的 Supplier + Supplier dataSupplier = (Supplier) session.getAttributes().get("dataSupplier"); + + if (dataSupplier == null) { + session.close(); + return; + } + + AtomicBoolean isCompleted = new AtomicBoolean(false); + + executorService.execute(() -> { + try { + while (!isCompleted.get()) { + Object data = dataSupplier.get(); + if (data == null) { + break; + } + if (!isCompleted.get()) { + session.sendMessage(new TextMessage(data.toString())); + } + Thread.sleep(5000); // 每隔 5 秒发送一次数据 + } + } catch (IOException | InterruptedException e) { + try { + session.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } finally { + try { + session.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java new file mode 100644 index 0000000..ac8b100 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java @@ -0,0 +1,27 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +/** + * @author zhuang + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + private final GenericWebSocketHandler genericWebSocketHandler; + + public WebSocketConfig(GenericWebSocketHandler genericWebSocketHandler) { + this.genericWebSocketHandler = genericWebSocketHandler; + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(genericWebSocketHandler, "/ws/{path}") + .setAllowedOrigins("*") + .addInterceptors(new WebSocketInterceptor()); + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java new file mode 100644 index 0000000..e3db3ee --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java @@ -0,0 +1,60 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import com.zsc.edu.gateway.modules.iot.device.service.DeviceService; +import com.zsc.edu.gateway.modules.iot.product.service.ProductService; +import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletRequest; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.http.server.ServletServerHttpRequest; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * @author lenovo + */ +public class WebSocketInterceptor implements HandshakeInterceptor { + @Resource + private DeviceService deviceService; + @Resource + private RecordDataService recordService; + @Resource + private ProductService productService; + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { + if (request instanceof ServletServerHttpRequest) { + HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); + // 获取路径变量 + String path = servletRequest.getRequestURI(); + String[] pathParts = path.split("/"); + String pathVariable = pathParts[pathParts.length - 1]; + + // 根据路径变量设置不同的业务逻辑 Supplier + switch (pathVariable) { + case "device/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(deviceService.status())); + break; + case "record/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(recordService.recordDataStatus())); + break; + case "product/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(productService.status())); + break; + default: + attributes.put("dataSupplier", (Supplier) () -> "Unknown path: " + pathVariable); + break; + } + } + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + // 握手完成后不需要额外操作 + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java index b2e39c0..c13a7c9 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java @@ -141,21 +141,21 @@ public class DeviceController { /** * 获取设备信息 */ - @GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/device/status") @PreAuthorize("hasAuthority('iot:device:query')") @DataPermission - public SseEmitter status() { - return sseConfig.sendSseEvents(service::status); + public String status() { + return "websocket-device-status"; } /** * 获取设备上报信息 */ - @GetMapping(value = "/sse/record/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/record/status") @PreAuthorize("hasAuthority('iot:device:query')") @DataPermission - public SseEmitter recordDataStatus() { - return sseConfig.sendSseEvents(recordService::recordDataStatus); + public String recordDataStatus() { + return "websocket-record-status"; } @@ -168,4 +168,12 @@ public class DeviceController { return service.send(deviceId, qos, paras); } + /** + * 切换在线状态 + */ + @PatchMapping("/toggle/{id}") + @OperationLogAnnotation(content = "'设备在线状态'", operationType = "更新") + public int toggle(@PathVariable("id") Long id) { + return service.toggle(id); + } } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java index b9ccace..25d581f 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java @@ -36,4 +36,6 @@ public interface DeviceService extends IService { String send(Long deviceId, Integer qos, JSONObject paras) throws JSONException; List findByName(String name); + + int toggle(Long id); } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java index 2b62acb..d5ad87f 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java @@ -231,4 +231,14 @@ public class DeviceServiceImpl extends ServiceImpl imp queryWrapper.like(Device::getName, name); return baseMapper.selectList(queryWrapper); } + + /** + * 切换在线状态返回切换数 + */ + @Override + public int toggle(Long id) { + Device device = baseMapper.selectById(id); + device.setOnline(!device.getOnline()); + return baseMapper.updateById(device); + } } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java b/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java index 8114d97..a48bc3d 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java @@ -121,11 +121,11 @@ public class ProductController { /** * 查询产品各类信息 */ - @GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/product/status") @DataPermission @PreAuthorize("hasAuthority('iot:product:query')") - public SseEmitter status() { - return sseConfig.sendSseEvents(service::status); + public String status() { + return "websocket-product-status"; } }