浏览代码

增加数据回放通道

kwl 3 周之前
父节点
当前提交
d44eed3146

+ 44 - 0
src/main/java/com/jttserver/relay/FlvPlaybackStreamRelay.java

@@ -0,0 +1,44 @@
+package com.jttserver.relay;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jttserver.relay.workerthreads.PlaybackWorker;
+import com.jttserver.service.publisher.PublishServer;
+import com.jttserver.service.receiver.RecvSever;
+
+import io.netty.channel.Channel;
+
+public class FlvPlaybackStreamRelay extends StreamRelay {
+
+    private static final Logger logger = LoggerFactory.getLogger(FlvPlaybackStreamRelay.class);
+
+    public FlvPlaybackStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
+        super(publishServer, receiveServer, prefix);
+    }
+
+    @Override
+    public void initChannelConn(String channelId, String streamId, Channel ch) {
+        // 初始化播放执行器线程
+        PlaybackWorker.initPlaybackExecutor(streamId, publishServer);
+        logger.info("初始化回放流通道: channelId={}, streamId={}", channelId, streamId);
+    }
+
+    @Override
+    public void destroyChannelDisconn(String channelId, String streamId) {
+        receiveServer.disconnChannel(channelId);
+        PlaybackWorker.shutdownExecutor(streamId);
+        logger.info("销毁回放流通道: channelId={}, streamId={}", channelId, streamId);
+        
+    }
+
+    @Override
+    public void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix) {
+        // 入队播放任务
+        PlaybackWorker.enqueue(publishServer, streamId, data, prefix);
+    }
+
+
+
+
+}

+ 7 - 175
src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

@@ -1,13 +1,10 @@
 package com.jttserver.relay;
 
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.jttserver.codec.FlvPacketizer;
-import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.service.publisher.PublishServer;
@@ -23,117 +20,16 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
 
     private static final Logger logger = LoggerFactory.getLogger(FlvRealtimeStreamRelay.class);
 
-    // 为每个 channelId 缓存已计算的 streamId,避免重复计算
-    private final Map<String, String> channelIdToStreamId = new ConcurrentHashMap<>();
-
-    // 使用单实例 FlvPacketizer,内部以 channelId 维护编解码器信息
-    private final FlvPacketizer packetizer;
-
-    
     public FlvRealtimeStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
         super(publishServer, receiveServer, prefix);
-        // 当前仅支持 FLV 打包
-        this.packetizer = new FlvPacketizer();
-        if (this.publishServer != null) {     
-            publishServer.addPrefixRelay(prefix, this);
-        }
     }
 
     /*
-     * 推流视频数据
+     * 线程广播
      */
     @Override
