ソースを参照

增加回放通道,增加logs记录

kwl 3 週間 前
コミット
7f428d4bdd

+ 10 - 3
pom.xml

@@ -14,6 +14,7 @@
         <junit.version>5.9.2</junit.version>
         <netty.version>4.1.100.Final</netty.version>
         <slf4j.version>2.0.9</slf4j.version>
+        <logback.version>1.4.14</logback.version>
     </properties>
 
     <dependencies>
@@ -63,10 +64,16 @@
             <artifactId>slf4j-api</artifactId>
             <version>${slf4j.version}</version>
         </dependency>
+        <!-- Logback 日志实现 -->
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
-            <version>${slf4j.version}</version>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>${logback.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>${logback.version}</version>
         </dependency>
     </dependencies>
 

+ 5 - 2
src/main/java/com/jttserver/Server.java

@@ -42,26 +42,29 @@ public class Server {
 
         int wsPort = Integer.parseInt(com.jttserver.config.ConfigManager.get("server.websocket.port", "18090"));
         int realtimePort = Integer.parseInt(com.jttserver.config.ConfigManager.get("server.realtime.port", "18080"));
+        int playbackPort = Integer.parseInt(com.jttserver.config.ConfigManager.get("server.playback.port", "18081"));
 
 
         PublishServer wsServer = new WebsockServer(wsPort);
 
         try {
             wsServer.start();
+            ManagerWebServer webServer = new ManagerWebServer();
 
             RecvSever readTimeServer = new JttVideoRecvServer(wsServer, realtimePort, "/realtime/");
-            ManagerWebServer webServer = new ManagerWebServer();
+            RecvSever playbackServer = new JttVideoRecvServer(wsServer, playbackPort, "/playback/");
 
             // 当程序正常退出时自动调用stop()方法来关闭服务器。
             Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                 readTimeServer.stop();
+                playbackServer.stop();
                 webServer.stop();
                 wsServer.stop();
             }));
 
             webServer.start();
             readTimeServer.start();
-            
+            playbackServer.start();
 
             wsServer.waitForShutdown();
             

+ 8 - 8
src/main/java/com/jttserver/codec/FlvPacketizer.java

@@ -9,7 +9,7 @@ import java.util.Map;
 import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.protocol.JttConstants;
-import com.jttserver.utils.CommonUtils;
+import com.jttserver.utils.CommUtils;
 import com.jttserver.codec.audio.AudioDecoder;
 import com.jttserver.codec.nativeaac.AacEncoderNative;
 import com.jttserver.codec.audio.AudioDecoder.AacEncodeResult;
@@ -249,10 +249,10 @@ public class FlvPacketizer {
                 logger.error("FLV Video Tag data too large: {}", dataLength);
                 return new byte[0];
             }
-            CommonUtils.writeUi24(tag, dataLength);
+            CommUtils.writeUi24(tag, dataLength);
 
             // 时间戳 (毫秒)
-            CommonUtils.writeTimestamp(tag, (int) timestamp);
+            CommUtils.writeTimestamp(tag, (int) timestamp);
 
             // StreamID (总是0)
             tag.write(new byte[] { 0x00, 0x00, 0x00 });
