yuqi-zheng

TCP 消息封帧:从字节流中重组长度前缀消息


TCP 是一个流协议——它没有消息边界的概念。一次 send() 调用可能以三次 recv() 到达,三次 send() 调用可能合并到一个缓冲区。如果你在 TCP 上构建交易协议,你需要消息封帧(message framing):一种让接收方知道一条消息在哪里结束、下一条从哪里开始的方法。

本文介绍最简单的封帧方案——长度前缀(length-prefixing)——以及一个处理所有边界情况的完整 C++ 实现。


问题

TCP 的抽象是一个双向字节流:

发送方: send(msg1)   send(msg2)   send(msg3)
         │             │             │
         └─────────────┼─────────────┘

                TCP 字节流

      ┌────────────────┼────────────────┐
      ▼                ▼                ▼
接收方: recv() → 4字节  recv() → 11字节  recv() → -1 (EOF)

接收方不知道消息边界在哪里。两种常见的封帧策略:

策略描述场景
分隔符消息以 \n 或特殊字节序列结尾文本协议、HTTP
长度前缀每条消息以 N 字节编码其总长度开头二进制协议、FIX、交易所行情

对交易系统,长度前缀是标准做法。它是确定性的、无需转义、接收方可以精确知道要等多少字节。


消息格式

我们的格式是最简单的:2 字节小端头编码消息总长度(含自身),后跟载荷。

字节描述
0–1消息长度(uint16_t小端序)——包含这 2 字节的消息总大小
2…N载荷 —— (length - 2) 字节

示例:[0x05, 0x00, 'A', 'B', 'C'] → length = 0x0005 = 5 → 总共 5 字节 → 载荷 = “ABC”。

长度包含头字节——这很重要,因为最小有效消息是 2 字节(仅长度字段,空载荷)。通常长度为 0 或 1 应视为无效。


接口

给定两个抽象:

struct IDataProvider {
    virtual int GetData(std::byte* data, int maxLength) { return 0; }
    virtual ~IDataProvider() = default;
};

struct ITcpSocket {
    virtual void OnMessage(std::byte* bytes, int length) { };
    virtual ~ITcpSocket() = default;
};

GetData 模拟 recv()——最多用 maxLength 字节填充 data,返回实际读取的字节数,-1 表示 EOF。OnMessage 是每个完整帧的回调。

任务:实现 TcpSocket::Process(),循环直到 EOF,对每条完整消息调用 OnMessage,处理所有边界情况。


实现

状态机

一个运行状态就够了:totalSize。为 0 时等待头部。非零时知道当前消息需要多少字节,正在累积。

class TcpSocket : public ITcpSocket {
public:
    TcpSocket(IDataProvider* provider) : provider_{provider} {}

    void Process() {
        const auto AllocationSize = 65655;
        auto bytes = std::make_unique<std::byte[]>(AllocationSize);

        int totalReceived = 0;
        uint16_t totalSize = 0;

        while (true) {
            // 确定要请求多少字节
            int remaining = totalReceived < 2
                ? AllocationSize - totalReceived        // 还不知道消息长度
                : totalSize - totalReceived;            // 精确知道还要等多少

            int received = provider_->GetData(
                bytes.get() + totalReceived,
                remaining
            );

            if (received == -1)
                break;

            totalReceived += received;

            // 至少有 2 字节时解析头部
            if (totalReceived > 1 && totalSize == 0) {
                totalSize = static_cast<uint16_t>(bytes[0])
                          | (static_cast<uint16_t>(bytes[1]) << 8);
            }

            // 处理完整消息
            while (totalSize > 0 && totalReceived >= totalSize) {
                OnMessage(bytes.get(), totalSize);

                // 将剩余数据平移到前端
                if (totalReceived > totalSize) {
                    std::memmove(
                        bytes.get(),
                        bytes.get() + totalSize,
                        totalReceived - totalSize
                    );
                }

                totalReceived -= totalSize;
                totalSize = 0;

                // 如果有足够的尾部数据,重新解析头部
                if (totalReceived > 1) {
                    totalSize = static_cast<uint16_t>(bytes[0])
                              | (static_cast<uint16_t>(bytes[1]) << 8);
                }
            }
        }
    }

private:
    IDataProvider* provider_;
};

关键操作拆解

1. 读取正确的数量

int remaining = totalReceived < 2
    ? AllocationSize - totalReceived    // 还不知道消息长度
    : totalSize - totalReceived;        // 知道精确字节数

解析头部前,尽量多地请求。解析后,只请求剩余的精确字节。这防止过度读取——如果下一条消息在同一 GetData 调用中开始,本来也需要 AllocationSize 才能装下。

2. 解析小端 uint16_t

totalSize = static_cast<uint16_t>(bytes[0])
          | (static_cast<uint16_t>(bytes[1]) << 8);

