diff --git a/src/main/java/com/zsc/edu/gateway/IotGatewayApplication.java b/src/main/java/com/zsc/edu/gateway/IotGatewayApplication.java index 8b0562b..1450a6f 100644 --- a/src/main/java/com/zsc/edu/gateway/IotGatewayApplication.java +++ b/src/main/java/com/zsc/edu/gateway/IotGatewayApplication.java @@ -3,9 +3,11 @@ package com.zsc.edu.gateway; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.EnableAspectJAutoProxy; +import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableAspectJAutoProxy +@EnableScheduling public class IotGatewayApplication { public static void main(String[] args) { diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java deleted file mode 100644 index 382776d..0000000 --- a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/GenericWebSocketHandler.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.zsc.edu.gateway.framework.message.websocket; - -import org.springframework.stereotype.Component; -import org.springframework.web.socket.TextMessage; -import org.springframework.web.socket.WebSocketSession; -import org.springframework.web.socket.handler.TextWebSocketHandler; - -import java.io.IOException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; - -/** - * @author zhuang - */ -@Component -public class GenericWebSocketHandler extends TextWebSocketHandler { - - private final ExecutorService executorService = Executors.newCachedThreadPool(); - - @Override - public void afterConnectionEstablished(WebSocketSession session) throws Exception { - // 从 session 中获取业务逻辑的 Supplier - Supplier dataSupplier = (Supplier) session.getAttributes().get("dataSupplier"); - - if (dataSupplier == null) { - session.close(); - return; - } - - AtomicBoolean isCompleted = new AtomicBoolean(false); - - executorService.execute(() -> { - try { - while (!isCompleted.get()) { - Object data = dataSupplier.get(); - if (data == null) { - break; - } - if (!isCompleted.get()) { - session.sendMessage(new TextMessage(data.toString())); - } - Thread.sleep(5000); // 每隔 5 秒发送一次数据 - } - } catch (IOException | InterruptedException e) { - try { - session.close(); - } catch (IOException ex) { - ex.printStackTrace(); - } - } finally { - try { - session.close(); - } catch (IOException e) { - e.printStackTrace(); - } - } - }); - } -} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java deleted file mode 100644 index 1b1c4d3..0000000 --- a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketConfig.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.zsc.edu.gateway.framework.message.websocket; - -import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; - -/** - * @author zhuang - */ -@Configuration -@EnableWebSocket -public class WebSocketConfig implements WebSocketConfigurer { - - private final GenericWebSocketHandler genericWebSocketHandler; - - public WebSocketConfig(GenericWebSocketHandler genericWebSocketHandler) { - this.genericWebSocketHandler = genericWebSocketHandler; - } - - @Override - public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - registry.addHandler(genericWebSocketHandler, "/api/rest/device/ws/device/status", "/api/rest/device/ws/record/status", "/api/rest/product/ws/product/status") - .setAllowedOrigins("*") - .addInterceptors(new WebSocketInterceptor()); - } -} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java b/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java deleted file mode 100644 index 795ff42..0000000 --- a/src/main/java/com/zsc/edu/gateway/framework/message/websocket/WebSocketInterceptor.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.zsc.edu.gateway.framework.message.websocket; - -import com.zsc.edu.gateway.modules.iot.device.service.DeviceService; -import com.zsc.edu.gateway.modules.iot.product.service.ProductService; -import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService; -import jakarta.annotation.Resource; -import jakarta.servlet.http.HttpServletRequest; -import org.springframework.http.server.ServerHttpRequest; -import org.springframework.http.server.ServerHttpResponse; -import org.springframework.http.server.ServletServerHttpRequest; -import org.springframework.web.socket.WebSocketHandler; -import org.springframework.web.socket.server.HandshakeInterceptor; - -import java.util.Map; -import java.util.function.Supplier; - -/** - * @author lenovo - */ -public class WebSocketInterceptor implements HandshakeInterceptor { - @Resource - private DeviceService deviceService; - @Resource - private RecordDataService recordService; - @Resource - private ProductService productService; - - @Override - public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map attributes) { - if (request instanceof ServletServerHttpRequest) { - HttpServletRequest servletRequest = ((ServletServerHttpRequest) request).getServletRequest(); - // 获取完整路径 - String path = servletRequest.getRequestURI(); - - // 根据路径设置不同的业务逻辑 Supplier - switch (path) { - case "/api/rest/device/ws/device/status": - attributes.put("dataSupplier", (Supplier) () -> String.valueOf(deviceService.status())); - break; - case "/api/rest/device/ws/record/status": - attributes.put("dataSupplier", (Supplier) () -> String.valueOf(recordService.recordDataStatus())); - break; - case "/api/rest/product/ws/product/status": - attributes.put("dataSupplier", (Supplier) () -> String.valueOf(productService.status())); - break; - default: - attributes.put("dataSupplier", (Supplier) () -> "Unknown path: " + path); - break; - } - } - return true; - } - - @Override - public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) { - // 握手完成后不需要额外操作 - } -} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/security/SpringSecurityConfig.java b/src/main/java/com/zsc/edu/gateway/framework/security/SpringSecurityConfig.java index c7771f7..ad89c0f 100644 --- a/src/main/java/com/zsc/edu/gateway/framework/security/SpringSecurityConfig.java +++ b/src/main/java/com/zsc/edu/gateway/framework/security/SpringSecurityConfig.java @@ -88,6 +88,7 @@ public class SpringSecurityConfig { .requestMatchers(HttpMethod.GET, "/api/rest/user/menu","/api/rest/user/register","/api/rest/user/send-email").permitAll() .requestMatchers(HttpMethod.POST, "/api/rest/user/login","/api/rest/user/register").permitAll() .requestMatchers("/api/rest/user/me").permitAll() + .requestMatchers("/api/rest/ws/**").permitAll() .requestMatchers("/api/**").authenticated() ) // 不用注解,直接通过判断路径实现动态访问权限 @@ -144,7 +145,7 @@ public class SpringSecurityConfig { .rememberMe(rememberMe -> rememberMe .userDetailsService(userDetailsService) .tokenRepository(persistentTokenRepository())) - .csrf(csrf -> csrf.ignoringRequestMatchers("/api/internal/**", "/api/rest/user/logout","/api/rest/user/register")) + .csrf(csrf -> csrf.ignoringRequestMatchers("/api/internal/**", "/api/rest/user/logout", "/api/rest/user/register", "/api/rest/ws/**")) .sessionManagement(session -> session .maximumSessions(3) .sessionRegistry(sessionRegistry) diff --git a/src/main/java/com/zsc/edu/gateway/framework/websocket/BaseWebSocketHandler.java b/src/main/java/com/zsc/edu/gateway/framework/websocket/BaseWebSocketHandler.java new file mode 100644 index 0000000..2234d2b --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/websocket/BaseWebSocketHandler.java @@ -0,0 +1,46 @@ +package com.zsc.edu.gateway.framework.websocket; + +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +public abstract class BaseWebSocketHandler extends TextWebSocketHandler { + private final Set sessions = new CopyOnWriteArraySet<>(); + + @Override + public void afterConnectionEstablished(WebSocketSession session) { + sessions.add(session); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) { + sessions.remove(session); + } + + public void broadcast(String message) { + sessions.removeIf(session -> { + if (!session.isOpen()) return true; + try { + session.sendMessage(new TextMessage(message)); + } catch (IOException e) { + // 生产环境应使用日志记录 + e.printStackTrace(); + } + return false; + }); + } +} + +class DeviceWebSocketHandler extends BaseWebSocketHandler { +} + +class ProductWebSocketHandler extends BaseWebSocketHandler { +} + +class RecordWebSocketHandler extends BaseWebSocketHandler { +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/websocket/DataPushTask.java b/src/main/java/com/zsc/edu/gateway/framework/websocket/DataPushTask.java new file mode 100644 index 0000000..241fc1c --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/websocket/DataPushTask.java @@ -0,0 +1,41 @@ +package com.zsc.edu.gateway.framework.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.zsc.edu.gateway.modules.iot.device.service.DeviceService; +import com.zsc.edu.gateway.modules.iot.product.service.ProductService; +import com.zsc.edu.gateway.modules.iot.record.service.RecordDataService; +import lombok.AllArgsConstructor; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.function.Supplier; + +@Component +@AllArgsConstructor +public class DataPushTask { + private final DeviceService deviceService; + private final RecordDataService recordDataService; + private final ProductService productService; + private final ObjectMapper objectMapper; + private final DeviceWebSocketHandler deviceHandler; + private final RecordWebSocketHandler recordHandler; + private final ProductWebSocketHandler productHandler; + + @Scheduled(fixedRate = 20000) + public void pushData() { + pushAndBroadcast(deviceService::status, deviceHandler); + pushAndBroadcast(recordDataService::recordDataStatus, recordHandler); + pushAndBroadcast(productService::status, productHandler); + } + + private void pushAndBroadcast(Supplier dataSupplier, BaseWebSocketHandler handler) { + try { + Object status = dataSupplier.get(); + String json = objectMapper.writeValueAsString(status); + handler.broadcast(json); + } catch (Exception e) { + // 生产环境应使用日志记录 + e.printStackTrace(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/framework/websocket/WebSocketConfig.java b/src/main/java/com/zsc/edu/gateway/framework/websocket/WebSocketConfig.java new file mode 100644 index 0000000..0119968 --- /dev/null +++ b/src/main/java/com/zsc/edu/gateway/framework/websocket/WebSocketConfig.java @@ -0,0 +1,37 @@ +package com.zsc.edu.gateway.framework.websocket; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +@Configuration +@EnableWebSocket +public class WebSocketConfig implements WebSocketConfigurer { + + @Override + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(deviceWebSocketHandler(), "/api/rest/ws/device/status") + .setAllowedOrigins("*"); + registry.addHandler(recordWebSocketHandler(), "/api/rest/ws/record/status") + .setAllowedOrigins("*"); + registry.addHandler(productWebSocketHandler(), "/api/rest/ws/product/status") + .setAllowedOrigins("*"); + } + + @Bean + public DeviceWebSocketHandler deviceWebSocketHandler() { + return new DeviceWebSocketHandler(); + } + + @Bean + public RecordWebSocketHandler recordWebSocketHandler() { + return new RecordWebSocketHandler(); + } + + @Bean + public ProductWebSocketHandler productWebSocketHandler() { + return new ProductWebSocketHandler(); + } +} \ No newline at end of file diff --git a/src/main/java/com/zsc/edu/gateway/modules/iot/record/repo/RecordDataRepository.java b/src/main/java/com/zsc/edu/gateway/modules/iot/record/repo/RecordDataRepository.java index 473704a..4bde8f7 100644 --- a/src/main/java/com/zsc/edu/gateway/modules/iot/record/repo/RecordDataRepository.java +++ b/src/main/java/com/zsc/edu/gateway/modules/iot/record/repo/RecordDataRepository.java @@ -12,13 +12,13 @@ import java.util.List; * @author zhuang */ public interface RecordDataRepository extends BaseMapper { - @Select("SELECT COUNT(*) FROM iot_record_data WHERE content->>'warning' IS NOT NULL AND (content->>'warning')::int & 1 = 0 AND ((content->>'warning')::int & ~1) > 0") + @Select("SELECT COUNT(*) FROM iot_record_data WHERE content::jsonb->>'warning' IS NOT NULL AND (content::jsonb->>'warning')::int & 1 = 0 AND ((content::jsonb->>'warning')::int & ~1) > 0") long countWarnings(); - @Select("SELECT COUNT(*) FROM iot_record_data WHERE content->>'warning' IS NOT NULL AND ((content->>'warning')::int & 1 = 0) AND ((content->>'warning')::int & ~1) > 0 AND record_time >= #{todayStart}") + @Select("SELECT COUNT(*) FROM iot_record_data WHERE content::jsonb->>'warning' IS NOT NULL AND ((content::jsonb->>'warning')::int & 1 = 0) AND ((content::jsonb->>'warning')::int & ~1) > 0 AND record_time >= #{todayStart}") long countTodayWarnings(@Param("todayStart") LocalDateTime todayStart); - @Select("SELECT COUNT(*) FROM iot_record_data WHERE content->>'warning' IS NOT NULL AND (content->>'warning')::int & #{bitPosition} = #{bitPosition}") + @Select("SELECT COUNT(*) FROM iot_record_data WHERE content::jsonb->>'warning' IS NOT NULL AND (content::jsonb->>'warning')::int & #{bitPosition} = #{bitPosition}") long countWarningsByBit(@Param("bitPosition") int bitPosition); List selectByClientId(@Param("clientId") String clientId);