kwl hai 2 semanas
pai
achega
b4ccf681bb

+ 29 - 17
src/main/java/com/jttserver/codec/FlvPacketizer.java

@@ -4,6 +4,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.Map;
 
 import com.jttserver.protocol.Jtt1078NaluPacket;
@@ -31,6 +32,14 @@ public class FlvPacketizer {
             0x00, 0x00, 0x00, 0x09 // 头长度
     };
 
+    public static final byte[] FLV_HEADER_ALL = new byte[] {
+            0x46, 0x4C, 0x56, // "FLV"
+            0x01, // 版本
+            0x05, // 视频音频标签存在 (0x05 表示音频+视频)
+            0x00, 0x00, 0x00, 0x09, // 头长度
+            0x00, 0x00, 0x00, 0x00 // PreviousTagSize0
+    };
+
     // FLV文件头大小
     @SuppressWarnings("unused")
     private static final int FLV_HEADER_SIZE = 9;
@@ -57,6 +66,7 @@ public class FlvPacketizer {
      * 编解码器信息
      */
     private static class CodecInfo {
+        AtomicBoolean flvHeaderGenerated = new AtomicBoolean(false);
         // 编码类型:true为H.264,false为H.265
         int payloadVideo = -1;
         // 是否已完成视频编码格式判断(避免重复判定)
@@ -91,20 +101,22 @@ public class FlvPacketizer {
         byte[] recentIFrame;
     }
 
+
     /**
      * 创建FLV头
      * 
      * @return FLV头数据
      */
-    public byte[] createFlvHeader() {
-        ByteArrayOutputStream header = new ByteArrayOutputStream();
-        try {
-            header.write(FLV_HEADER);
-            header.write(PREVIOUS_TAG_SIZE0);
-        } catch (IOException e) {
-            // 不会发生
-        }
-        return header.toByteArray();
+    public static byte[] createFlvHeader() {
+        return FLV_HEADER_ALL;
+    }
+
+    /*
+     * 获取并设置FLV头生成标记   
+     */
+    public boolean getAndSetFlvHeaderGenerated(String channelId) {
+        CodecInfo codecInfo = channelCodecInfo.computeIfAbsent(channelId, k -> new CodecInfo());
+        return codecInfo.flvHeaderGenerated.getAndSet(true);
     }
 
     /**
@@ -331,12 +343,12 @@ public class FlvPacketizer {
             byte[] videoTag = createVideoTag((byte) 0x17, (byte) 0x00, 0, avcConfig.toByteArray(), timestamp);
 
             // 返回FLV头和序列头数据
-            ByteArrayOutputStream headerAVC = new ByteArrayOutputStream();
-            headerAVC.write(createFlvHeader());
-            headerAVC.write(videoTag);
+            // ByteArrayOutputStream headerAVC = new ByteArrayOutputStream();
+            // headerAVC.write(createFlvHeader());
+            // headerAVC.write(videoTag);
 
             // 在生成时直接缓存到CodecInfo
-            codecInfo.flvAndSequenceHeaderTag = headerAVC.toByteArray();
+            codecInfo.flvAndSequenceHeaderTag = videoTag;
             codecInfo.flvAndSequenceHeaderGenerated = true;
             return codecInfo.flvAndSequenceHeaderTag;
         } catch (IOException e) {
@@ -538,12 +550,12 @@ public class FlvPacketizer {
             byte[] videoTag = createVideoTag((byte) 0x1C, (byte) 0x00, 0, hevcConfig.toByteArray(), timestamp);
 
             // 返回FLV头和序列头数据
-            ByteArrayOutputStream headerHEVC = new ByteArrayOutputStream();
-            headerHEVC.write(createFlvHeader());
-            headerHEVC.write(videoTag);
+            // ByteArrayOutputStream headerHEVC = new ByteArrayOutputStream();
+            // headerHEVC.write(createFlvHeader());
+            // headerHEVC.write(videoTag);
 
             // 在生成时直接缓存到CodecInfo
-            codecInfo.flvAndSequenceHeaderTag = headerHEVC.toByteArray();
+            codecInfo.flvAndSequenceHeaderTag = videoTag;
             codecInfo.flvAndSequenceHeaderGenerated = true;
             return codecInfo.flvAndSequenceHeaderTag;
 

+ 5 - 0
src/main/java/com/jttserver/relay/FlvRealtimeStreamRelay.java

@@ -3,6 +3,7 @@ package com.jttserver.relay;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.jttserver.codec.FlvPacketizer;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.service.publisher.PublishServer;
 import com.jttserver.service.receiver.RecvSever;
@@ -68,6 +69,8 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
             return true;
         }
 
+        byte[] flvHeader = FlvPacketizer.createFlvHeader();
+
         // 补发FLV头+视频序列头
         byte[] initVideoSegment = getChannelInitVideoSegment(channelId);
 
@@ -80,6 +83,8 @@ public class FlvRealtimeStreamRelay extends StreamRelay {
         // 在对应的Channel线程中发送,避免跨线程操作Channel引起的问题
         ch.eventLoop().execute(() -> {
 
+            ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(flvHeader)));
+
             if (initVideoSegment != null && initVideoSegment.length > 0) {
                 ch.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(initVideoSegment)));
             }

+ 20 - 11
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -47,7 +47,7 @@ public abstract class StreamRelay {
     // 使用单实例 FlvPacketizer,内部以 channelId 维护编解码器信息
     protected FlvPacketizer packetizer;
 
-    /* 
+    /*
      * 构造函数
      */
     public StreamRelay() {
@@ -64,7 +64,7 @@ public abstract class StreamRelay {
 
         // 当前仅支持 FLV 打包
         this.packetizer = new FlvPacketizer();
-        if (this.publishServer != null) {     
+        if (this.publishServer != null) {
             publishServer.addPrefixRelay(prefix, this);
         }
     }
@@ -87,8 +87,7 @@ public abstract class StreamRelay {
         return this.prefix;
     }
 
-
-    /* 
+    /*
      * 推流视频数据
      */
     public void publishVideo(String channelId, byte[] nalu, Jtt1078PacketParams params,
@@ -135,7 +134,7 @@ public abstract class StreamRelay {
         }
     }
 
-    /* 
+    /*
      * 广播逻辑
      */
     private void broadcastStreamData(String channelId, byte[] data, Jtt1078PacketParams params) {
@@ -149,7 +148,7 @@ public abstract class StreamRelay {
                 channelIdToStreamId.put(channelId, streamId);
                 // 同步建立映射关系
                 publishServer.mapStreamToChannel(streamId, channelId, prefix);
-                
+
             }
         }
         // 使用 streamId
@@ -213,7 +212,7 @@ public abstract class StreamRelay {
         return new NaluSegment(payload, consumed);
     }
 
-    /* 
+    /*
      * 推流音频数据
      */
     public void publishAudio(String channelId, byte[] audio, Jtt1078PacketParams params,
@@ -238,7 +237,16 @@ public abstract class StreamRelay {
         }
     }
 
-    /* 
+    public void pushFirstFlvHeader(String channelId, Jtt1078PacketParams params, long timestampMs) {
+        if (packetizer.getAndSetFlvHeaderGenerated(channelId)) {
+            return; // 已经推送过FLV头,直接返回
+        }
+
+        broadcastStreamData(channelId, FlvPacketizer.FLV_HEADER_ALL, params);
+
+    }
+
+    /*
      * 关闭并清理指定通道的资源
      */
     public void closeChannel(String channelId) {
@@ -252,9 +260,9 @@ public abstract class StreamRelay {
         channelIdToStreamId.remove(channelId);
     }
 
-
-    /* 
+    /*
      * 初始化通道连接(订阅时调用一次,用于补发数据等)
+     * 
      * @return 是否成功初始化连接
      */
     public abstract Boolean initChannelConn(String channelId, String streamId, Channel ch, PublishServer server);
@@ -267,5 +275,6 @@ public abstract class StreamRelay {
     /**
      * 线程中广播流数据
      */
-    public abstract void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId, byte[] data, String prefix);
+    public abstract void threadBocastStreamData(PublishServer publishServer, String channelId, String streamId,
+            byte[] data, String prefix);
 }

+ 3 - 0
src/main/java/com/jttserver/relay/workerthreads/VideoPublishWorker.java

@@ -77,6 +77,9 @@ public class VideoPublishWorker {
         // 创建推流任务
         Runnable publishTask = () -> {
             try {
+                // 首次发送时推送FLV头部
+                streamRelay.pushFirstFlvHeader(channelId, latestParams, tsMs);
+
                 if (latestParams.dataType <= JttConstants.TYPE_VIDEO_B_FRAME) {
                     streamRelay.publishVideo(channelId, completeNaluData, latestParams, tsMs);
                 } else if (latestParams.dataType == JttConstants.TYPE_AUDIO) {

+ 20 - 21
src/test/java/com/jttserver/codec/FlvPacketizerTest.java

@@ -86,17 +86,17 @@ public class FlvPacketizerTest {
         byte[] result2 = flvPacketizer.processVideoNalu(channelId, ppsData, params, 0);
         // PPS处理应该返回序列头
         assertTrue(result2.length > 0);
-        // 返回的是flv头+序列头
-        int baseIndex = 13;
+        // 返回的是视频Tag(不包含文件头)
+        int baseIndex = 0;
 
-        assertTrue(result2.length >14, "不够flv头长度");
+        assertTrue(result2.length > 14, "不够视频tag长度");
         // 检查是否是视频标签 (tag type = 0x09)
         assertEquals(0x09, result2[baseIndex + 0]);
-        
-        assertTrue(result2.length >14 + 11 + 1, "不够flv头长度+AVC序列头长度");
+
+        assertTrue(result2.length > 11 + 2, "不够视频tag长度+AVC序列头长度");
         // 检查AVC序列头标记 (AVCPacketType = 0x00)
         // Tag头: 1字节TagType + 3字节DataSize + 3字节Timestamp + 1字节TimestampExtended + 3字节StreamID = 11字节
-        assertEquals(0x00, result2[baseIndex+ 11 + 1]); // AVCPacketType
+        assertEquals(0x00, result2[baseIndex + 11 + 1]); // AVCPacketType
     }
 
     
@@ -133,15 +133,15 @@ public class FlvPacketizerTest {
         byte[] result2 = flvPacketizer.processVideoNalu(channelId, spsData, params, 0);
         // SPS处理应该返回序列头
         assertTrue(result2.length > 0);
-        
-        // 返回的是flv头+序列头
-        int baseIndex = 13;
 
-        assertTrue(result2.length >14, "不够flv头长度");
+        // 返回的是视频Tag(不包含文件头)
+        int baseIndex = 0;
+
+        assertTrue(result2.length > 14, "不够视频tag长度");
         // 检查是否是视频标签 (tag type = 0x09)
         assertEquals(0x09, result2[baseIndex + 0]);
-        
-        assertTrue(result2.length >14 + 11 + 1, "不够flv头长度+AVC序列头长度");
+
+        assertTrue(result2.length > 11 + 2, "不够视频tag长度+AVC序列头长度");
         // 检查AVC序列头标记 (AVCPacketType = 0x00)
         // Tag头: 1字节TagType + 3字节DataSize + 3字节Timestamp + 1字节TimestampExtended + 3字节StreamID = 11字节
         assertEquals(0x00, result2[baseIndex + 11 + 1]); // AVCPacketType
@@ -225,15 +225,15 @@ public class FlvPacketizerTest {
         // PPS处理应该返回序列头
         assertTrue(result3.length > 0);
         
-        // 返回的是flv头+序列头
-        int baseIndex = 13;
+        // 返回的是视频Tag(不包含文件头)
+        int baseIndex = 0;
 
-        assertTrue(result3.length >14, "不够flv头长度");
+        assertTrue(result3.length > 14, "不够视频tag长度");
 
         // 检查是否是视频标签 (tag type = 0x09)
         assertEquals(0x09, result3[baseIndex + 0]);
         
-        assertTrue(result3.length >14 + 11 + 1, "不够flv头长度+HEVC序列头长度");
+        assertTrue(result3.length > 11 + 2, "不够视频tag长度+HEVC序列头长度");
         // 检查HEVC序列头标记 (AVCPacketType = 0x00)
         // Tag头: 1字节TagType + 3字节DataSize + 3字节Timestamp + 1字节TimestampExtended + 3字节StreamID = 11字节
         assertEquals(0x00, result3[baseIndex + 11 + 1]); // AVCPacketType
@@ -276,16 +276,15 @@ public class FlvPacketizerTest {
         // SPS处理应该返回序列头
         assertTrue(result3.length > 0);
         
-        // 返回的是flv头+序列头
-        int baseIndex = 13;
-
-        assertTrue(result3.length >14, "不够flv头长度");
+        // 返回的是视频Tag(不包含文件头)
+        int baseIndex = 0;
 
+        assertTrue(result3.length > 14, "不够视频tag长度");
 
         // 检查是否是视频标签 (tag type = 0x09)
         assertEquals(0x09, result3[baseIndex + 0]);
         
-        assertTrue(result3.length >14 + 11 + 1, "不够flv头长度+HEVC序列头长度");
+        assertTrue(result3.length > 11 + 2, "不够视频tag长度+HEVC序列头长度");
         // 检查HEVC序列头标记 (AVCPacketType = 0x00)
         // Tag头: 1字节TagType + 3字节DataSize + 3字节Timestamp + 1字节TimestampExtended + 3字节StreamID = 11字节
         assertEquals(0x00, result3[baseIndex + 11 + 1]); // AVCPacketType

+ 23 - 9
src/test/java/com/jttserver/relay/FlvStreamRelayTest.java

@@ -3,13 +3,12 @@ package com.jttserver.relay;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.jttserver.codec.FlvPacketizer;
 import com.jttserver.protocol.Jtt1078PacketParams;
 
 import java.io.ByteArrayOutputStream;
 import static org.junit.jupiter.api.Assertions.*;
 
-
-
 public class FlvStreamRelayTest {
     private FlvRealtimeStreamRelay flvStreamRelay;
 
@@ -23,8 +22,8 @@ public class FlvStreamRelayTest {
         String channelId = "test_channel_avc";
 
         // 简化的 H.264 SPS/PPS,与已有 FlvPacketizerTest 中的数据保持一致
-        byte[] spsData = new byte[]{0x67, 0x42, 0x00, 0x0A, (byte)0xF8, 0x41, (byte)0xA2};
-        byte[] ppsData = new byte[]{0x68, (byte)0xCB, (byte)0x8C, (byte)0xB2};
+        byte[] spsData = new byte[] { 0x67, 0x42, 0x00, 0x0A, (byte) 0xF8, 0x41, (byte) 0xA2 };
+        byte[] ppsData = new byte[] { 0x68, (byte) 0xCB, (byte) 0x8C, (byte) 0xB2 };
 
         // 参数:payloadType=98 表示 H.264
         Jtt1078PacketParams params = new Jtt1078PacketParams();
@@ -36,6 +35,8 @@ public class FlvStreamRelayTest {
 
         // 验证初始化段(包含 FLV 头 + 序列头)
         byte[] init = flvStreamRelay.getChannelInitVideoSegment(channelId);
+        init = concat(FlvPacketizer.FLV_HEADER_ALL, init);
+
         assertNotNull(init);
         assertTrue(init.length > 13, "初始化段应包含文件头和至少一个视频Tag");
 
@@ -58,14 +59,16 @@ public class FlvStreamRelayTest {
         int tagStart = 13; // FLV头后的位置
         assertEquals(0x09, init[tagStart], "首个Tag应为视频类型(0x09)");
 
-         // 验证序列头标记:在Tag的11字节头之后,payload首字节为 FrameType+CodecID,下一字节为 AVCPacketType
+        // 验证序列头标记:在Tag的11字节头之后,payload首字节为 FrameType+CodecID,下一字节为 AVCPacketType
         int payloadStart = tagStart + 11;
         assertTrue(init.length > payloadStart + 2, "初始化段中的视频Tag的payload应存在");
         assertEquals(0x17, init[payloadStart], "AVC序列头的FrameType+CodecID应为0x17");
         assertEquals(0x00, init[payloadStart + 1], "AVCPacketType应为0x00(序列头)");
 
-         // 替代验证:使用初始化段校验序列头,不依赖内存缓冲
+        // 替代验证:使用初始化段校验序列头,不依赖内存缓冲
         byte[] initAgain = flvStreamRelay.getChannelInitVideoSegment(channelId);
+        initAgain = concat(FlvPacketizer.FLV_HEADER_ALL, initAgain);
+
         assertNotNull(initAgain);
         assertTrue(initAgain.length > tagStart + 11 + 2, "初始化段应包含FLV头和至少一个视频序列头Tag");
         assertEquals(0x09, initAgain[tagStart + 0], "初始化段首个Tag应为视频类型(0x09)");
@@ -79,9 +82,9 @@ public class FlvStreamRelayTest {
     void testPublishVideoSplitMultipleNalues_H264CombinedSpsPps() {
         String channelId = "test_channel_avc_multi";
 
-        byte[] startCode = new byte[]{0x00, 0x00, 0x00, 0x01};
-        byte[] spsData = new byte[]{0x67, 0x42, 0x00, 0x0A, (byte)0xF8, 0x41, (byte)0xA2};
-        byte[] ppsData = new byte[]{0x68, (byte)0xCB, (byte)0x8C, (byte)0xB2};
+        byte[] startCode = new byte[] { 0x00, 0x00, 0x00, 0x01 };
+        byte[] spsData = new byte[] { 0x67, 0x42, 0x00, 0x0A, (byte) 0xF8, 0x41, (byte) 0xA2 };
+        byte[] ppsData = new byte[] { 0x68, (byte) 0xCB, (byte) 0x8C, (byte) 0xB2 };
 
         // 参数:payloadType=98 表示 H.264
         Jtt1078PacketParams params = new Jtt1078PacketParams();
@@ -100,6 +103,8 @@ public class FlvStreamRelayTest {
 
         // 验证初始化段(包含 FLV 头 + 序列头)
         byte[] init = flvStreamRelay.getChannelInitVideoSegment(channelId);
+        init = concat(FlvPacketizer.FLV_HEADER_ALL, init);
+
         assertNotNull(init);
         assertTrue(init.length > 13, "初始化段应包含文件头和至少一个视频Tag");
         // 校验 FLV 头(13字节:9字节头+4字节PreviousTagSize0)
@@ -128,6 +133,8 @@ public class FlvStreamRelayTest {
 
         // 替代验证:使用初始化段再次校验序列头,不依赖内存缓冲
         byte[] initAgain = flvStreamRelay.getChannelInitVideoSegment(channelId);
+        initAgain = concat(FlvPacketizer.FLV_HEADER_ALL, initAgain);
+        
         assertNotNull(initAgain);
         assertTrue(initAgain.length > tagStart + 11 + 2, "初始化段应包含FLV头和至少一个视频序列头Tag");
         assertEquals(0x09, initAgain[tagStart + 0], "初始化段首个Tag应为视频类型(0x09)");
@@ -135,4 +142,11 @@ public class FlvStreamRelayTest {
         assertEquals(0x17, initAgain[tagStart + bufPayloadStart], "序列头的FrameType+CodecID应为0x17");
         assertEquals(0x00, initAgain[tagStart + bufPayloadStart + 1], "AVCPacketType应为0x00(序列头)");
     }
+
+    public static byte[] concat(byte[] a, byte[] b) {
+        byte[] result = new byte[a.length + b.length];
+        System.arraycopy(a, 0, result, 0, a.length);
+        System.arraycopy(b, 0, result, a.length, b.length);
+        return result;
+    }
 }