Browse Source

修复多通道group共用streamID的问题

kwl 3 weeks ago
parent
commit
1f5d251931

+ 4 - 14
src/main/java/com/jttserver/relay/FlvStreamRelay.java → src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

@@ -18,20 +18,10 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 
-public class FlvStreamRelay extends StreamRelay {
+public class FlvRealtimeStreamRelay extends StreamRelay {
 
-    // 单个nalu解析结果
-    private static class NaluSegment {
-        final byte[] payload; // 解析出的NALU负载(不含起始码)
-        final int consumedBytes; // 从偏移开始本次解析消耗的字节数(含起始码与负载)
 
-        NaluSegment(byte[] payload, int consumedBytes) {
-            this.payload = payload;
-            this.consumedBytes = consumedBytes;
-        }
-    }
-
-    private static final Logger logger = LoggerFactory.getLogger(FlvStreamRelay.class);
+    private static final Logger logger = LoggerFactory.getLogger(FlvRealtimeStreamRelay.class);
 
     // 为每个 channelId 缓存已计算的 streamId,避免重复计算
     private final Map<String, String> channelIdToStreamId = new ConcurrentHashMap<>();
@@ -40,7 +30,7 @@ public class FlvStreamRelay extends StreamRelay {
     private final FlvPacketizer packetizer;
 
     
-    public FlvStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
+    public FlvRealtimeStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
         super(publishServer, receiveServer, prefix);
         // 当前仅支持 FLV 打包
         this.packetizer = new FlvPacketizer();
@@ -140,7 +130,7 @@ public class FlvStreamRelay extends StreamRelay {
         }
         // 使用 streamId
         if (streamId != null && !streamId.isEmpty()) {
-            BroadcastWorker.broadcast(publishServer, channelId, streamId, tag);
+            BroadcastWorker.broadcast(publishServer, channelId, streamId, tag, prefix);
         } else {
             logger.warn("streamId为空,无法广播数据");
         }

+ 11 - 0
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -11,6 +11,17 @@ import io.netty.channel.Channel;
  */
 public abstract class StreamRelay {
 
+    // 单个nalu解析结果
+    protected static class NaluSegment {
+        final byte[] payload; // 解析出的NALU负载(不含起始码)
+        final int consumedBytes; // 从偏移开始本次解析消耗的字节数(含起始码与负载)
+
+        NaluSegment(byte[] payload, int consumedBytes) {
+            this.payload = payload;
+            this.consumedBytes = consumedBytes;
+        }
+    }
+
     // 接收服务器
     protected RecvSever receiveServer;
 

+ 2 - 2
src/main/java/com/jttserver/relay/workerthreads/BroadcastWorker.java

@@ -54,12 +54,12 @@ public class BroadcastWorker {
      * @param streamId 通道ID
      * @param data     数据
      */
-    public static void broadcast(PublishServer publishServer, String channelId, String streamId, byte[] data) {
+    public static void broadcast(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix) {
         
         // 创建广播任务
         Runnable broadcastTask = () -> {
             // 处理广播逻辑
-            publishServer.broadcast(streamId, data);
+            publishServer.broadcast(streamId, data, prefix);
         };
 
         // 获取该channelId对应的单线程执行器

+ 1 - 1
src/main/java/com/jttserver/service/publisher/PublishServer.java

@@ -41,7 +41,7 @@ public abstract class PublishServer {
      * @param streamId 流ID
      * @param data     数据
      */
-    public abstract void broadcast(String streamId, byte[] data);
+    public abstract void broadcast(String streamId, byte[] data, String prefix);
 
     /* 
      * 添加流转发器映射

+ 15 - 13
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -54,15 +54,15 @@ public class WebsockServer extends PublishServer {
 
     // 维护 streamId(sim+logic) 与 原始视频 channelId 的映射(注意是jtt1078设备的ChannelId),带有路径前缀
     // streamIdToChannelId key=(streamID,prefix), value=channelId
-    private final Map<CommUtils.InfoItem,String> streamIdToChannelId = new ConcurrentHashMap<>();
+    private final Map<CommUtils.InfoItem, String> streamIdToChannelId = new ConcurrentHashMap<>();
     // channelIdToStreamId key=channelId, value=(streamID,prefix)
     private final Map<String, CommUtils.InfoItem> channelIdToStreamId = new ConcurrentHashMap<>();
 
     // 订阅数据结构
-    // 按照streamId 为 key,ChannelGroup(包含多个channel) 为 value
-    private final Map<String, ChannelGroup> streamGroups = new ConcurrentHashMap<>();
-    // 按照channelId 为 key,streamId 为 value
-    private final Map<ChannelId, String> channelStreamMap = new ConcurrentHashMap<>();
+    // 按照(streamID,prefix) 为 key,ChannelGroup(包含多个channel) 为 value
+    private final Map<CommUtils.InfoItem, ChannelGroup> streamGroups = new ConcurrentHashMap<>();
+    // 按照channelId 为 key,(streamID,prefix) 为 value
+    private final Map<ChannelId, CommUtils.InfoItem> channelStreamMap = new ConcurrentHashMap<>();
 
     // 按照channel为key,streamRelay为value,方便断开时清理
     private final Map<ChannelId, StreamRelay> channelRelayMap = new ConcurrentHashMap<>();
@@ -158,11 +158,11 @@ public class WebsockServer extends PublishServer {
     }
 
     @Override
-    public void broadcast(String streamId, byte[] data) {
+    public void broadcast(String streamId, byte[] data, String prefix) {
         if (streamId == null || streamId.isEmpty() || data == null || data.length == 0) {
             return;
         }
-        ChannelGroup g = streamGroups.get(streamId);
+        ChannelGroup g = streamGroups.get(new CommUtils.InfoItem(streamId, prefix));
         if (g != null && !g.isEmpty()) {
             g.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data)));
         }
@@ -281,11 +281,12 @@ public class WebsockServer extends PublishServer {
         if (streamId == null || streamId.isEmpty() || ch == null) {
             return;
         }
+        CommUtils.InfoItem infoItem = new CommUtils.InfoItem(streamId, relay.getPrefix());
         // 将channel加入对应的ChannelGroup
-        ChannelGroup group = streamGroups.computeIfAbsent(streamId,
+        ChannelGroup group = streamGroups.computeIfAbsent(infoItem,
                 k -> new DefaultChannelGroup(workerGroup.next()));
         group.add(ch);
-        channelStreamMap.put(ch.id(), streamId);
+        channelStreamMap.put(ch.id(), infoItem);
 
         // 将流转发器与channel关联
         channelRelayMap.put(ch.id(), relay);
@@ -296,13 +297,14 @@ public class WebsockServer extends PublishServer {
         if (ch == null) {
             return;
         }
-        String sid = channelStreamMap.remove(ch.id());
-        if (sid != null) {
-            ChannelGroup g = streamGroups.get(sid);
+        CommUtils.InfoItem infoItem = channelStreamMap.remove(ch.id());
+        if (infoItem != null && infoItem.getTitle() != null) {
+            String sid = infoItem.getTitle();
+            ChannelGroup g = streamGroups.get(infoItem);
             if (g != null) {
                 g.remove(ch);
                 if (g.isEmpty()) {
-                    streamGroups.remove(sid);
+                    streamGroups.remove(infoItem);
                     // 移除流与通道映射
                     StreamRelay relay = channelRelayMap.remove(ch.id());
                     String currPrefix = relay.getPrefix();

+ 2 - 2
src/main/java/com/jttserver/service/receiver/JttVideoRecvServer.java

@@ -14,7 +14,7 @@ import com.jttserver.device.DeviceManager;
 import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.protocol.Jtt1078PacketParser;
-import com.jttserver.relay.FlvStreamRelay;
+import com.jttserver.relay.FlvRealtimeStreamRelay;
 import com.jttserver.relay.StreamRelay;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.relay.workerthreads.VideoPublishWorker;
@@ -57,7 +57,7 @@ public class JttVideoRecvServer extends RecvSever {
 
     public JttVideoRecvServer(PublishServer publishServer, int port, String prefix) {
         super(publishServer, port, prefix);
-        this.streamRelay = new FlvStreamRelay(publishServer, this, prefix);
+        this.streamRelay = new FlvRealtimeStreamRelay(publishServer, this, prefix);
     }
 
     /**

+ 2 - 2
src/test/java/com/jttserver/relay/FlvStreamRelayTest.java

@@ -12,11 +12,11 @@ import static org.junit.jupiter.api.Assertions.*;
 
 
 public class FlvStreamRelayTest {
-    private FlvStreamRelay flvStreamRelay;
+    private FlvRealtimeStreamRelay flvStreamRelay;
 
     @BeforeEach
     void setUp() {
-        flvStreamRelay = new FlvStreamRelay(null, null, "/realtime/");
+        flvStreamRelay = new FlvRealtimeStreamRelay(null, null, "/realtime/");
     }
 
     @Test