ソースを参照

增加30s无订阅自动关闭通道

kwl 2 週間 前
コミット
08c421a380

+ 7 - 0
src/main/java/com/jttserver/service/publisher/PublishServer.java

@@ -78,4 +78,11 @@ public abstract class PublishServer {
      * @return 是否包含
      */
     public abstract boolean containsStreamGroup(String streamId, String prefix);
+
+    /** 
+     * 判断streamGroups 是否有订阅者
+     * @param chennelId 接收者的通道id
+     * @return 是否有订阅者
+     */
+    public abstract boolean hasSubscribers(String chennelId);
 }

+ 9 - 0
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -197,6 +197,15 @@ public class WebsockServer extends PublishServer {
         return streamGroups.containsKey(CommUtils.InfoItem.of(streamId, prefix));
     }
 
+    @Override
+    public boolean hasSubscribers(String channelId) {
+        CommUtils.InfoItem infoItem = channelIdToStreamId.get(channelId);
+        if (infoItem != null) {
+            return streamGroups.containsKey(infoItem);
+        }
+        return false;
+    }
+
     /**
      * 获取channelId对应的streamId方法
      */

+ 103 - 4
src/main/java/com/jttserver/service/receiver/JttVideoRecvServer.java

@@ -4,6 +4,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.concurrent.ScheduledFuture;
 
 /* 
  * 通用接收视频服务器
@@ -52,7 +54,6 @@ public class JttVideoRecvServer extends RecvSever {
     // 存储每个连接的Channel
     private static final Map<String, Channel> channelIdToCtxMap = new ConcurrentHashMap<>();
 
-
     public JttVideoRecvServer(PublishServer publishServer, int port, String prefix, StreamRelayType relayType) {
         super(publishServer, port, prefix, relayType);
     }
@@ -77,7 +78,8 @@ public class JttVideoRecvServer extends RecvSever {
                         ch.pipeline().addLast(new Jtt1078MessageDecoder()); // 添加JTT1078解码器处理粘包拆包
 
                         // 添加处理视频流数据的处理器
-                        ch.pipeline().addLast(new VideoStreamHandler(deviceManagementEnabled, streamRelay, publishServer, prefix));
+                        ch.pipeline().addLast(
+                                new VideoStreamHandler(deviceManagementEnabled, streamRelay, publishServer, prefix));
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)
@@ -131,7 +133,7 @@ public class JttVideoRecvServer extends RecvSever {
         if (channel != null) {
             channel.eventLoop().execute(() -> {
                 VideoStreamHandler handler = channel.pipeline().get(VideoStreamHandler.class);
-                if(handler != null) {
+                if (handler != null) {
                     handler.deviceDisConnectChannel(channelId);
                 }
             });
@@ -159,6 +161,17 @@ public class JttVideoRecvServer extends RecvSever {
 
         private final String prefix;
 
+        // 定时器配置
+        private static final int DEFAULT_CHECK_INTERVAL = 5; // 默认检查间隔5秒
+        private static final int DEFAULT_TIMEOUT = 30; // 默认超时时间30秒
+        private static final String CHECK_INTERVAL_KEY = "video.channel.sub.check.interval";
+        private static final String TIMEOUT_KEY = "video.channel.sub.timeout";
+
+        // 存储每个连接的定时器
+        private final Map<String, ScheduledFuture<?>> channelTimers = new ConcurrentHashMap<>();
+        // 存储每个连接的开始时间
+        private final Map<String, Long> channelStartTimes = new ConcurrentHashMap<>();
+
         public VideoStreamHandler(boolean deviceManagementEnabled, StreamRelay streamRelay,
                 PublishServer publishServer, String prefix) {
             this.deviceManagementEnabled = deviceManagementEnabled;
@@ -180,7 +193,7 @@ public class JttVideoRecvServer extends RecvSever {
             // 添加设备信息(受功能开关控制)
             if (deviceManagementEnabled) {
                 DeviceManager.DeviceInfo deviceInfo = new DeviceManager.DeviceInfo(channelId,
-                    ctx.channel().remoteAddress().toString());
+                        ctx.channel().remoteAddress().toString());
                 // 设置接收服务器的路径前缀,例如 "/realtime/" 或 "/playback/"
                 deviceInfo.setPrefix(this.prefix);
                 DeviceManager.registerDevice(channelId, deviceInfo);
@@ -189,6 +202,9 @@ public class JttVideoRecvServer extends RecvSever {
             // 存储Channel
             channelIdToCtxMap.put(channelId, ctx.channel());
 
+            // 启动定时器检查
+            startChannelTimer(ctx, channelId);
+
             logger.info("新客户端连接: {} ,ChannelId: {}", ctx.channel().remoteAddress(), channelId);
             super.channelActive(ctx);
         }
@@ -299,6 +315,87 @@ public class JttVideoRecvServer extends RecvSever {
         }
 
         /**
+         * 启动通道定时器
+         * 
+         * @param ctx       ChannelHandlerContext
+         * @param channelId 通道ID
+         */
+        private void startChannelTimer(ChannelHandlerContext ctx, String channelId) {
+            // 记录通道开始时间
+            channelStartTimes.put(channelId, System.currentTimeMillis());
+
+            // 获取配置参数
+            int checkInterval = Integer
+                    .parseInt(ConfigManager.get(CHECK_INTERVAL_KEY, String.valueOf(DEFAULT_CHECK_INTERVAL)));
+            int timeout = Integer.parseInt(ConfigManager.get(TIMEOUT_KEY, String.valueOf(DEFAULT_TIMEOUT)));
+
+            // 启动定时任务
+            ScheduledFuture<?> timer = ctx.channel().eventLoop().scheduleAtFixedRate(
+                    () -> checkChannelSubscribers(ctx, channelId, timeout),
+                    checkInterval, checkInterval, TimeUnit.SECONDS);
+
+            channelTimers.put(channelId, timer);
+            logger.debug("启动通道 {} 定时器,检查间隔: {}秒,超时时间: {}秒", channelId, checkInterval, timeout);
+        }
+
+        /**
+         * 检查通道订阅者状态
+         * 
+         * @param ctx       ChannelHandlerContext
+         * @param channelId 通道ID
+         * @param timeout   超时时间(秒)
+         */
+        private void checkChannelSubscribers(ChannelHandlerContext ctx, String channelId, int timeout) {
+            // 检查通道是否仍然活跃
+            if (!ctx.channel().isActive()) {
+                stopChannelTimer(channelId);
+                return;
+            }
+
+            if (publishServer != null) {
+                // 检查是否有订阅者
+                boolean hasSubscribers = publishServer.hasSubscribers(channelId);
+
+                if (hasSubscribers) {
+                    // 有订阅者,停止定时器
+                    logger.info("通道 {} 已有订阅者,停止定时器检查", channelId);
+                    stopChannelTimer(channelId);
+                    return;
+                }
+            }
+
+            // 检查是否超时
+            Long startTime = channelStartTimes.get(channelId);
+            if (startTime != null) {
+                long elapsedTime = (System.currentTimeMillis() - startTime) / 1000;
+
+                if (elapsedTime >= timeout) {
+                    // 超时,关闭通道
+                    logger.info("通道 {} 在 {} 秒内无订阅者,关闭连接", channelId, elapsedTime);
+                    ctx.close();
+                    stopChannelTimer(channelId);
+                } else {
+                    logger.debug("通道 {} 无订阅者,已等待 {} 秒,剩余 {} 秒",
+                            channelId, elapsedTime, timeout - elapsedTime);
+                }
+            }
+        }
+
+        /**
+         * 停止通道定时器
+         * 
+         * @param channelId 通道ID
+         */
+        private void stopChannelTimer(String channelId) {
+            ScheduledFuture<?> timer = channelTimers.remove(channelId);
+            if (timer != null && !timer.isCancelled()) {
+                timer.cancel(false);
+                logger.debug("停止通道 {} 定时器", channelId);
+            }
+            channelStartTimes.remove(channelId);
+        }
+
+        /**
          * 主动断开通道
          * 
          * @param channelId 通道ID
@@ -320,6 +417,8 @@ public class JttVideoRecvServer extends RecvSever {
          */
         public void clearChannelContext(ChannelHandlerContext ctx, Boolean isInactive) throws Exception {
             String channelId = ctx.channel().id().asShortText();
+            // 停止定时器
+            stopChannelTimer(channelId);
             // 清理资源
             naluPacketAssemblerMap.remove(channelId);
             if (isInactive) {

+ 3 - 5
src/main/resources/app.properties

@@ -12,8 +12,6 @@ server.video.port=18080
 server.websocket.port=18090
 server.manager.port=8099
 
-# 默认自动关闭时间
-server.video.autoCloseTimeout=300
-
-# 默认无数据30秒关闭
-server.video.idleTimer = 30
+# 默认无订阅检测时间和关闭时间
+video.channel.sub.check.interval=5
+video.channel.sub.timeout=30