瀏覽代碼

优化初始化补发数据

kwl 3 周之前
父節點
當前提交
b0bef95b98
共有 1 個文件被更改,包括 28 次插入23 次删除
  1. 28 23
      src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

+ 28 - 23
src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

@@ -1,6 +1,5 @@
 package com.jttserver.relay;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -14,7 +13,6 @@ import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 
 public class FlvRealtimeStreamRelay extends StreamRelay {
 
-
     private static final Logger logger = LoggerFactory.getLogger(FlvRealtimeStreamRelay.class);
 
     public FlvRealtimeStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
@@ -25,7 +23,8 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
      * 线程广播
      */
     @Override
-    public void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix) {
+    public void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data,
+            String prefix) {
         BroadcastWorker.broadcast(publishServer, channelId, streamId, data, prefix);
     }
 
@@ -48,41 +47,47 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
      * 重置指定 channelId 的流状态(如断线重连时调用)
      */
     // public void resetChannel(String channelId) {
-    //     // 清空 FlvPacketizer 的该通道编解码器信息
-    //     packetizer.clearChannel(channelId);
-
-    //     // 同步移除映射
-    //     if (publishServer != null) {
-    //         publishServer.removeChannelMapping(channelId);
-    //     }
-    //     // 同步清理本地缓存的 streamId
-    //     channelIdToStreamId.remove(channelId);
+    // // 清空 FlvPacketizer 的该通道编解码器信息
+    // packetizer.clearChannel(channelId);
+
+    // // 同步移除映射
+    // if (publishServer != null) {
+    // publishServer.removeChannelMapping(channelId);
+    // }
+    // // 同步清理本地缓存的 streamId
+    // channelIdToStreamId.remove(channelId);
     // }
 
-    
     @Override
     public void initChannelConn(String channelId, String streamId, Channel ch) {
 
-        if (channelId == null || channelId.isEmpty()) return;
+        if (channelId == null || channelId.isEmpty())
+            return;
 
         // 补发FLV头+视频序列头
         byte[] initVideoSegment = getChannelInitVideoSegment(channelId);
-        if (initVideoSegment != null && initVideoSegment.length > 0) {
-            ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(initVideoSegment)));
-        }
 
         // 补发音频序列头
         byte[] initAudioSegment = getChannelInitAudioSegment(channelId);
-        if (initAudioSegment != null && initAudioSegment.length > 0) {
-            ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(initAudioSegment)));
-        }
 
         // 补发最近的I帧
         byte[] recentIFrame = getChannelRecentIFrame(channelId);
-        if (recentIFrame != null && recentIFrame.length > 0) {
-            ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(recentIFrame)));
-        }
 
+        // 在对应的Channel线程中发送,避免跨线程操作Channel引起的问题
+        ch.eventLoop().execute(() -> {
+
+            if (initVideoSegment != null && initVideoSegment.length > 0) {
+                ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(initVideoSegment)));
+            }
+
+            if (initAudioSegment != null && initAudioSegment.length > 0) {
+                ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(initAudioSegment)));
+            }
+
+            if (recentIFrame != null && recentIFrame.length > 0) {
+                ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(recentIFrame)));
+            }
+        });
     }
 
     @Override