diff --git a/pom.xml b/pom.xml index cbe0096..0ae4ba0 100644 --- a/pom.xml +++ b/pom.xml @@ -142,6 +142,11 @@ 3.0.0 + + org.springframework.boot + spring-boot-starter-websocket + 3.4.0 + org.slf4j slf4j-api diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java new file mode 100644 index 0000000..382776d --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java @@ -0,0 +1,61 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import org.springframework.stereotype.Component; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * @author zhuang + */ +@Component +public class GenericWebSocketHandler extends TextWebSocketHandler { + + private final ExecutorService executorService = Executors.newCachedThreadPool(); + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + // 从 session 中获取业务逻辑的 Supplier + Supplier dataSupplier = (Supplier) session.getAttributes().get("dataSupplier"); + + if (dataSupplier == null) { + session.close(); + return; + } + + AtomicBoolean isCompleted = new AtomicBoolean(false); + + executorService.execute(() -> { + try { + while (!isCompleted.get()) { + Object data = dataSupplier.get(); + if (data == null) { + break; + } + if (!isCompleted.get()) { + session.sendMessage(new TextMessage(data.toString())); + } + Thread.sleep(5000); // 每隔 5 秒发送一次数据 + } + } catch (IOException | InterruptedException e) { + try { + session.close(); + } catch (IOException ex) { + ex.printStackTrace(); + } + } finally { + try { + session.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java new file mode 100644 index 0000000..ac8b100 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java @@ -0,0 +1,27 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +/** + * @author zhuang + */ +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + private final GenericWebSocketHandler genericWebSocketHandler; + + public WebSocketConfig(GenericWebSocketHandler genericWebSocketHandler) { + this.genericWebSocketHandler = genericWebSocketHandler; + } + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(genericWebSocketHandler, "/ws/{path}") + .setAllowedOrigins("*") + .addInterceptors(new WebSocketInterceptor()); + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java new file mode 100644 index 0000000..e3db3ee --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java @@ -0,0 +1,60 @@ +package com.zsc.edu.gateway.framework.message.websocket; + +import com.zsc.edu.gateway.modules.iot.device.service.DeviceService; +import com.zsc.edu.gateway.modules.iot.product.service.ProductService; +import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService; +import jakarta.annotation.Resource; +import jakarta.servlet.http.HttpServletRequest; +import org.springframework.http.server.ServerHttpRequest; +import org.springframework.http.server.ServerHttpResponse; +import org.springframework.http.server.ServletServerHttpRequest; +import org.springframework.web.socket.WebSocketHandler; +import org.springframework.web.socket.server.HandshakeInterceptor; + +import java.util.Map; +import java.util.function.Supplier; + +/** + * @author lenovo + */ +public class WebSocketInterceptor implements HandshakeInterceptor { + @Resource + private DeviceService deviceService; + @Resource + private RecordDataService recordService; + @Resource + private ProductService productService; + + @Override + public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) throws Exception { + if (request instanceof ServletServerHttpRequest) { + HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); + // 获取路径变量 + String path = servletRequest.getRequestURI(); + String[] pathParts = path.split("/"); + String pathVariable = pathParts[pathParts.length - 1]; + + // 根据路径变量设置不同的业务逻辑 Supplier + switch (pathVariable) { + case "device/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(deviceService.status())); + break; + case "record/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(recordService.recordDataStatus())); + break; + case "product/status": + attributes.put("dataSupplier", (Supplier) () -> String.valueOf(productService.status())); + break; + default: + attributes.put("dataSupplier", (Supplier) () -> "Unknown path: " + pathVariable); + break; + } + } + return true; + } + + @Override + public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { + // 握手完成后不需要额外操作 + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java index b2e39c0..c13a7c9 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/controller/DeviceController.java @@ -141,21 +141,21 @@ public class DeviceController { /** * 获取设备信息 */ - @GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/device/status") @PreAuthorize("hasAuthority('iot:device:query')") @DataPermission - public SseEmitter status() { - return sseConfig.sendSseEvents(service::status); + public String status() { + return "websocket-device-status"; } /** * 获取设备上报信息 */ - @GetMapping(value = "/sse/record/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/record/status") @PreAuthorize("hasAuthority('iot:device:query')") @DataPermission - public SseEmitter recordDataStatus() { - return sseConfig.sendSseEvents(recordService::recordDataStatus); + public String recordDataStatus() { + return "websocket-record-status"; } @@ -168,4 +168,12 @@ public class DeviceController { return service.send(deviceId, qos, paras); } + /** + * 切换在线状态 + */ + @PatchMapping("/toggle/{id}") + @OperationLogAnnotation(content = "'设备在线状态'", operationType = "更新") + public int toggle(@PathVariable("id") Long id) { + return service.toggle(id); + } } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java index b9ccace..25d581f 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/DeviceService.java @@ -36,4 +36,6 @@ public interface DeviceService extends IService { String send(Long deviceId, Integer qos, JSONObject paras) throws JSONException; List findByName(String name); + + int toggle(Long id); } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java index 2b62acb..d5ad87f 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/device/service/impl/DeviceServiceImpl.java @@ -231,4 +231,14 @@ public class DeviceServiceImpl extends ServiceImpl imp queryWrapper.like(Device::getName, name); return baseMapper.selectList(queryWrapper); } + + /** + * 切换在线状态返回切换数 + */ + @Override + public int toggle(Long id) { + Device device = baseMapper.selectById(id); + device.setOnline(!device.getOnline()); + return baseMapper.updateById(device); + } } diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java b/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java index 8114d97..a48bc3d 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/product/controller/ProductController.java @@ -121,11 +121,11 @@ public class ProductController { /** * 查询产品各类信息 */ - @GetMapping(value = "/sse/status", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + @GetMapping(value = "/ws/product/status") @DataPermission @PreAuthorize("hasAuthority('iot:product:query')") - public SseEmitter status() { - return sseConfig.sendSseEvents(service::status); + public String status() { + return "websocket-product-status"; } }