-    public void publishVideo(String channelId, byte[] nalu, Jtt1078PacketParams params,
-            long timestampMs) {
-        if (nalu == null || nalu.length == 0) {
-            return;
-        }
-        // 逐段解析并发布单个NALU单元(不含起始码)
-        int offset = 0;
-        int totalLength = nalu.length;
-        while (offset < totalLength) {
-            // 提取从当前偏移开始的下一个NALU单元(不含起始码),并返回本次消耗的字节数
-            NaluSegment segment = getNaluSingle(nalu, offset);
-            if (segment.consumedBytes <= 0) { // 未能有效解析出片段,结束循环
-                break;
-            }
-            // 有效负载才进行发布
-            if (segment.payload != null && segment.payload.length > 0) {
-                publishSingleNalu(channelId, segment.payload, params, timestampMs);
-            }
-            // 增加偏移到下一个片段起始位置
-            offset += segment.consumedBytes;
-        }
-    }
-
-    /*
-     * 推流单个NALU单元
-     */
-    public void publishSingleNalu(String channelId, byte[] nalu, Jtt1078PacketParams params,
-            long timestampMs) {
-        if (nalu == null || nalu.length == 0) {
-            return;
-        }
-
-        // 处理单个NALU数据
-        byte[] tag = packetizer.processVideoNalu(channelId, nalu, params, timestampMs);
-        if (tag != null && tag.length > 0) {
-            // 通过构造时缓存的 publishServer 引用进行广播
-            if (publishServer != null) {
-                broadcastStreamData(channelId, tag, params);
-            } else {
-                logger.warn("publishServer is null, cannot broadcast stream data");
-            }
-        }
-    }
-
-    /*
-     * 推流音频数据
-     */
-    @Override
-    public void publishAudio(String channelId, byte[] audio, Jtt1078PacketParams params,
-            long timestampMs) {
-        if (audio == null || audio.length == 0) {
-            return;
-        }
-        byte[] tag = packetizer.processAudioNalu(channelId, audio, params, timestampMs);
-        if (tag != null && tag.length > 0) {
-            // 通过构造时缓存的 publishServer 引用进行广播
-            if (publishServer != null) {
-                // 如果是channel第一次广播音频,先广播AAC序列头
-                byte[] aacSeqHeader = packetizer.getOrCreateAacSequenceHeader(channelId, timestampMs);
-                if (aacSeqHeader != null && aacSeqHeader.length > 0) {
-                    broadcastStreamData(channelId, aacSeqHeader, params);
-                }
-
-                broadcastStreamData(channelId, tag, params);
-            } else {
-                logger.warn("ws is null, cannot broadcast stream data");
-            }
-        }
-    }
-
-    // 将广播逻辑封装为独立函数,提升可读性与复用性
-    private void broadcastStreamData(String channelId, byte[] tag, Jtt1078PacketParams params) {
-        // 先尝试复用缓存的 streamId
-        String streamId = channelIdToStreamId.get(channelId);
-        if (streamId == null || streamId.isEmpty()) {
-            // 首次或未缓存,计算并建立映射
-            streamId = SimCardUtils.buildStreamId(params.simCardNumberStr, params.logicChannelNumber);
-            if (streamId != null && !streamId.isEmpty()) {
-                logger.info("channelId: {}, streamId: {}", channelId, streamId);
-                channelIdToStreamId.put(channelId, streamId);
-                // 同步建立映射关系
-                publishServer.mapStreamToChannel(streamId, channelId, prefix);
-                
-            }
-        }
-        // 使用 streamId
-        if (streamId != null && !streamId.isEmpty()) {
-            BroadcastWorker.broadcast(publishServer, channelId, streamId, tag, prefix);
-        } else {
-            logger.warn("streamId为空,无法广播数据");
-        }
+    public void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix) {
+        BroadcastWorker.broadcast(publishServer, channelId, streamId, data, prefix);
     }
 
     // 提供初始化段(FLV Header + 最新序列头),序列头由 FlvPacketizer 内部构建
@@ -166,76 +62,12 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
     //     channelIdToStreamId.remove(channelId);
     // }
 
-    /*
-     * 关闭指定 channelId 的流(如连接断开时调用)
-     */
+    
     @Override
