Browse Source

修改bug,增加回放单通道

kwl 3 weeks ago
parent
commit
ca2bbd2cc1

+ 2 - 2
pom.xml

@@ -9,8 +9,8 @@
     <version>1.0-SNAPSHOT</version>
 
     <properties>
-        <maven.compiler.source>17</maven.compiler.source>
-        <maven.compiler.target>17</maven.compiler.target>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
         <junit.version>5.9.2</junit.version>
         <netty.version>4.1.100.Final</netty.version>
         <slf4j.version>2.0.9</slf4j.version>

+ 11 - 1
src/main/java/com/jttserver/relay/FlvPlaybackStreamRelay.java

@@ -18,10 +18,20 @@ public class FlvPlaybackStreamRelay extends StreamRelay {
     }
 
     @Override
-    public void initChannelConn(String channelId, String streamId, Channel ch) {
+    public Boolean initChannelConn(String channelId, String streamId, Channel ch, PublishServer server) {
+        // 如果已经有groupId映射,则断开连接后返回(回放唯一性)
+        if(server.containsStreamGroup(streamId, prefix)) {
+            logger.warn("回放流通道已存在,拒绝重复连接: channelId={}, streamId={}", channelId, streamId);
+            return false;
+        }
+
+        // 先注册通道连接
+        server.registerChannel(streamId, ch, this);
+
         // 初始化播放执行器线程
         PlaybackWorker.initPlaybackExecutor(streamId, publishServer);
         logger.info("初始化回放流通道: channelId={}, streamId={}", channelId, streamId);
+        return true;
     }
 
     @Override

+ 7 - 2
src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

@@ -59,10 +59,11 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
     // }
 
     @Override
-    public void initChannelConn(String channelId, String streamId, Channel ch) {
+    public Boolean initChannelConn(String channelId, String streamId, Channel ch, PublishServer server) {
 
+        // 还没有视频设备连接可直接返回true
         if (channelId == null || channelId.isEmpty())
-            return;
+            return true;
 
         // 补发FLV头+视频序列头
         byte[] initVideoSegment = getChannelInitVideoSegment(channelId);
@@ -88,6 +89,10 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
                 ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(recentIFrame)));
             }
         });
+
+        // 最后再注册通道连接
+        server.registerChannel(streamId, ch, this);
+        return true;
     }
 
     @Override

+ 2 - 1
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -255,8 +255,9 @@ public abstract class StreamRelay {
 
     /* 
      * 初始化通道连接(订阅时调用一次,用于补发数据等)
+     * @return 是否成功初始化连接
      */
-    public abstract void initChannelConn(String channelId, String streamId, Channel ch);
+    public abstract Boolean initChannelConn(String channelId, String streamId, Channel ch, PublishServer server);
 
     /**
      * 结束通道连接(断开时调用一次,清理资源)

+ 15 - 0
src/main/java/com/jttserver/service/publisher/PublishServer.java

@@ -1,5 +1,6 @@
 package com.jttserver.service.publisher;
 
+import com.jttserver.relay.FlvPlaybackStreamRelay;
 import com.jttserver.relay.StreamRelay;
 
 import io.netty.channel.Channel;
@@ -63,4 +64,18 @@ public abstract class PublishServer {
             serverChannel.closeFuture().sync();
         }
     }
+
+    /**
+     * 注册通道连接(订阅时调用一次,用于补发数据等)
+     */
+    public abstract void registerChannel(String streamId, Channel ch, StreamRelay relay);
+
+
+    /**
+     * 判断streamGroups是否包含某个key
+     * @param streamId 流ID
+     * @param prefix 前缀
+     * @return 是否包含
+     */
+    public abstract boolean containsStreamGroup(String streamId, String prefix);
 }

+ 15 - 3
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -190,6 +190,11 @@ public class WebsockServer extends PublishServer {
             streamIdToChannelId.remove(infoItem);
         }
     }
+    
+    @Override
+    public boolean containsStreamGroup(String streamId, String prefix) {
+        return streamGroups.containsKey(CommUtils.InfoItem.of(streamId, prefix));
+    }
 
     /**
      * 获取channelId对应的streamId方法
@@ -263,8 +268,14 @@ public class WebsockServer extends PublishServer {
                     // 在注册订阅前,初始化发送该类型需要的数据
                     String channelId = server.getChannelIdByStreamId(streamId, currPrefix);
                     logger.info("为 streamId={} 注册订阅,channelId={}", streamId, channelId);
-                    relay.initChannelConn(channelId, streamId, ctx.channel());
-                    server.registerChannel(streamId, ctx.channel(), relay);
+
+                    if(!relay.initChannelConn(channelId, streamId, ctx.channel(), server)) {
+                        ctx.close();
+                        return;
+                    }
+                    //server.registerChannel(streamId, ctx.channel(), relay);
+                    
+                    
                 }else {
                     logger.warn("streamId 为空或无对应的流转发器,拒绝订阅,streamId: {}, prefix: {}, relay is null == {}", streamId, currPrefix, relay == null);
                 }
@@ -275,7 +286,8 @@ public class WebsockServer extends PublishServer {
     }
 
     // 注册订阅(握手完成后调用)
-    void registerChannel(String streamId, Channel ch, StreamRelay relay) {
+    @Override
+    public void registerChannel(String streamId, Channel ch, StreamRelay relay) {
         if (streamId == null || streamId.isEmpty() || ch == null) {
             return;
         }

+ 0 - 1
src/test/java/com/jttserver/relay/FlvStreamRelayTest.java

@@ -3,7 +3,6 @@ package com.jttserver.relay;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 
 import java.io.ByteArrayOutputStream;