使用 Node.js 开发 TCP 客户端并拆分连续消息包

在网络编程中,TCP 是一种可靠的传输协议,它确保数据的完整性和顺序性。然而,在实际开发 TCP 客户端时,我们常常会遇到一个挑战:由于 TCP 是面向流的协议,接收到的数据可能并不总是按预期的消息边界到达,尤其是在高速数据传输时,多个消息包可能会连续到达。这时,我们需要自己处理数据的拆分和重组,以确保消息的完整性。

问题描述

假设我们在开发一个 TCP 客户端,该客户端接收到的每条消息都有固定的起始标志符和结束标志符。当收到的数据流中包含多个连续的消息包时,我们需要将它们正确地拆分并处理。

解决方案

为了解决这个问题,我们可以通过以下步骤来处理连续的消息包:

  1. 维护一个缓存区:使用一个缓存区来存储接收到的数据。这样即使消息包分成了多次接收,我们也可以将它们重组在一起。

  2. 检测起始标志符和结束标志符:在接收到的数据中查找起始标志符和结束标志符,并将完整的消息提取出来。

  3. 处理并移除已处理的消息:一旦我们找到了完整的消息,就可以处理它,并从缓存区中移除已经处理的部分。

下面是一个示例代码,展示了如何实现这些步骤:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import net from "net";
import fse from "fs-extra";

// 定义起始标志符和结束标志符
const START_FLAG = Buffer.from([0xeb, 0x90]);
const END_FLAG = Buffer.from([0xeb, 0x90]);

class TcpClient {
client: net.Socket;
buffer: Buffer;

constructor() {
this.client = new net.Socket();
this.buffer = Buffer.alloc(0);
this.client.on("data", this.handleResponse.bind(this));
}

handleResponse(data: Buffer) {
// 将接收到的数据追加到缓存区
this.buffer = Buffer.concat([this.buffer, data]);

let start, end;
// 查找并处理完整的消息
while ((start = this.buffer.indexOf(START_FLAG)) !== -1 && (end = this.buffer.indexOf(END_FLAG, start + START_FLAG.length)) !== -1) {
// 提取消息内容
const messageBuffer = this.buffer.slice(start + START_FLAG.length, end);

// 处理消息内容 (示例:输出消息)
console.log("Received message:", messageBuffer.toString());

// 移除已处理的部分
this.buffer = this.buffer.slice(end + END_FLAG.length);
}

// 记录原始数据以备调试
try {
fse.appendFileSync("buffer.txt", data.toString("hex") + "\n", "utf8");
} catch (err) {
console.error("Error writing to log file:", err);
}
}

connectToServer(port: number, host: string) {
this.client.connect(port, host, () => {
console.log("Connected to server");
});

this.client.on("error", err => {
console.error("Connection error:", err);
});

this.client.on("close", () => {
console.log("Connection closed");
});
}
}

// 创建并连接客户端
const tcpClientInstance = new TcpClient();
tcpClientInstance.connectToServer(12345, "127.0.0.1");

代码解析

  • 缓存区this.buffer 用来缓存接收到的数据。因为在实际网络传输中,数据可能会被分段接收,因此我们需要一个缓存来存储这些片段,直到我们能拼凑出完整的消息。

  • 标志符的检测与消息提取:通过 buffer.indexOf 方法,我们可以找到起始标志符和结束标志符的位置。找到后,我们就能提取出完整的消息内容。

  • 处理并移除已处理的消息:一旦消息被提取出来,我们将其从缓存中移除,以便处理下一条消息。

可能的问题

  1. 解析错误:当消息中包含与标志符相同的内容。存在解析错误的可能性,这可能导致消息被切断。

    要解决这个问题,可以在消息中添加长度标识符或者 crc 校验。
    我这里使用长度标识符举例。再解析过程中添加长度的判断
    假设的消息协议为:
    EhTyLe

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import net from "net";
import fse from "fs-extra";

const START_FLAG = Buffer.from([0x90, 0xeb]); // 小端字节序
const END_FLAG = Buffer.from([0x90, 0xeb]); // 小端字节序

class TcpClient {
client: net.Socket;
buffer: Buffer;

constructor() {
this.client = new net.Socket();
this.buffer = Buffer.alloc(0);
this.client.on("data", this.handleResponse.bind(this));
}

handleResponse(data: Buffer) {
this.buffer = Buffer.concat([this.buffer, data]);

while (true) {
// 找到起始标志符
const start = this.buffer.indexOf(START_FLAG);
if (start === -1) break;

// 确保有足够的长度读取固定头部部分
const headerLength = START_FLAG.length + 8 + 8 + 1 + 4; // 固定部分长度
if (this.buffer.length < start + headerLength) break;

// 解析头部
const sendSeq = this.buffer.readBigInt64LE(start + 2); // 发送会话序列号
const recvSeq = this.buffer.readBigInt64LE(start + 10); // 接收会话序列号
const sourceFlag = this.buffer.readUInt8(start + 18); // 会话源标识
const xmlLength = this.buffer.readUInt32LE(start + 19); // XML 的字节长度

// 确保有足够的长度读取完整消息
const totalLength = headerLength + xmlLength + END_FLAG.length;
if (this.buffer.length < start + totalLength) break;

// 提取并解析 XML 内容
const xmlStart = start + headerLength;
const xmlContent = this.buffer.slice(xmlStart, xmlStart + xmlLength).toString("utf-8");

// 确保结束标志符正确
const endFlagStart = xmlStart + xmlLength;
if (!this.buffer.slice(endFlagStart, endFlagStart + END_FLAG.length).equals(END_FLAG)) {
console.error("Invalid END_FLAG detected");
break;
}

// 打印解析结果
console.log("Received Message:");
console.log("Send Sequence:", sendSeq);
console.log("Receive Sequence:", recvSeq);
console.log("Source Flag:", sourceFlag);
console.log("XML Content:", xmlContent);

// 移除已处理的部分
this.buffer = this.buffer.slice(start + totalLength);
}
}

connectToServer(port: number, host: string) {
this.client.connect(port, host, () => {
console.log("Connected to server");
});

this.client.on("error", err => {
console.error("Connection error:", err);
});

this.client.on("close", () => {
console.log("Connection closed");
});
}
}

// 创建并连接客户端
const tcpClientInstance = new TcpClient();
tcpClientInstance.connectToServer(12345, "127.0.0.1");

总结

在 Node.js 中处理 TCP 数据流时,理解并处理消息边界是至关重要的。通过维护一个缓存区并检测消息的起始和结束标志符,我们可以确保即使在连续接收多个消息包的情况下,也能够正确地拆分和处理每条消息。这种方法不仅适用于 TCP 客户端,同样也可以应用于服务端数据接收的场景。

希望这篇文章能够帮助你更好地理解和处理 TCP 数据流中的消息拆分问题。如果你在实际开发中遇到了类似的挑战,可以参考本文的解决方案进行处理。Happy Coding!