-    public void closeChannel(String channelId) {
-        // 同步清理 FlvPacketizer 的该通道信息
-        packetizer.clearChannel(channelId);
-        // 同步移除映射
-        if (publishServer != null) {
-            publishServer.removeChannelMapping(channelId);
-        }
-        // 同步清理本地缓存的 streamId
-        channelIdToStreamId.remove(channelId);
-    }
-
-    // 提取从指定偏移开始的单个NALU负载(不含起始码),返回同时包含本次消耗的字节数
-    private NaluSegment getNaluSingle(byte[] data, int offset) {
-        int len = (data == null) ? 0 : data.length;
-        if (data == null || offset >= len) {
-            return new NaluSegment(new byte[0], 0);
-        }
-
-        // 查找当前或之后的起始码(00 00 00 01)
-        int start = -1;
-        for (int i = offset; i <= len - 4; i++) {
-            if (data[i] == 0x00 && data[i + 1] == 0x00 && data[i + 2] == 0x00 && data[i + 3] == 0x01) {
-                start = i;
-                break;
-            }
-        }
+    public void initChannelConn(String channelId, String streamId, Channel ch) {
 
-        if (start < 0) {
-            // 未检测到起始码:将剩余数据视为单个NALU负载进行处理
-            int remaining = len - offset;
-            byte[] payload = new byte[remaining];
-            System.arraycopy(data, offset, payload, 0, remaining);
-            return new NaluSegment(payload, remaining);
-        }
-
-        // 负载从起始码之后开始
-        int payloadStart = start + 4;
-
-        // 查找下一个起始码,用于确定本片段的结束位置
-        int nextStart = -1;
-        for (int j = payloadStart; j <= len - 4; j++) {
-            if (data[j] == 0x00 && data[j + 1] == 0x00 && data[j + 2] == 0x00 && data[j + 3] == 0x01) {
-                nextStart = j;
-                break;
-            }
-        }
-        int end = (nextStart >= 0) ? nextStart : len;
-
-        // 复制负载数据(不含起始码)
+        if (channelId == null || channelId.isEmpty()) return;
 
-        // 负载为起始码之后到下一个起始码(或数据末尾)之间的数据
-        int payloadLen = end - payloadStart;
-        if (payloadLen <= 0) {
-            return new NaluSegment(new byte[0], 0);
-        }
-        byte[] payload = new byte[payloadLen];
-
-        System.arraycopy(data, payloadStart, payload, 0, payloadLen);
-
-        // 本次消耗的字节数 = 起始码长度(4) + 负载长度
-        int consumed = 4 + payloadLen;
-        return new NaluSegment(payload, consumed);
-    }
-
-    @Override
-    public void initChannelConn(String channelId, Channel ch) {
         // 补发FLV头+视频序列头
         byte[] initVideoSegment = getChannelInitVideoSegment(channelId);
         if (initVideoSegment != null && initVideoSegment.length > 0) {
@@ -257,7 +89,7 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
     }
 
     @Override
-    public void destroyChannelDisconn(String channelId) {
+    public void destroyChannelDisconn(String channelId, String streamId) {
         receiveServer.disconnChannel(channelId);
     }
 }

+ 181 - 5
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -1,8 +1,16 @@
 package com.jttserver.relay;
 
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jttserver.codec.FlvPacketizer;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.service.publisher.PublishServer;
 import com.jttserver.service.receiver.RecvSever;
+import com.jttserver.utils.SimCardUtils;
 
 import io.netty.channel.Channel;
 
@@ -11,6 +19,8 @@ import io.netty.channel.Channel;
  */
 public abstract class StreamRelay {
 
+    private static final Logger logger = LoggerFactory.getLogger(StreamRelay.class);
+
     // 单个nalu解析结果
     protected static class NaluSegment {
         final byte[] payload; // 解析出的NALU负载(不含起始码)
@@ -31,6 +41,12 @@ public abstract class StreamRelay {
     // 路径前缀
     protected String prefix;
 
+    // 为每个 channelId 缓存已计算的 streamId,避免重复计算
+    protected Map<String, String> channelIdToStreamId = new ConcurrentHashMap<>();
+
+    // 使用单实例 FlvPacketizer,内部以 channelId 维护编解码器信息
+    protected FlvPacketizer packetizer;
+
     /* 
      * 构造函数
      */
@@ -45,6 +61,12 @@ public abstract class StreamRelay {
         this.publishServer = publishServer;
         this.receiveServer = receiveServer;
         this.prefix = prefix;
+
+        // 当前仅支持 FLV 打包
+        this.packetizer = new FlvPacketizer();
+        if (this.publishServer != null) {     
+            publishServer.addPrefixRelay(prefix, this);
+        }
     }
 
     public void setPublishServer(PublishServer publishServer) {
@@ -69,26 +91,180 @@ public abstract class StreamRelay {
     /* 
      * 推流视频数据
      */
-    public abstract void publishVideo(String channelId, byte[] nalu, Jtt1078PacketParams params,long timestampMs);
+    public void publishVideo(String channelId, byte[] nalu, Jtt1078PacketParams params,
+            long timestampMs) {
+        if (nalu == null || nalu.length == 0) {
+            return;
+        }
+        // 逐段解析并发布单个NALU单元(不含起始码)
+        int offset = 0;
+        int totalLength = nalu.length;
+        while (offset < totalLength) {
+            // 提取从当前偏移开始的下一个NALU单元(不含起始码),并返回本次消耗的字节数
+            NaluSegment segment = getNaluSingle(nalu, offset);
+            if (segment.consumedBytes <= 0) { // 未能有效解析出片段,结束循环
+                break;
+            }
+            // 有效负载才进行发布
+            if (segment.payload != null && segment.payload.length > 0) {
+                publishSingleNalu(channelId, segment.payload, params, timestampMs);
+            }
+            // 增加偏移到下一个片段起始位置
+            offset += segment.consumedBytes;
+        }
+    }
+
+    /*
+     * 推流单个NALU单元
+     */
+    public void publishSingleNalu(String channelId, byte[] nalu, Jtt1078PacketParams params,
+            long timestampMs) {
+        if (nalu == null || nalu.length == 0) {
+            return;
+        }
+
+        // 处理单个NALU数据
+        byte[] tag = packetizer.processVideoNalu(channelId, nalu, params, timestampMs);
+        if (tag != null && tag.length > 0) {
+            // 通过构造时缓存的 publishServer 引用进行广播
+            if (publishServer != null) {
+                broadcastStreamData(channelId, tag, params);
+            } else {
+                logger.warn("publishServer is null, cannot broadcast stream data");
+            }
+        }
+    }
+
+    /* 
+     * 广播逻辑
+     */
+    private void broadcastStreamData(String channelId, byte[] data, Jtt1078PacketParams params) {
+        // 先尝试复用缓存的 streamId
+        String streamId = channelIdToStreamId.get(channelId);
+        if (streamId == null || streamId.isEmpty()) {
+            // 首次或未缓存,计算并建立映射
+            streamId = SimCardUtils.buildStreamId(params.simCardNumberStr, params.logicChannelNumber);
+            if (streamId != null && !streamId.isEmpty()) {
+                logger.info("channelId: {}, streamId: {}", channelId, streamId);
+                channelIdToStreamId.put(channelId, streamId);
+                // 同步建立映射关系
+                publishServer.mapStreamToChannel(streamId, channelId, prefix);
+                
+            }
+        }
+        // 使用 streamId
+        if (streamId != null && !streamId.isEmpty()) {
+            threadBocastStreamData(publishServer, channelId, streamId, data, prefix);
+        } else {
+            logger.warn("streamId为空,无法广播数据");
+        }
+    }
+
+    // 提取从指定偏移开始的单个NALU负载(不含起始码),返回同时包含本次消耗的字节数
+    private NaluSegment getNaluSingle(byte[] data, int offset) {
+        int len = (data == null) ? 0 : data.length;
+        if (data == null || offset >= len) {
+            return new NaluSegment(new byte[0], 0);
+        }
+
+        // 查找当前或之后的起始码(00 00 00 01)
+        int start = -1;
+        for (int i = offset; i <= len - 4; i++) {
+            if (data[i] == 0x00 && data[i + 1] == 0x00 && data[i + 2] == 0x00 && data[i + 3] == 0x01) {
+                start = i;
+                break;
+            }
+        }
+
+        if (start < 0) {
+            // 未检测到起始码:将剩余数据视为单个NALU负载进行处理
+            int remaining = len - offset;
+            byte[] payload = new byte[remaining];
+            System.arraycopy(data, offset, payload, 0, remaining);
+            return new NaluSegment(payload, remaining);
+        }
+
+        // 负载从起始码之后开始
+        int payloadStart = start + 4;
+
+        // 查找下一个起始码,用于确定本片段的结束位置
+        int nextStart = -1;
+        for (int j = payloadStart; j <= len - 4; j++) {
+            if (data[j] == 0x00 && data[j + 1] == 0x00 && data[j + 2] == 0x00 && data[j + 3] == 0x01) {
+                nextStart = j;
+                break;
+            }
+        }
+        int end = (nextStart >= 0) ? nextStart : len;
+
+        // 复制负载数据(不含起始码)
+
+        // 负载为起始码之后到下一个起始码(或数据末尾)之间的数据
+        int payloadLen = end - payloadStart;
+        if (payloadLen <= 0) {
+            return new NaluSegment(new byte[0], 0);
+        }
+        byte[] payload = new byte[payloadLen];
+
+        System.arraycopy(data, payloadStart, payload, 0, payloadLen);
+
+        // 本次消耗的字节数 = 起始码长度(4) + 负载长度
+        int consumed = 4 + payloadLen;
+        return new NaluSegment(payload, consumed);
+    }
 
     /* 
      * 推流音频数据
      */
-    public abstract void publishAudio(String channelId, byte[] audio, Jtt1078PacketParams params, long timestampMs);
+    public void publishAudio(String channelId, byte[] audio, Jtt1078PacketParams params,
+            long timestampMs) {
+        if (audio == null || audio.length == 0) {
+            return;
+        }
+        byte[] tag = packetizer.processAudioNalu(channelId, audio, params, timestampMs);
+        if (tag != null && tag.length > 0) {
+            // 通过构造时缓存的 publishServer 引用进行广播
+            if (publishServer != null) {
+                // 如果是channel第一次广播音频,先广播AAC序列头
+                byte[] aacSeqHeader = packetizer.getOrCreateAacSequenceHeader(channelId, timestampMs);
+                if (aacSeqHeader != null && aacSeqHeader.length > 0) {
+                    broadcastStreamData(channelId, aacSeqHeader, params);
+                }
+
+                broadcastStreamData(channelId, tag, params);
+            } else {
+                logger.warn("ws is null, cannot broadcast stream data");
+            }
+        }
+    }
 
     /* 
      * 关闭并清理指定通道的资源
      */
-    public abstract void closeChannel(String channelId);
+    public void closeChannel(String channelId) {
+        // 同步清理 FlvPacketizer 的该通道信息
+        packetizer.clearChannel(channelId);
+        // 同步移除映射
+        if (publishServer != null) {
+            publishServer.removeChannelMapping(channelId);
+        }
+        // 同步清理本地缓存的 streamId
+        channelIdToStreamId.remove(channelId);
+    }
 
 
     /* 
      * 初始化通道连接(订阅时调用一次,用于补发数据等)
      */
-    public abstract void initChannelConn(String channelId, Channel ch);
+    public abstract void initChannelConn(String channelId, String streamId, Channel ch);
 
     /**
      * 结束通道连接(断开时调用一次,清理资源)
      */
-    public abstract void destroyChannelDisconn(String channelId);
+    public abstract void destroyChannelDisconn(String channelId, String streamId);
+
+    /**
+     * 线程中广播流数据
+     */
+    public abstract void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix);
 }

+ 1 - 1
src/main/java/com/jttserver/relay/StreamRelayType.java

@@ -13,7 +13,7 @@ public enum StreamRelayType {
     FLV_PLAYBACK {
         @Override
         public StreamRelay create(PublishServer ps, RecvSever owner, String prefix) {
-            return new FlvRealtimeStreamRelay(ps, owner,  prefix);
+            return new FlvPlaybackStreamRelay(ps, owner,  prefix);
         }
     };
 

+ 154 - 0
src/main/java/com/jttserver/relay/workerthreads/PlaybackWorker.java

@@ -0,0 +1,154 @@
+package com.jttserver.relay.workerthreads;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jttserver.config.ConfigManager;
+import com.jttserver.service.publisher.PublishServer;
+
+/**
+ * PlaybackWorker 用于 /playback/ 场景:
+ * - 若对应 streamId 的客户端未连接,则将数据缓存在内存队列中(不发送)
+ * - 一旦调用 initPlaybackExecutor(表示客户端已连接),按队列顺序发送队列中的数据
+ * - 连接存在时有单消费者线程持续从队列取数据发送,新数据也放入队列由消费者发送
+ */
+public class PlaybackWorker {
+    private static final Logger logger = LoggerFactory.getLogger(PlaybackWorker.class);
+
+    // 内部类用于包装数据与前缀
+    private static class PlaybackItem {
+        final byte[] data;
+        final String prefix;
+
+        PlaybackItem(byte[] data, String prefix) {
+            this.data = data;
+            this.prefix = prefix;
+        }
+    }
+
+    // 每个 streamId 对应的内存队列(缓存待播放数据)
+    private static final ConcurrentHashMap<String, ArrayBlockingQueue<PlaybackItem>> playbackQueueMap = new ConcurrentHashMap<>();
+
+    // 每个 streamId 对应的单消费者执行器(只有在客户端连接时创建并运行)
+    private static final ConcurrentHashMap<String, ThreadPoolExecutor> playbackExecutorMap = new ConcurrentHashMap<>();
+
+
+    private static int getQueueCapacity() {
+        return Integer.parseInt(ConfigManager.get("playback.queue.capacity", "10000"));
+    }
+
+    private static ThreadPoolExecutor createSingleConsumerExecutor(String streamId) {
+        int capacity = 1; // 线程池队列对消费者本身不需要缓存,这里使用最小设置
+        ThreadFactory tf = r -> {
+            Thread t = new Thread(r);
+            t.setName("playback-" + streamId);
+            t.setDaemon(true);
+            return t;
+        };
+        return new ThreadPoolExecutor(
+                1,
+                1,
+                0L,
+                TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<>(capacity),
+                tf,
+                new ThreadPoolExecutor.DiscardPolicy());
+    }
+
+    private static ArrayBlockingQueue<PlaybackItem> createOrGetQueue(String streamId) {
+        return playbackQueueMap.computeIfAbsent(streamId, id -> new ArrayBlockingQueue<>(getQueueCapacity()));
+    }
+
+    /**
+     * 将数据入队(不直接发送)。若客户端已连接并且消费者正在运行,则消费者会尽快取出并发送。
+     * @param publishServer PublishServer 实例(发送逻辑在消费者中调用)
+     * @param streamId      流 ID
+     * @param data          数据
+     * @param prefix        前缀 / 元数据
+     */
+    public static void enqueue(PublishServer publishServer, String streamId, byte[] data, String prefix) {
+        ArrayBlockingQueue<PlaybackItem> queue = createOrGetQueue(streamId);
+        boolean offered = queue.offer(new PlaybackItem(data, prefix));
+        if (!offered) {
+            // 队列满时按策略:记录并丢弃最新数据(也可以改为丢弃最旧)
+            logger.warn("播放队列已满,丢弃数据。streamId: {}, 队列容量: {}", streamId, queue.size());
+        }
+    }
+
+    /**
+     * 当 websocket /playback/ 对应客户端连接建立时调用:
+     * - 为该 streamId 创建单消费者线程(若不存在)
+     * - 消费者线程会阻塞等待队列数据并按序调用 publishServer.playback(...)
+     * @param streamId      流 ID
+     * @param publishServer PublishServer 实例(消费者使用它发送数据)
+     */
+    public static void initPlaybackExecutor(String streamId, PublishServer publishServer) {
+        ThreadPoolExecutor prev = playbackExecutorMap.putIfAbsent(streamId, createSingleConsumerExecutor(streamId));
+        logger.info("为 streamId={} 初始化播放消费者线程", streamId);
+        if (prev == null) {
+            ThreadPoolExecutor exec = playbackExecutorMap.get(streamId);
+            // 启动消费者任务(长期运行,阻塞取数据)
+            try {
+                exec.execute(() -> {
+                    ArrayBlockingQueue<PlaybackItem> queue = createOrGetQueue(streamId);
+                    while (!Thread.currentThread().isInterrupted() && !exec.isShutdown()) {
+                        try {
+                            PlaybackItem item = queue.take(); // 阻塞直到有数据
+                            try {
+                                // 调用播放逻辑(暂时用同一个广播)
+                                publishServer.broadcast(streamId, item.data, item.prefix);
+                            } catch (Throwable t) {
+                                logger.error("播放发送时出现异常,streamId: {}", streamId, t);
+                            }
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                            break;
+                        }
+                    }
+                });
+            } catch (RejectedExecutionException e) {
+                logger.warn("无法为 streamId={} 启动播放消费者:{}", streamId, e.getMessage());
+            }
+        }else {
+            logger.info("streamId={} 的播放消费者已存在,无需重复创建", streamId);
+        }
+
+    }
+
+    /**
+     * 当客户端断开时调用:停止消费者线程,但保留队列以继续缓存数据
+     * @param streamId 流 ID
+     */
+    public static void shutdownExecutor(String streamId) {
+        ThreadPoolExecutor exec = playbackExecutorMap.remove(streamId);
+        if (exec != null) {
+            exec.shutdownNow();
+        }
+        playbackQueueMap.remove(streamId);
+    }
+
+    /**
+     * 清空指定 streamId 的缓存队列(可选操作)
+     * @param streamId 流 ID
+     */
+    public static void clearQueue(String streamId) {
+        ArrayBlockingQueue<PlaybackItem> q = playbackQueueMap.get(streamId);
+        if (q != null) q.clear();
+    }
+
+    /**
+     * 获取队列长度
+     */
+    public static int getQueueSize(String streamId) {
+        ArrayBlockingQueue<PlaybackItem> q = playbackQueueMap.get(streamId);
+        return q == null ? 0 : q.size();
+    }
+
+}

+ 5 - 4
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -265,10 +265,11 @@ public class WebsockServer extends PublishServer {
                 if (!streamId.isEmpty() && relay != null) {
                     // 在注册订阅前,初始化发送该类型需要的数据
                     String channelId = server.getChannelIdByStreamId(streamId, currPrefix);
-                    if (channelId != null && !channelId.isEmpty()) {
-                        relay.initChannelConn(channelId, ctx.channel());
-                    }
+                    logger.info("为 streamId={} 注册订阅,channelId={}", streamId, channelId);
+                    relay.initChannelConn(channelId, streamId, ctx.channel());
                     server.registerChannel(streamId, ctx.channel(), relay);
+                }else {
+                    logger.warn("streamId 为空或无对应的流转发器,拒绝订阅,streamId: {}, prefix: {}, relay is null == {}", streamId, currPrefix, relay == null);
                 }
             }
             super.userEventTriggered(ctx, evt);
@@ -316,7 +317,7 @@ public class WebsockServer extends PublishServer {
                         // destroy
                         
                         if (relay != null) {
-                            relay.destroyChannelDisconn(channelId);
+                            relay.destroyChannelDisconn(channelId, sid);
                         }
                         // 移除通道与流映射
                         removeChannelMapping(channelId);