字节 0 是 LSB,字节 1 是 MSB。按位 OR 组合。这是与主机字节序无关的——在小端(x86)和大端机器上都正确,因为我们从已知字节位置显式重建。

同样有效的替代方案:

uint16_t totalSize;
std::memcpy(&totalSize, bytes, sizeof(totalSize));

在小端主机上是零操作;大端主机则需要 __builtin_bswap16()。手动移位更显式,速度一样(编译器在 x86 上优化为单条 mov)。

3. 用 memmove 平移剩余数据

if (totalReceived > totalSize) {
    std::memmove(
        bytes.get(),
        bytes.get() + totalSize,
        totalReceived - totalSize
    );
}

这是初学者常犯错误的地方。处理完一条消息后,如果缓冲区还有额外字节,这些属于下一条消息。必须将它们平移到前端,不能丢弃。

为什么是 memmove 而不是 memcpy 源和目标区域重叠——bytes.get() + totalSizebytes.get() 前面。memcpy 在重叠内存上有未定义行为。memmove 正确处理重叠。

4. 平移后重新解析

totalReceived -= totalSize;
totalSize = 0;

if (totalReceived > 1) {
    totalSize = static_cast<uint16_t>(bytes[0])
              | (static_cast<uint16_t>(bytes[1]) << 8);
}

平移后,剩余字节在缓冲区前端。如果至少有 2 字节,立即解析下一条消息的头部。这处理了单次 GetData 包含多条完整消息的情况。


逐个边界情况

头部被分割

Read 1: [0x05]              → totalReceived=1, totalSize=0 (需要 2 字节才够)
Read 2: [0x00, 'A','B','C'] → totalReceived=5, 解析 length=0x0005, 完整消息!

消息体被分割

Read 1: [0x05, 0x00, 'A']   → totalReceived=3, totalSize=5, 不够
Read 2: ['B', 'C']          → totalReceived=5, 完整消息!

一次读取多条消息

GetData 返回: [0x03,0x00,'X', 0x04,0x00,'Y','Z']
              ↑ msg1 (3B)      ↑ msg2 (4B)

处理过程:

  1. 解析头 → length=3
  2. totalReceived=7 >= 3 → 投递 msg1(字节 0-2)
  3. 平移字节 [3,6] 到前端 → buffer = [0x04,0x00,‘Y’,‘Z’, …]
  4. totalReceived=4,重新解析头 → length=4
  5. totalReceived=4 >= 4 → 投递 msg2(字节 0-3)
  6. totalReceived=0,循环在 EOF 退出

无数据 / 立即 EOF

GetData 返回: -1 → 退出循环,无消息投递

追问:防止内存耗尽

常见面试追问:如果恶意客户端发送巨大的长度值怎么办?

修复:在分配或等待数据前验证长度:

constexpr uint16_t MAX_MESSAGE_SIZE = 4096;

if (totalReceived > 1 && totalSize == 0) {
    totalSize = static_cast<uint16_t>(bytes[0])
              | (static_cast<uint16_t>(bytes[1]) << 8);

    if (totalSize == 0 || totalSize > MAX_MESSAGE_SIZE) {
        // 协议违规——关闭连接
        return;
    }
}

没有这个检查,totalSize0xFFFF(65535)会让你分配并等待 65KB 数据。在有数千连接的生产交易网关上,这成为拒绝服务攻击向量。


追问:4 字节头部

如果头是 4 字节而不是 2 字节呢?

if (totalReceived > 3 && totalSize == 0) {
    totalSize = static_cast<uint32_t>(bytes[0])
              | (static_cast<uint32_t>(bytes[1]) << 8)
              | (static_cast<uint32_t>(bytes[2]) << 16)
              | (static_cast<uint32_t>(bytes[3]) << 24);
}

逻辑完全相同——只是解析前需要累积更多字节。4 字节头支持最大约 4GB 的消息,对大多数交易协议来说太大了(FIX 消息通常 < 8KB)。


核心要点

  1. TCP 是流 —— 消息边界是你的责任
  2. 长度前缀 是二进制协议最简单的封帧策略
  3. 维护运行状态 —— 在读取间追踪 totalReceivedtotalSize
  4. memmove 而不是 memcpy —— 缓冲区平移是重叠拷贝
  5. 平移后重新解析 —— 处理单次读取中的多条消息
  6. 验证长度 —— 防止恶意或损坏头导致内存耗尽
  7. 主机无关的字节序 —— 手动字节重建在任何地方都有效

消息封帧是那种看起来简单、直到你凌晨三点被漏掉的 memmove 咬一口才会意识到复杂的问题。正确的实现处理部分读取、多条消息和尾部数据,一个字节都不丢——现在你有一个了。