CS144-Lab1 计算机网络:字节流重组器

本文最后更新于:2023年4月11日 凌晨

思路总结

有问题的方案

这个方案是采用了一个无限长的字符串cache,所有的TCP段中的部分数据先寄存在cache当中。之后通过创建一个在cache上滑动的写入位指针write_p来将能够顺序写入的内容写入_output当中,其中write_p每次滑动的距离len受限于_output还剩下的可容纳空间。

cache_slide

添加的私有成员:

1
2
3
4
5
6
7
8
// 用于存放缓存
std::string cache;
// 用于标记缓存对应字节上是否写入内容
std::string dirty_check;
// 标记写入指针
size_t write_p;
// 标记EOF位
size_t end_p;

对于push_string方法的实现:

  1. 检查传入的index是否在可写入范围,如果超出可写入范围则直接退出,保证程序的鲁棒性

  2. 因为写入的数据长度不能超过capacity,因此需要将扩容的长度设置为index + data.length()write_p + _output.remaining_capacity()中较小的那个

  3. 将传入的数据(包括可能超过范围的部分)写入cache中,同时将dirty_check中对应的位置标记为1

  4. cache的长度缩回到正确扩容后应该的长度,这样可以将多余的内容丢弃

  5. 检查write_p的位置上是否有数据可以被写入,如果有则通过滑动len来将内容写入_output,否则跳过

  6. 检查write_pend_p是否相同,如果相同则代表写入结束,调用_output.end_input()

具体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
// extend_size: 按照index和data.length()扩容后的大小,只会按扩大的来扩容
size_t extend_size = index + data.length();

// 记录EOF的位置
if (eof) {
end_p = extend_size;
}

// 扩容只会变大,不会缩小
if (extend_size > cache.length()) {
cache.resize(extend_size);
dirty_check.resize(extend_size);
}

// 将要排序的内容写入cache当中
cache.replace(index, data.length(), data);
dirty_check.replace(index, data.length(), data.length(), '1');

// 缩回原来的大小,将缓冲区外多余的内容丢弃
if (expand_size > cache_raw_length) {
cache.resize(expand_size);
dirty_check.resize(expand_size);
}

// 检查写入位上是否有字符,有字符则通过滑动len来写入_output,否则跳过
if (dirty_check[write_p]) {
size_t len = 0;
size_t output_remaining = _output.remaining_capacity();
while (dirty_check[write_p + len] && len < output_remaining) {
len++;
}
_output.write(cache.substr(write_p, len));
write_p += len;
}

// 写入位和EOF位相同,代表写入结束
if (write_p == end_p) {
_output.end_input();
}
}

对于没有统计的字符数量,直接使用一个循环进行统计即可

1
2
3
4
5
6
7
8
9
// 返回缓冲区内还没有处理的内容
size_t StreamReassembler::unassembled_bytes() const {
size_t n = write_p;
// 检查缓存区有多少字符
for (size_t i = write_p; n != cache.length() && not dirty_check[i]; i++) {
n++;
}
return cache.length() - n;
}

对于判断缓冲区是否使用完毕则是

1
2
// 当不再写入新的TCP段并且已有的字段全部排序结束的时候缓冲区不再需要排序
bool StreamReassembler::empty() const { return _output.eof() && not unassembled_bytes(); }

测试案例的补充

使用上面这种写法的话虽然可以达到100% tests passed,并且时间也都能控制在0.5s以内,但是在复习了真实情况下的重组过程发现这个思路存在一些BUG是测试案例没有检测出来的。

不会抛弃本来应该抛弃的数据,同时产生错误的EOF位置标记

以如下的test为例

1
2
3
4
5
6
7
8
9
10
{
ReassemblerTestHarness test{6};
test.execute(SubmitSegment{"defg", 3});
test.execute(BytesAssembled(0));
test.execute(SubmitSegment{"abc", 0});
test.execute(BytesAvailable("abcdef"));
test.execute(BytesAssembled(6));
test.execute(SubmitSegment{"kmg", 7});
test.execute(BytesAvailable(""));
}

运行后可以发现有报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Test Failure on expectation:
Expectation: stream_out().buffer_size() returned 0, and stream_out().read(0) returned the string ""

Failure message:
The reassembler was expected to have `0` bytes available, but there were `4`

