瀏覽代碼

初步对接好原有功能

kwl 3 周之前
父節點
當前提交
b42a107bf7

+ 39 - 0
pom.xml

@@ -63,6 +63,45 @@
             <artifactId>slf4j-api</artifactId>
             <version>${slf4j.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
     </dependencies>
 
+    <build>
+        <plugins>
+            <!-- Git提交信息插件 -->
+            <plugin>
+                <groupId>io.github.git-commit-id</groupId>
+                <artifactId>git-commit-id-maven-plugin</artifactId>
+                <version>5.0.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>revision</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <generateGitPropertiesFile>true</generateGitPropertiesFile>
+                    <generateGitPropertiesFilename>${project.build.outputDirectory}/git.properties</generateGitPropertiesFilename>
+                    <injectAllReactorProjects>true</injectAllReactorProjects>
+                    <failOnNoGitDirectory>false</failOnNoGitDirectory>
+                    <failOnUnableToExtractRepoInfo>false</failOnUnableToExtractRepoInfo>
+                </configuration>
+            </plugin>
+            <!-- 执行插件 -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>exec-maven-plugin</artifactId>
+                <version>3.1.0</version>
+                <configuration>
+                    <mainClass>com.jttserver.Server</mainClass>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
 </project>

+ 61 - 0
src/main/java/com/jttserver/Server.java

@@ -7,11 +7,72 @@ import java.util.Properties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.jttserver.config.ConfigManager;
+import com.jttserver.service.ManagerWebServer;
+import com.jttserver.service.publisher.PublishServer;
+import com.jttserver.service.publisher.WebsockServer;
+import com.jttserver.service.receiver.JttVideoRecvServer;
+import com.jttserver.service.receiver.RecvSever;
+
 public class Server {
     private static final Logger logger = LoggerFactory.getLogger(Server.class);
     public static void main(String[] args) {
         // 打印 Git Commit ID
         showGitHash();
+
+        // 检查是否启用设备管理功能
+        boolean enableDeviceManagement = true;
+        for (String arg : args) {
+            if ("-disableDeviceManagement".equals(arg)) {
+                enableDeviceManagement = false;
+                break;
+            }
+        }
+
+        // 根据参数设置功能开关
+        if (enableDeviceManagement) {
+            ConfigManager.enableDeviceManagement();
+            logger.info("设备管理功能已启用, 如需禁用请使用 -disableDeviceManagement 参数");
+        } else {
+            ConfigManager.disableDeviceManagement();
+            logger.info("设备管理功能已禁用");
+        }
+
+        logger.info("服务启动");
+
+        int wsPort = Integer.parseInt(com.jttserver.config.ConfigManager.get("server.websocket.port", "18090"));
+        int realtimePort = Integer.parseInt(com.jttserver.config.ConfigManager.get("server.realtime.port", "18080"));
+
+
+        PublishServer wsServer = new WebsockServer(wsPort);
+
+        try {
+            wsServer.start();
+
+            RecvSever readTimeServer = new JttVideoRecvServer(wsServer, realtimePort, "/realtime/");
+            ManagerWebServer webServer = new ManagerWebServer();
+
+            // 当程序正常退出时自动调用stop()方法来关闭服务器。
+            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+                readTimeServer.stop();
+                webServer.stop();
+                wsServer.stop();
+            }));
+
+            webServer.start();
+            readTimeServer.start();
+            
+
+            wsServer.waitForShutdown();
+            
+        } catch (InterruptedException e) {
+            logger.error("服务器运行被中断", e);
+            Thread.currentThread().interrupt();   
+        } catch (Exception e) {
+            logger.error("服务启动异常", e);
+        }
+
+
     }
 
     public static void showGitHash() {

+ 7 - 2
src/main/java/com/jttserver/relay/FlvStreamRelay.java

@@ -11,6 +11,7 @@ import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.service.publisher.PublishServer;
+import com.jttserver.service.receiver.RecvSever;
 import com.jttserver.utils.SimCardUtils;
 
 import io.netty.buffer.Unpooled;
@@ -39,9 +40,13 @@ public class FlvStreamRelay extends StreamRelay {
     private final FlvPacketizer packetizer;
 
     
-    public FlvStreamRelay() {
+    public FlvStreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
+        super(publishServer, receiveServer, prefix);
         // 当前仅支持 FLV 打包
         this.packetizer = new FlvPacketizer();
+        if (this.publishServer != null) {     
+            publishServer.addPrefixRelay(prefix, this);
+        }
     }
 
     /*
@@ -130,7 +135,7 @@ public class FlvStreamRelay extends StreamRelay {
                 channelIdToStreamId.put(channelId, streamId);
                 // 同步建立映射关系
                 publishServer.mapStreamToChannel(streamId, channelId);
-                publishServer.addStreamRelay(streamId, this);
+                
             }
         }
         // 使用 streamId

+ 17 - 1
src/main/java/com/jttserver/relay/StreamRelay.java

@@ -17,11 +17,23 @@ public abstract class StreamRelay {
     // 发布服务器引用 websocketserver
     protected PublishServer publishServer;
 
+    // 路径前缀
+    protected String prefix;
+
     /* 
      * 构造函数
      */
     public StreamRelay() {
-        
+
+    }
+
+    /**
+     * 带参数构造函数
+     */
+    public StreamRelay(PublishServer publishServer, RecvSever receiveServer, String prefix) {
+        this.publishServer = publishServer;
+        this.receiveServer = receiveServer;
+        this.prefix = prefix;
     }
 
     public void setPublishServer(PublishServer publishServer) {
@@ -34,6 +46,10 @@ public abstract class StreamRelay {
         this.receiveServer = receiveServer;
     }
 
+    public void setPrefix(String prefix) {
+        this.prefix = prefix;
+    }
+
 
     /* 
      * 推流视频数据

+ 420 - 0
src/main/java/com/jttserver/service/ManagerWebServer.java

@@ -0,0 +1,420 @@
+package com.jttserver.service;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.http.*;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+
+import java.net.InetSocketAddress;
+import java.util.*;
+
+import com.jttserver.config.ConfigManager;
+import com.jttserver.device.DeviceManager;
+
+import java.io.InputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ManagerWebServer {
+
+    private static final Logger logger = LoggerFactory.getLogger(ManagerWebServer.class);
+
+    private final int port = Integer.parseInt(ConfigManager.get("server.manager.port", "8099"));
+
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel webServerChannel;
+
+    // 设备管理功能开关状态(初始化时获取,避免频繁调用ConfigManager)
+    private final boolean deviceManagementEnabled = ConfigManager.isDeviceManagementEnabled();
+
+    public ManagerWebServer() {
+        // 检查资源文件是否存在(仅在功能启用时检查)
+        if (deviceManagementEnabled) {
+            checkResources();
+        }
+    }
+
+    /**
+     * 检查资源文件是否存在
+     */
+    private void checkResources() {
+        String[] resourceFiles = { "devices.html", "player.html", "playerWhthoutAudio.html", "offline_player.html", "mpegts.js", "jessibuca/demo.html", "jessibuca/jessibuca.js" };
+        for (String file : resourceFiles) {
+            try (InputStream inputStream = ManagerWebServer.class.getClassLoader().getResourceAsStream("web/" + file)) {
+                if (inputStream == null) {
+                    logger.warn("找不到资源文件: {}", file);
+                }
+            } catch (IOException e) {
+                logger.warn("无法访问资源文件: {}", file, e);
+            }
+        }
+    }
+
+    /**
+     * 启动Web服务器
+     */
+    public void start() throws InterruptedException {
+        // 检查功能开关
+        if (!deviceManagementEnabled) {
+            logger.info("Web管理界面服务器功能未启用");
+            return;
+        }
+
+        bossGroup = new NioEventLoopGroup(1);
+        workerGroup = new NioEventLoopGroup();
+
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(bossGroup, workerGroup)
+                .channel(NioServerSocketChannel.class)
+                .localAddress(new InetSocketAddress(port))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel ch) throws Exception {
+                        ch.pipeline().addLast(new HttpServerCodec());
+                        ch.pipeline().addLast(new HttpObjectAggregator(65536));
+                        ch.pipeline().addLast(new ChunkedWriteHandler());
+                        ch.pipeline().addLast(new WebRequestHandler(deviceManagementEnabled));
+                    }
+                });
+
+        ChannelFuture future = bootstrap.bind().sync();
+        webServerChannel = future.channel();
+
+        logger.info("Web管理界面服务器启动,监听端口: {}", port);
+    }
+
+    /**
+     * 停止Web服务器
+     */
+    public void stop() {
+        // 检查功能开关
+        if (!ConfigManager.isDeviceManagementEnabled()) {
+            return;
+        }
+
+        if (webServerChannel != null) {
+            webServerChannel.close();
+        }
+
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+
+        logger.info("Web管理界面服务器已停止");
+    }
+
+    /**
+     * 兼容Java 8的readAllBytes实现
+     */
+    private static byte[] readAllBytes(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        int nRead;
+        byte[] data = new byte[4096];
+        while ((nRead = inputStream.read(data, 0, data.length)) != -1) {
+            buffer.write(data, 0, nRead);
+        }
+        return buffer.toByteArray();
+    }
+
+    /**
+     * Web请求处理器
+     */
+    public static class WebRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+        // 设备管理功能开关状态(避免频繁调用ConfigManager)
+        private final boolean deviceManagementEnabled;
+
+        public WebRequestHandler(boolean deviceManagementEnabled) {
+            this.deviceManagementEnabled = deviceManagementEnabled;
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+            // 检查功能开关
+            if (!deviceManagementEnabled) {
+                handleNotFound(ctx);
+                return;
+            }
+
+            String uri = request.uri();
+
+            // 移除查询参数部分,只保留路径部分
+            int queryIndex = uri.indexOf('?');
+            if (queryIndex != -1) {
+                uri = uri.substring(0, queryIndex);
+            }
+
+            // 默认路径和 /devices.html 都显示设备详情页面
+            if ("/".equals(uri) || "/devices.html".equals(uri) || "/index.html".equals(uri)) {
+                handleDevicesPage(ctx);
+            } else if ("/player.html".equals(uri)) {
+                handlePlayerPage(ctx);
+            } else if ("/playerWhthoutAudio.html".equals(uri)) {
+                handlePlayerWithoutAudioPage(ctx);
+            } else if ("/offline_player.html".equals(uri)) {
+                handleOfflinePlayerPage(ctx);
+            } else if ("/mpegts.js".equals(uri)) {
+                handleMpegtsJs(ctx);
+            } else if (uri.startsWith("/jessibuca/")) {
+                handleJessibucaFile(ctx, uri);
+            } else if ("/devices".equals(uri)) {
+                handleDevicesApi(ctx);
+            } else {
+                handleNotFound(ctx);
+            }
+        }
+
+        /**
+         * 从资源文件中读取HTML内容
+         */
+        private static String loadHtmlFile(String fileName) throws IOException {
+            try (InputStream inputStream = ManagerWebServer.class.getClassLoader()
+                    .getResourceAsStream("web/" + fileName)) {
+                if (inputStream == null) {
+                    throw new IOException("无法找到资源文件: " + fileName);
+                }
+                return new String(readAllBytes(inputStream), StandardCharsets.UTF_8);
+            }
+        }
+
+        /**
+         * 处理设备详情页面请求
+         */
+        private void handleDevicesPage(ChannelHandlerContext ctx) throws IOException {
+            String html = loadHtmlFile("devices.html");
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.copiedBuffer(html, CharsetUtil.UTF_8));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+        /**
+         * 处理播放器测试页面请求
+         */
+        private void handlePlayerPage(ChannelHandlerContext ctx) throws IOException {
+            String html = loadHtmlFile("player.html");
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.copiedBuffer(html, CharsetUtil.UTF_8));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+        
+        /**
+         * 处理无音频播放器测试页面请求
+         */
+        private void handlePlayerWithoutAudioPage(ChannelHandlerContext ctx) throws IOException {
+            String html = loadHtmlFile("playerWhthoutAudio.html");
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.copiedBuffer(html, CharsetUtil.UTF_8));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+         /**
+         * 处理离线播放器页面请求
+         */
+        private void handleOfflinePlayerPage(ChannelHandlerContext ctx) throws IOException {
+            String html = loadHtmlFile("offline_player.html");
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.copiedBuffer(html, CharsetUtil.UTF_8));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+        /**
+         * 处理 mpegts.js 静态资源请求
+         */
+        private void handleMpegtsJs(ChannelHandlerContext ctx) throws IOException {
+            byte[] js = loadStaticFile("mpegts.js");
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.wrappedBuffer(js));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/javascript; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+        /**
+         * 从资源文件中读取静态二进制内容(例如 mpegts.js)
+         */
+        private static byte[] loadStaticFile(String fileName) throws IOException {
+            try (InputStream inputStream = ManagerWebServer.class.getClassLoader()
+                    .getResourceAsStream("web/" + fileName)) {
+                if (inputStream == null) {
+                    throw new IOException("无法找到资源文件: " + fileName);
+                }
+                return readAllBytes(inputStream);
+            }
+        }
+
+        /**
+         * 将设备信息转换为JSON格式
+         */
+        private String deviceToJson(Map<String, Object> device) {
+            return "{" +
+                    "\"channelId\":\"" + device.get("channelId") + "\"," +
+                    "\"remoteAddress\":\"" + device.get("remoteAddress") + "\"," +
+                    "\"simCardNumber\":\"" + (device.get("simCardNumber") != null ? device.get("simCardNumber") : "")
+                    + "\"," +
+                    "\"logicChannelNumber\":" + device.get("logicChannelNumber") + "," +
+                    "\"connectTime\":" + device.get("connectTime") + "," +
+                    "\"lastActiveTime\":" + device.get("lastActiveTime") +
+                    "}";
+        }
+
+        /**
+         * 处理设备API请求
+         */
+        private void handleDevicesApi(ChannelHandlerContext ctx) {
+            Collection<DeviceManager.DeviceInfo> devices = DeviceManager.getConnectedDevices();
+            List<Map<String, Object>> deviceList = new ArrayList<>();
+
+            for (DeviceManager.DeviceInfo device : devices) {
+                Map<String, Object> deviceMap = new HashMap<>();
+                deviceMap.put("channelId", device.getChannelId());
+                deviceMap.put("remoteAddress", device.getRemoteAddress());
+                deviceMap.put("simCardNumber", device.getSimCardNumber());
+                deviceMap.put("logicChannelNumber", device.getLogicChannelNumber());
+                deviceMap.put("connectTime", device.getConnectTime());
+                deviceMap.put("lastActiveTime", device.getLastActiveTime());
+                deviceList.add(deviceMap);
+            }
+
+            String jsonResponse = "[" + deviceList.stream()
+                    .map(this::deviceToJson)
+                    .reduce((a, b) -> a + "," + b)
+                    .orElse("") + "]";
+
+            FullHttpResponse response = new DefaultFullHttpResponse(
+                    HttpVersion.HTTP_1_1,
+                    HttpResponseStatus.OK,
+                    Unpooled.copiedBuffer(jsonResponse, CharsetUtil.UTF_8));
+            response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json; charset=UTF-8");
+            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+            ctx.writeAndFlush(response);
+        }
+
+        /**
+         * 处理404错误
+         */
+        private void handleNotFound(ChannelHandlerContext ctx) throws IOException {
+            // 如果功能未启用或资源文件不存在,返回简单404响应
+            try {
+                String html = loadHtmlFile("404.html");
+
+                FullHttpResponse response = new DefaultFullHttpResponse(
+                        HttpVersion.HTTP_1_1,
+                        HttpResponseStatus.NOT_FOUND,
+                        Unpooled.copiedBuffer(html, CharsetUtil.UTF_8));
+                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+                response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+                ctx.writeAndFlush(response);
+            } catch (IOException e) {
+                // 如果无法加载notfound.html,则返回简单文本响应
+                FullHttpResponse response = new DefaultFullHttpResponse(
+                        HttpVersion.HTTP_1_1,
+                        HttpResponseStatus.NOT_FOUND,
+                        Unpooled.copiedBuffer("404 - 页面未找到", CharsetUtil.UTF_8));
+                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
+                response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
+
+                ctx.writeAndFlush(response);
+            }
+        }
+
+        /**
+         * 处理jessibuca目录下的文件请求
+         */
+        private void handleJessibucaFile(ChannelHandlerContext ctx, String uri) throws IOException {
+            // 提取jessibuca目录下的文件名部分
+            String fileName = uri.substring(uri.indexOf("/jessibuca/") + "/jessibuca/".length());
+            String resourcePath = "web/jessibuca/" + fileName;
+            
+            try (InputStream inputStream = ManagerWebServer.class.getClassLoader().getResourceAsStream(resourcePath)) {
+                if (inputStream == null) {
+                    handleNotFound(ctx);
+                    return;
+                }
+                
+                byte[] content = readAllBytes(inputStream);
+                FullHttpResponse response = new DefaultFullHttpResponse(
+                        HttpVersion.HTTP_1_1,
+                        HttpResponseStatus.OK,
+                        Unpooled.wrappedBuffer(content));
+                
+                // 根据文件扩展名设置内容类型
+                if (fileName.endsWith(".js")) {
+                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/javascript; charset=UTF-8");
+                } else if (fileName.endsWith(".html")) {
+                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8");
+                } else if (fileName.endsWith(".jpg")) {
+                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "image/jpeg");
+                } else if (fileName.endsWith(".png")) {
+                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "image/png");
+                } else {
+                    response.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/octet-stream");
+                }
+                
+                response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, content.length);
+                ctx.writeAndFlush(response);
+            } catch (IOException e) {
+                logger.error("处理jessibuca文件失败: {}", uri, e);
+                handleNotFound(ctx);
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            // 检查是否为连接中断异常
+            if (cause instanceof IOException &&
+                    (cause.getMessage().contains("An established connection was aborted") ||
+                            cause.getMessage().contains("软件中止了一个已建立的连接"))) {
+                logger.info("Web客户端主动断开连接: {}", ctx.channel().remoteAddress());
+            } else {
+                logger.error("Web请求处理异常", cause);
+            }
+            ctx.close();
+        }
+
+    }
+
+}

