Jtt1078MessageDecoder.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package com.jttserver.codec;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.handler.codec.ByteToMessageDecoder;
  5. import java.util.List;
  6. import com.jttserver.protocol.JttConstants;
  7. /**
  8. * JTT1078协议消息解码器
  9. * 处理TCP粘包和拆包问题
  10. */
  11. public class Jtt1078MessageDecoder extends ByteToMessageDecoder {
  12. // JTT1078数据包标识 0x30316364
  13. private static final int PACKET_HEADER = 0x30316364;
  14. private static final int MIN_DATA_LENGTH = 19; //最小为透传数据18+1个字节
  15. private static final int MIN_HEADER_LENGTH = 18;
  16. private static final int MAX_HEADER_LENGTH = 30;
  17. private final int jtt808Type;
  18. private int jttOffset = 0; // JTT808协议偏移量,2013版为0,2019版为4 设备sim的BCD字段多占4字节
  19. // 默认构造函数,使用JTT808 2013版
  20. public Jtt1078MessageDecoder() {
  21. this.jtt808Type = JttConstants.TYPE_JTT808_2013;
  22. }
  23. public Jtt1078MessageDecoder(int jtt808Type) {
  24. this.jtt808Type = jtt808Type;
  25. if (this.jtt808Type == JttConstants.TYPE_JTT808_2019) {
  26. jttOffset = 4;
  27. }
  28. }
  29. @Override
  30. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
  31. // 循环处理,直到没有完整数据包可读(至少需要最小头部长度+NALU数据长度)
  32. while (in.isReadable(MIN_DATA_LENGTH + jttOffset)) {
  33. // 查找数据包头部
  34. int headerIndex = findHeader(in);
  35. if (headerIndex == -1) {
  36. // 没有找到头部,等待更多数据
  37. return;
  38. }
  39. // 跳过无效数据到头部位置
  40. if (headerIndex > 0) {
  41. // System.out.println("Jtt1078MessageDecoder: 跳过了 " + headerIndex + " 个无效字节");
  42. in.skipBytes(headerIndex);
  43. }
  44. // 确保有足够的数据进行头部解析
  45. if (in.readableBytes() < MIN_HEADER_LENGTH + jttOffset) {
  46. return;
  47. }
  48. // 读取头部标识
  49. int header = in.readInt();
  50. if (header != PACKET_HEADER) {
  51. throw new IllegalStateException("Header mismatch after sync");
  52. }
  53. // 确保有足够的数据读取到第15字节(已经读取了4字节header)
  54. if (in.readableBytes() < 12 + jttOffset) {
  55. in.resetReaderIndex();
  56. return;
  57. }
  58. // 跳过前11字节到达第15字节 (已读取4字节header,还需跳过11字节到达第15字节)
  59. in.skipBytes(11 + jttOffset);
  60. // 读取第15字节并获取前4位
  61. byte fifteenthByte = in.readByte();
  62. int first4Bits = (fifteenthByte & 0xF0) >> 4;
  63. // 根据前4位确定包头长度
  64. int headerLength;
  65. switch (first4Bits) {
  66. case 0x0: // 0000
  67. case 0x1: // 0001
  68. case 0x2: // 0010
  69. headerLength = MAX_HEADER_LENGTH + jttOffset;
  70. break;
  71. case 0x3: // 0011
  72. headerLength = 26 + jttOffset;
  73. break;
  74. case 0x4: // 0100
  75. headerLength = 18 + jttOffset;
  76. break;
  77. default:
  78. // 未知类型, 报错
  79. throw new IllegalStateException("Nalu type unknow!");
  80. }
  81. // 回到包头开始位置
  82. in.readerIndex(in.readerIndex() - 16 - jttOffset); // 回退16字节
  83. // 确保有足够的数据读取整个包头
  84. if (in.readableBytes() < headerLength) {
  85. return;
  86. }
  87. // 跳到包头最后2个字节的位置读取数据长度 (使用大端序)
  88. in.skipBytes(headerLength - 2);
  89. int dataLength = in.readUnsignedShort(); // 使用大端序读取
  90. // 计算整个包长度
  91. int totalLength = headerLength + dataLength;
  92. // 回到包开始位置 (头部标识位置)
  93. in.readerIndex(in.readerIndex() - headerLength); // 回退到头部开始位置
  94. // 检查是否有足够的数据构成完整的消息
  95. if (in.readableBytes() < totalLength) {
  96. return;
  97. }
  98. // 读取完整数据包(包括包头和数据)
  99. byte[] packet = new byte[totalLength];
  100. in.readBytes(packet);
  101. // 将解码后的完整数据包添加到输出列表
  102. out.add(packet);
  103. }
  104. }
  105. /**
  106. * 在缓冲区中查找JTT1078数据包头部
  107. *
  108. * @param buffer 输入缓冲区
  109. * @return 头部位置,未找到返回-1
  110. */
  111. private int findHeader(ByteBuf buffer) {
  112. // 创建搜索用的头部字节数组
  113. byte[] headerBytes = new byte[] {
  114. (byte) ((PACKET_HEADER >> 24) & 0xFF),
  115. (byte) ((PACKET_HEADER >> 16) & 0xFF),
  116. (byte) ((PACKET_HEADER >> 8) & 0xFF),
  117. (byte) (PACKET_HEADER & 0xFF)
  118. };
  119. // 保存当前读索引
  120. int originalReaderIndex = buffer.readerIndex();
  121. // 在可读范围内搜索头部
  122. for (int i = 0; i <= buffer.readableBytes() - 4; i++) {
  123. boolean found = true;
  124. for (int j = 0; j < 4; j++) {
  125. byte b = buffer.getByte(originalReaderIndex + i + j);
  126. if (b != headerBytes[j]) {
  127. found = false;
  128. break;
  129. }
  130. }
  131. if (found) {
  132. return i;
  133. }
  134. }
  135. return -1;
  136. }
  137. }