List of steps that executed successfully:
Initialized (capacity = 6)
Action: substring submitted with data "defg", index `3`, eof `0`
Expectation: net bytes assembled = 0
Action: substring submitted with data "abc", index `0`, eof `0`
Expectation: stream_out().buffer_size() returned 6, and stream_out().read(6) returned the string "abcdef"
Expectation: net bytes assembled = 6
Action: substring submitted with data "kmg", index `7`, eof `0`

Exception: The reassembler was expected to have `0` bytes available, but there were `4`

可以发现,本来在传入第一个defg的时候,字符g应该因为超出capacity而被抛弃,但是是将并没有,导致g停留在了index6的位置上。读取了_output的所有内容之后,_output的窗口应该是从index6的位置上开始准备写入,但是由于这个位置上的g在上一个窗口期中并没有被抛弃,结果导致了index7kmg写入的时候,连带前面存在的g一起将gkmg写入了_output的窗口当中,从而出现了以下报错。

The reassembler was expected to have 0 bytes available, but there were 4

同时对于EOF的位置判断也有类似的BUG,测试样例如下

1
2
3
4
5
6
7
8
9
10
11
{
ReassemblerTestHarness test{6};
test.execute(SubmitSegment{"defx", 3}.with_eof(true));
test.execute(BytesAssembled(0));
test.execute(SubmitSegment{"abc", 0});
test.execute(BytesAvailable("abcdef"));
test.execute(BytesAssembled(6));
test.execute(SubmitSegment{"g", 6});
test.execute(BytesAvailable("g"));
test.execute(NotAtEof{});
}

运行后得到如下结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Test Failure on expectation:
Expectation: not at EOF

Failure message:
The reassembler was expected to **not** be at EOF, but was

List of steps that executed successfully:
Initialized (capacity = 6)
Action: substring submitted with data "defx", index `3`, eof `1`
Expectation: net bytes assembled = 0
Action: substring submitted with data "abc", index `0`, eof `0`
Expectation: stream_out().buffer_size() returned 6, and stream_out().read(6) returned the string "abcdef"
Expectation: net bytes assembled = 6
Action: substring submitted with data "g", index `6`, eof `0`
Expectation: stream_out().buffer_size() returned 1, and stream_out().read(1) returned the string "g"

Exception: The reassembler was expected to **not** be at EOF, but was

本来在第一个操作的时候作为eofx应该是被抛弃掉并不读取的,但是在最后这个位置的eof_p还是触发了EOF判断,导致产生了不应该出现的EOF

修正方案

本质的问题就是没有丢弃掉unacceptable的字节,这里采取了一个比较省事但是很不优雅的操作,我是选择在最后扩容后重新再用resize()函数将不需要的那部分丢弃掉,来达到限制容量的目的,最后修正完毕的实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
bool eof_flag = false;
size_t expand_size = index + data.length();

// 短路错误index
if (index > write_p + _output.remaining_capacity()) {
return;
}

// 取 index + data.length() 和
// write_p + _output.remaining_capacity() 中更小的那个作为扩容后的大小
if (index + data.length() <= write_p + _output.remaining_capacity()) {
// 用于判断EOF是否是在capacity当中的有效字符
eof_flag = true;
expand_size = index + data.length();
} else {
expand_size = write_p + _output.remaining_capacity();
}

// 记录EOF的位置
if (eof && eof_flag) {
end_p = expand_size;
}

const size_t cache_raw_length = cache.length();

// 先扩大一次容量,用于写入多余的内容
if (expand_size > cache_raw_length) {
cache.resize(expand_size);
dirty_check.resize(expand_size);
}

// 将要排序的内容先写入cache当中
cache.replace(index, data.length(), data);
dirty_check.replace(index, data.length(), data.length(), '1');

// 缩回原来的大小,将缓冲区外多余的内容丢弃
if (expand_size > cache_raw_length) {
cache.resize(expand_size);
dirty_check.resize(expand_size);
}

// 检查写入位上是否有字符,有字符则通过滑动len来写入_output,否则跳过
if (dirty_check[write_p]) {
size_t len = 0;
size_t output_remaining = _output.remaining_capacity();
while (dirty_check[write_p + len] && len < output_remaining) {
len++;
}
_output.write(cache.substr(write_p, len));
write_p += len;
}

// 写入位和EOF位相同,代表写入结束
if (write_p == end_p) {
_output.end_input();
}
}

CS144-Lab1 计算机网络:字节流重组器
https://halc.top/p/aeda2510
作者
HalcyonAzure
发布于
2022年11月7日
许可协议