+ 31 - 2
src/main/java/com/jttserver/service/publisher/PublishServer.java

@@ -2,11 +2,29 @@ package com.jttserver.service.publisher;
 
 import com.jttserver.relay.StreamRelay;
 
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+
 /* 
  * 发布服务器基类
  */
 public abstract class PublishServer {
 
+
+    protected EventLoopGroup bossGroup;
+    protected EventLoopGroup workerGroup;
+    protected Channel serverChannel;
+
+    /**
+     * 启动服务器
+     */
+    public abstract void start() throws InterruptedException;
+
+    /**
+     * 停止服务器
+     */
+    public abstract void stop();
+
     /**
      * 根据原始 channelId 移除映射(视频连接断开时调用)
      */
@@ -28,10 +46,21 @@ public abstract class PublishServer {
     /* 
      * 添加流转发器映射
      */
-    public abstract void addStreamRelay(String streamId, StreamRelay relay);
+    public abstract void addPrefixRelay(String streamId, StreamRelay relay);
 
     /* 
      * 移除流转发器映射
      */
-    public abstract void removeStreamRelay(String streamId);
+    public abstract void removePrefixRelay(String streamId);
+
+
+    /**
+     * 等待服务器关闭
+     */
+    public void waitForShutdown() throws InterruptedException {
+        if (serverChannel != null) {
+            // 等待服务器通道关闭
+            serverChannel.closeFuture().sync();
+        }
+    }
 }

+ 52 - 17
src/main/java/com/jttserver/service/publisher/WebsockServer.java

@@ -39,17 +39,15 @@ public class WebsockServer extends PublishServer {
 
     private final int port;
 
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-    private Channel serverChannel;
-
-    // 管理 streamId 与对应的 StreamRelay 实例
-    private Map<String, StreamRelay> streamRelays = new ConcurrentHashMap<>();
-    public Map<String, StreamRelay> getStreamRelays() {
-        return streamRelays;
+    
+
+    // 管理 prefix类型 与对应的 StreamRelay 实例 如/realtime/、 /playback/ 等
+    private Map<String, StreamRelay> prefixRelays = new ConcurrentHashMap<>();
+    public Map<String, StreamRelay> getPrefixRelays() {
+        return prefixRelays;
     }
-    public StreamRelay getStreamRelay(String streamId) {
-        return streamRelays.get(streamId);
+    public StreamRelay getPrefixRelay(String prefix) {
+        return prefixRelays.get(prefix);
     }
 
 
@@ -73,6 +71,7 @@ public class WebsockServer extends PublishServer {
     /**
      * 启动 WebSocket 服务器
      */
+    @Override
     public void start() throws InterruptedException {
         bossGroup = new NioEventLoopGroup();
         workerGroup = new NioEventLoopGroup();
@@ -110,24 +109,60 @@ public class WebsockServer extends PublishServer {
     }
 
     /**
+     * 停止 WebSocket 服务器
+     */
+    @Override
+    public void stop() {
+        if (serverChannel != null) {
+            serverChannel.close();
+        }
+
+        if (bossGroup != null) {
+            bossGroup.shutdownGracefully();
+        }
+
+        if (workerGroup != null) {
+            workerGroup.shutdownGracefully();
+        }
+
+        // 清理订阅信息
+        streamGroups.clear();
+        channelStreamMap.clear();
+        // 清理流与通道映射
+        streamIdToChannelId.clear();
+        channelIdToStreamId.clear();
+
+        channelRelayMap.clear();
+        prefixRelays.clear();
+
+        logger.info("WebSocket 服务器已停止");
+    }
+
+    /**
      * 增加流中转器
      */
     @Override
-    public void addStreamRelay(String streamId, StreamRelay relay) {
-        streamRelays.put(streamId, relay);
+    public void addPrefixRelay(String prefix, StreamRelay relay) {
+        prefixRelays.put(prefix, relay);
     }
 
     /**
      * 移除流中转器
      */
     @Override
-    public void removeStreamRelay(String streamId) {
-        streamRelays.remove(streamId);
+    public void removePrefixRelay(String prefix) {
+        prefixRelays.remove(prefix);
     }
 
     @Override
     public void broadcast(String streamId, byte[] data) {
-
+        if (streamId == null || streamId.isEmpty() || data == null || data.length == 0) {
+            return;
+        }
+        ChannelGroup g = streamGroups.get(streamId);
+        if (g != null && !g.isEmpty()) {
+            g.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(data)));
+        }
     }
 
     /**
@@ -209,11 +244,11 @@ public class WebsockServer extends PublishServer {
                 // 流媒体转发类
                 StreamRelay relay = null;
 
-                for (String prefix : server.getStreamRelays().keySet()) {
+                for (String prefix : server.getPrefixRelays().keySet()) {
                     if (path.startsWith(prefix) && path.length() > prefix.length()) {
                         streamId = path.substring(prefix.length());
                         currPrefix = prefix;
-                        relay = server.getStreamRelay(currPrefix);
+                        relay = server.getPrefixRelay(currPrefix);
 
                         // 移除可能的.flv后缀
                         if (streamId.endsWith(".flv")) {

+ 7 - 11
src/main/java/com/jttserver/service/receiver/JttVideoRecvServer.java

@@ -14,6 +14,7 @@ import com.jttserver.device.DeviceManager;
 import com.jttserver.protocol.Jtt1078NaluPacket;
 import com.jttserver.protocol.Jtt1078PacketParams;
 import com.jttserver.protocol.Jtt1078PacketParser;
+import com.jttserver.relay.FlvStreamRelay;
 import com.jttserver.relay.StreamRelay;
 import com.jttserver.relay.workerthreads.BroadcastWorker;
 import com.jttserver.relay.workerthreads.VideoPublishWorker;
@@ -41,9 +42,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 public class JttVideoRecvServer extends RecvSever {
     private static final Logger logger = LoggerFactory.getLogger(JttVideoRecvServer.class);
 
-    // 接收服务器监听端口,-1表示未配置
-    private final int port = -1;
-
     private EventLoopGroup bossGroup;
     private EventLoopGroup workerGroup;
     private Channel serverChannel;
@@ -57,17 +55,15 @@ public class JttVideoRecvServer extends RecvSever {
     // 流转发器
     private StreamRelay streamRelay;
 
-    // 流发布服务器
-    private PublishServer publishServer;
-
-    JttVideoRecvServer(StreamRelay streamRelay, PublishServer publishServer) {
-        this.streamRelay = streamRelay;
-        this.publishServer = publishServer;
+    public JttVideoRecvServer(PublishServer publishServer, int port, String prefix) {
+        super(publishServer, port, prefix);
+        this.streamRelay = new FlvStreamRelay(publishServer, this, prefix);
     }
 
     /**
      * 启动服务器
      */
+    @Override
     public void start() throws InterruptedException {
         // 用于接受新的连接
         bossGroup = new NioEventLoopGroup(1);
@@ -84,8 +80,7 @@ public class JttVideoRecvServer extends RecvSever {
                         ch.pipeline().addLast(new Jtt1078MessageDecoder()); // 添加JTT1078解码器处理粘包拆包
 
                         // 添加处理视频流数据的处理器
-                        ch.pipeline()
-                                .addLast(new VideoStreamHandler(deviceManagementEnabled, streamRelay, publishServer));
+                        ch.pipeline().addLast(new VideoStreamHandler(deviceManagementEnabled, streamRelay, publishServer));
                     }
                 })
                 .option(ChannelOption.SO_BACKLOG, 128)
@@ -111,6 +106,7 @@ public class JttVideoRecvServer extends RecvSever {
     /**
      * 停止服务器
      */
+    @Override
     public void stop() {
         if (serverChannel != null) {
             serverChannel.close();

+ 30 - 0
src/main/java/com/jttserver/service/receiver/RecvSever.java

@@ -1,14 +1,44 @@
 package com.jttserver.service.receiver;
 
+import com.jttserver.service.publisher.PublishServer;
+
 /* 
  * 接收服务器基类
  */
 public abstract class RecvSever {
 
+    // 流发布服务器
+    protected PublishServer publishServer;
+
+    // 接收服务器监听端口,-1表示未配置
+    protected int port = -1;
+
+    // 路径前缀 如 /realtime/ 、/playback/
+    protected String prefix = "";
+
     /**
      * 主动断开通道
      * 
      * @param channelId 通道ID
      */
     public abstract void disconnChannel(String channelId);
+
+    /**
+     * 构造函数
+     */
+    public RecvSever(PublishServer publishServer, int port, String prefix) {
+        this.publishServer = publishServer;
+        this.port = port;
+        this.prefix = prefix;
+    }
+
+    /*
+     * 启动服务器
+     */
+    public abstract void start() throws InterruptedException;
+
+    /**
+     * 停止服务器
+     */
+    public abstract void stop();
 }

+ 1 - 1
src/test/java/com/jttserver/relay/FlvStreamRelayTest.java

@@ -16,7 +16,7 @@ public class FlvStreamRelayTest {
 
     @BeforeEach
     void setUp() {
-        flvStreamRelay = new FlvStreamRelay();
+        flvStreamRelay = new FlvStreamRelay(null, null, "/realtime/");
     }
 
     @Test