@@ -262,7 +262,7 @@ public class FlvPacketizer {
             tag.write(avcPacketType); // AVCPacketType
 
             // CompositionTime (时间偏移,可能为负)
-            CommonUtils.writeSi24(tag, compositionTime);
+            CommUtils.writeSi24(tag, compositionTime);
 
             // NALU数据(payload)
             if (data != null && data.length > 0) {
@@ -379,7 +379,7 @@ public class FlvPacketizer {
             // 处理视频帧
             if (naluType == NALU_TYPE_H264_IDR || naluType == 1 || naluType == 2) {
                 byte frameType = (naluType == NALU_TYPE_H264_IDR) ? (byte) 0x17 : (byte) 0x27;
-                byte[] payload = CommonUtils.toLengthPrefixedPayloadFromAnnexB(naluData);
+                byte[] payload = CommUtils.toLengthPrefixedPayloadFromAnnexB(naluData);
                 byte[] videoTag = createVideoTag(frameType, (byte) 0x01, 0, payload, timestamp);
                 // 缓存最近的I帧
                 if (naluType == NALU_TYPE_H264_IDR) {
@@ -434,7 +434,7 @@ public class FlvPacketizer {
                 byte frameType = (naluType == NALU_TYPE_H265_IDR || naluType == NALU_TYPE_H265_IDR_N_LP)
                         ? (byte) 0x1C
                         : (byte) 0x2C; // 0x1C for H.265 keyframe, 0x2C for inter frame
-                byte[] payload = CommonUtils.toLengthPrefixedPayloadFromAnnexB(naluData);
+                byte[] payload = CommUtils.toLengthPrefixedPayloadFromAnnexB(naluData);
                 return createVideoTag(frameType, (byte) 0x01, 0, payload, timestamp);
             }
         } catch (Exception e) {
@@ -609,10 +609,10 @@ public class FlvPacketizer {
                 logger.error("FLV Audio Tag data too large: {}", dataLength);
                 return new byte[0];
             }
-            CommonUtils.writeUi24(tag, dataLength);
+            CommUtils.writeUi24(tag, dataLength);
 
             // 时间戳 (毫秒)
-            CommonUtils.writeTimestamp(tag, (int) timestamp);
+            CommUtils.writeTimestamp(tag, (int) timestamp);
 
             // StreamID (总是0)
             tag.write(new byte[] { 0x00, 0x00, 0x00 });

+ 12 - 12
src/main/java/com/jttserver/relay/FlvStreamRelay.java

@@ -134,7 +134,7 @@ public class FlvStreamRelay extends StreamRelay {
                 logger.info("channelId: {}, streamId: {}", channelId, streamId);
                 channelIdToStreamId.put(channelId, streamId);
                 // 同步建立映射关系
-                publishServer.mapStreamToChannel(streamId, channelId);
+                publishServer.mapStreamToChannel(streamId, channelId, prefix);
                 
             }
         }
@@ -164,17 +164,17 @@ public class FlvStreamRelay extends StreamRelay {
     /*
      * 重置指定 channelId 的流状态(如断线重连时调用)
      */
-    public void resetChannel(String channelId) {
-        // 清空 FlvPacketizer 的该通道编解码器信息
-        packetizer.clearChannel(channelId);
-
-        // 同步移除映射
-        if (publishServer != null) {
-            publishServer.removeChannelMapping(channelId);
-        }
-        // 同步清理本地缓存的 streamId
-        channelIdToStreamId.remove(channelId);
-    }
+    // public void resetChannel(String channelId) {
+    //     // 清空 FlvPacketizer 的该通道编解码器信息
+    //     packetizer.clearChannel(channelId);
+
+    //     // 同步移除映射
+    //     if (publishServer != null) {
+    //         publishServer.removeChannelMapping(channelId);
+    //     }
+    //     // 同步清理本地缓存的 streamId
+    //     channelIdToStreamId.remove(channelId);
+    // }
 
     /*
      * 关闭指定 channelId 的流(如连接断开时调用)

+ 4 - 0
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -50,6 +50,10 @@ public abstract class StreamRelay {
         this.prefix = prefix;
     }
 
+    public String getPrefix() {
+        return this.prefix;
+    }
+
 
     /* 
      * 推流视频数据

+ 1 - 1
src/main/java/com/jttserver/service/publisher/PublishServer.java

@@ -33,7 +33,7 @@ public abstract class PublishServer {
     /**
      * 维护 streamId(sim卡号+逻辑通道号) 与 随机 channelId 的对应关系
      */
-    public abstract void mapStreamToChannel(String streamId, String channelId);
+    public abstract void mapStreamToChannel(String streamId, String channelId, String prefix);
 
     /**
      * 广播数据到指定 streamId 的所有订阅者

+ 28 - 20
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -1,6 +1,7 @@
 package com.jttserver.service.publisher;
 
 import com.jttserver.relay.StreamRelay;
+import com.jttserver.utils.CommUtils;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -51,9 +52,11 @@ public class WebsockServer extends PublishServer {
     }
 
 
-    // 维护 streamId(sim+logic) 与 原始视频 channelId 的映射(注意是jtt1078设备的ChannelId)
-    private final Map<String, String> streamIdToChannelId = new ConcurrentHashMap<>();
-    private final Map<String, String> channelIdToStreamId = new ConcurrentHashMap<>();
+    // 维护 streamId(sim+logic) 与 原始视频 channelId 的映射(注意是jtt1078设备的ChannelId),带有路径前缀
+    // streamIdToChannelId key=(streamID,prefix), value=channelId
+    private final Map<CommUtils.InfoItem,String> streamIdToChannelId = new ConcurrentHashMap<>();
+    // channelIdToStreamId key=channelId, value=(streamID,prefix)
+    private final Map<String, CommUtils.InfoItem> channelIdToStreamId = new ConcurrentHashMap<>();
 
     // 订阅数据结构
     // 按照streamId 为 key,ChannelGroup(包含多个channel) 为 value
@@ -90,7 +93,7 @@ public class WebsockServer extends PublishServer {
                         // 使用配置开启前缀匹配,允许 /realtime/{streamId}
                         ch.pipeline().addLast(new WebSocketServerProtocolHandler(
                                 WebSocketServerProtocolConfig.newBuilder()
-                                        .websocketPath("/realtime")
+                                        .websocketPath("/")
                                         .subprotocols(null)
                                         .allowExtensions(true)
                                         .checkStartsWith(true)
@@ -169,12 +172,12 @@ public class WebsockServer extends PublishServer {
      * 维护 streamId(sim卡号+逻辑通道号) 与 随机 channelId 的对应关系
      */
     @Override
-    public void mapStreamToChannel(String streamId, String channelId) {
+    public void mapStreamToChannel(String streamId, String channelId, String prefix) {
         if (streamId == null || streamId.isEmpty() || channelId == null || channelId.isEmpty()) {
             return;
         }
-        streamIdToChannelId.put(streamId, channelId);
-        channelIdToStreamId.put(channelId, streamId);
+        streamIdToChannelId.put(new CommUtils.InfoItem(streamId, prefix), channelId);
+        channelIdToStreamId.put(channelId, new CommUtils.InfoItem(streamId, prefix));
     }
 
     /**
@@ -185,24 +188,24 @@ public class WebsockServer extends PublishServer {
         if (channelId == null || channelId.isEmpty()) {
             return;
         }
-        String sid = channelIdToStreamId.remove(channelId);
-        if (sid != null) {
-            streamIdToChannelId.remove(sid);
+        CommUtils.InfoItem infoItem = channelIdToStreamId.remove(channelId);
+        if (infoItem != null) {
+            streamIdToChannelId.remove(infoItem);
         }
     }
 
     /**
      * 获取channelId对应的streamId方法
      */
-    public String getStreamIdByChannelId(String channelId) {
+    public CommUtils.InfoItem getStreamIdByChannelId(String channelId) {
         return channelIdToStreamId.get(channelId);
     }
 
     /**
      * 获取streamId对应的channelId方法
      */
-    public String getChannelIdByStreamId(String streamId) {
-        return streamIdToChannelId.get(streamId);
+    public String getChannelIdByStreamId(String streamId, String prefix) {
+        return streamIdToChannelId.get(new CommUtils.InfoItem(streamId, prefix));
     }
 
     /**
@@ -261,9 +264,10 @@ public class WebsockServer extends PublishServer {
                 logger.info("FLVWebSocket 握手完成 - 请求路径: {}, 流ID: {}", uri, streamId);
                 if (!streamId.isEmpty() && relay != null) {
                     // 在注册订阅前,初始化发送该类型需要的数据
-                    String channelId = server.getChannelIdByStreamId(streamId);
-
-                    relay.initChannelConn(channelId, ctx.channel());
+                    String channelId = server.getChannelIdByStreamId(streamId, currPrefix);
+                    if (channelId != null && !channelId.isEmpty()) {
+                        relay.initChannelConn(channelId, ctx.channel());
+                    }
                     server.registerChannel(streamId, ctx.channel(), relay);
                 }
             }
@@ -300,19 +304,23 @@ public class WebsockServer extends PublishServer {
                 if (g.isEmpty()) {
                     streamGroups.remove(sid);
                     // 移除流与通道映射
-                    String channelId = getChannelIdByStreamId(sid);
-                    if (channelId != null) {
+                    StreamRelay relay = channelRelayMap.remove(ch.id());
+                    String currPrefix = relay.getPrefix();
+                    // 获取recv服务器的对应channelId
+                    String channelId = getChannelIdByStreamId(sid, currPrefix);
+                    if (channelId != null && !channelId.isEmpty()) {
                         logger.info("流 {} 已无订阅者,channel已移除{}", sid, channelId);
                         
                         // destroy
-                        StreamRelay relay = channelRelayMap.remove(ch.id());
+                        
                         if (relay != null) {
                             relay.destroyChannelDisconn(channelId);
                         }
-
                         // 移除通道与流映射
                         removeChannelMapping(channelId);
                         
+                    }else {
+                        logger.warn("无对应的channelId,无需关闭视频通道,streamId: {}, prefix: {}", sid, currPrefix);
                     }
                 }
             }

+ 2 - 2
src/main/java/com/jttserver/service/receiver/JttVideoRecvServer.java

@@ -19,7 +19,7 @@ import com.jttserver.relay.StreamRelay;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.relay.workerthreads.VideoPublishWorker;
 import com.jttserver.service.publisher.PublishServer;
-import com.jttserver.utils.CommonUtils;
+import com.jttserver.utils.CommUtils;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.ByteBuf;
@@ -278,7 +278,7 @@ public class JttVideoRecvServer extends RecvSever {
                 Jtt1078PacketParams latestParams = assembler.getLatestParams();
 
                 // 计算时间戳
-                long tsMs = CommonUtils.toTimestampMillis(latestParams != null ? latestParams.timestamp : null);
+                long tsMs = CommUtils.toTimestampMillis(latestParams != null ? latestParams.timestamp : null);
                 // // 视频数据类型0x00-0x02 推流
                 // if (latestParams.dataType <= JttConstants.TYPE_VIDEO_B_FRAME) {
                 // flvPublisher.publishVideo(channelId, completeNaluData, latestParams, tsMs);

+ 44 - 3
src/main/java/com/jttserver/utils/CommonUtils.java → src/main/java/com/jttserver/utils/CommUtils.java

@@ -3,12 +3,53 @@ package com.jttserver.utils;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Objects;
 
 /**
  * 通用工具方法(字节与编解码相关的公共逻辑)。
  */
-public final class CommonUtils {
-    private CommonUtils() {
+public final class CommUtils {
+    private CommUtils() {
+    }
+
+    /**
+     * 通用信息项类,可用于map的key或value。
+     */
+    public static final class InfoItem {
+        private final String title;
+        private final String content;
+
+        public InfoItem(String title, String content) {
+            this.title = Objects.requireNonNull(title, "title cannot be null[title不能为空]");
+            this.content = Objects.requireNonNull(content, "content cannot be null[content不能为空]");
+        }
+
+        public String getTitle() {
+            return title;
+        }
+        public String getContent() {
+            return content;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+            InfoItem other = (InfoItem) obj;
+            return title.equals(other.title) && content.equals(other.content);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(title, content);
+        }
+
+        @Override
+        public String toString() {
+            return "InfoItem [title=" + title + ", content=" + content + "]";
+        }
     }
 
     // 写无符号24位整数(大端)
@@ -127,4 +168,4 @@ public final class CommonUtils {
          }
          return sb.toString().trim();
      }
-}
+}

+ 2 - 2
src/test/java/com/jttserver/codec/FlvPacketizerTest.java

@@ -5,7 +5,7 @@ import org.junit.jupiter.api.Test;
 
 import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
-import com.jttserver.utils.CommonUtils;
+import com.jttserver.utils.CommUtils;
 
 import static org.junit.jupiter.api.Assertions.*;
 
@@ -675,7 +675,7 @@ public class FlvPacketizerTest {
                 | (tag[tag.length - 1] & 0xFF);
         assertEquals(tag.length - 4, previousTagSize, "PreviousTagSize 应等于总长度减4");
         // 打印tag 数据
-        System.out.println("FLV AAC Sequence Header tag (" + tag.length + " bytes): " + CommonUtils.bytesToHex(tag));
+        System.out.println("FLV AAC Sequence Header tag (" + tag.length + " bytes): " + CommUtils.bytesToHex(tag));
     }
 
 }