CS144-Lab3 计算机网络:TCP Sender的实现

本文最后更新于:2023年4月11日 凌晨

TCP Sender

需要实现的主要逻辑

  1. 追踪Receiver返回的windows_size(可接受的剩余容量)和ackno(已经确认接收的字符位置)
  2. 只要数据来了就直接对数据进行封装并发送,只有在窗口被消耗为零的情况下才停止发送
  3. 将没有被acknowledge的数据包存储起来,在超时的时候进行发送

实现细节

  1. 对于超时重传的时间判断,使用已经提供的tick()函数,每次调用的时候传入多少时间就消耗了多少时间
  2. 超时重传的默认基准值会以成员变量的形式在TCPSender中进行初始化
  3. TCPSegment中有一个_segments_out的成员,只需要向这个queuepush一个TCPSegment就相当于将这个数据段发送了

代码实现

额外定义成员

对于计时器的部分,为了方便抽象管理,我这里选择直接创建一个类来进行封装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class TCPTimer {
private:
size_t _tick_passed = 0; // 记录实时的时间戳
size_t _rto_timeout = 0; // 记录超过多久时间没有收到ACK就重传
unsigned int _rto_count = 0; // 记录重传的次数

bool _is_running{false}; // 记录计时器是否启动

public:
// 重置计时器
void reset(const uint16_t retx_timeout) {
_rto_count = 0;
_rto_timeout = retx_timeout;
_tick_passed = 0;
}

// 启动计时器
void run() { _is_running = true; }

// 暂停计时器
void stop() { _is_running = false; }

// 计时器是否启动
bool is_running() const { return _is_running; }

// 重传次数
unsigned int rto_count() const { return _rto_count; }

// 慢启动
void slow_start() {
_rto_count++;
_rto_timeout *= 2;
}

// 更新当前时间
void update(const size_t ms_since_last_tick) { _tick_passed += ms_since_last_tick; }

// 检测是否超时
bool is_timeout() const { return _is_running && _tick_passed >= _rto_timeout; }

// 重新计时
void restart() { _tick_passed = 0; }
};

private的部分定义则如下:

1
2
3
4
5
6
7
8
9
10
11
// 超时重传计时器
TCPTimer _rto_timer{};

// 记录确认的_ackno
size_t _ackno = 0;

// 记录窗口大小,并标记是否为空窗口
size_t _window_size = 1;

// 缓存队列
std::queue<TCPSegment> _cache{};

额外定义函数

额外定义的函数主要作用为将已经封装好的TCP报文进行发送,如果在发送的时候检测到RTO重传计时器并没有工作,则发送的同时激活重传计时器。同时在发送了报文后对seqno序号进行消耗,移动_next_seqno指针

1
2
3
4
5
6
7
8
9
10
11
12
void TCPSender::_send_segment(const TCPSegment &seg) {
// 当前报文需要占用的长度
const size_t seg_len = seg.length_in_sequence_space();
_next_seqno += seg_len;
_cache.push(seg);
_segments_out.push(seg);
// 如果没启动计时器,就启动计时器
if (not _rto_timer.is_running()) {
_rto_timer.run();
_rto_timer.reset(_initial_retransmission_timeout);
}
}

fill_window()

对于需要封装的报文,大致可以分为三类,一类是最开始用于建立连接的SYN报文,一类是携带数据的PAYLOAD报文,最后一类是用于发送结束连接的挥手FIN报文。在该方法中主要的难点就是通过对目前已经确认的acknonext_seqno等数据来判断当前需要封装的报文具体是哪一类,以及根据还未接收到的数据以及零窗口本身的机制来判断空闲的窗口大小

fill_space窗口大小

首先,为了防止出现对方当前空闲窗口已满,而sender就一直啥也不发的情况出现,因此在接受到的窗口大小是0的时候,要将其改为1,来避免零窗口堵塞。同时由于部分数据还在传输的路上,这一部分的数据也需要被减掉,从而得到最后的空闲大小fill_space。

1
2
size_t fill_space = _window_size ? _window_size : 1;
fill_space -= bytes_in_flight();
SYN报文

SYN报文的判断很简单,因为发送SYN的话无非是打开连接的建立者A自己,又或者是收到了A发来报文的B返回一个携带ACKSYN报文进行确认。而对于A和B来说,由于SYN报文都是他们自己发送的第一个报文,因此在封装的过程中,他们的“下一个发送序列号”_next_seqno显而易见的应该为零。大致逻辑代码如下

1
2
// _next_seqno == 0 代表还没有开始发送数据,此时需要发送SYN报文
section.header().syn = (_next_seqno == 0);
PAYLOAD报文

对于含有内容的报文,主要的工作就是对payload长度的合理切割,对此只需要在TCPConfig::MAX_PAYLOAD_SIZE和当前剩余``中取最小值并从_stream当中读入。

1
2
3
// 将数据进行封装
size_t segment_payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, fill_space);
section.payload() = _stream.read(segment_payload_size);
FIN报文

_stream发送完毕,并且被我方全部接受了的时候发送一个携带FIN的报文,告知对方我方已经发送完毕。由于FIN本身需要消耗一个序列号,因此发送前需检查当前数据段是否还有一个空位来放FIN

1
2
3
4
// 如果要发送FIN的话,窗口内至少还要剩余一个字符(bytes_in_flight的也会占用窗口)
if (_stream.eof() && fill_space > section.length_in_sequence_space()) {
section.header().fin = true;
}
发送过滤

在标记完了FIN之后,如果这个报文依旧不占用序列号,则说明这个报文不是TCP Sender处理的部分;又或者此时在FIN已经发送的基础上,重复发送了一个FIN,这时多的FIN应该被抛弃

1
2
3
4
// 空字符报的报文或错误溢出的报文不应该由`TCP Sender`进行发送
if (section.length_in_sequence_space() == 0 || _next_seqno == _stream.bytes_written() + 2) {
return;
}
最后总的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void TCPSender::fill_window() {
size_t fill_space = _window_size ? _window_size : 1;
fill_space -= bytes_in_flight();
while (fill_space > 0) {
TCPSegment section;

// 发送的数据包的序号是将要写入的下一个序号
section.header().seqno = next_seqno();

// _next_seqno == 0 代表还没有开始发送数据,此时需要发送SYN报文
section.header().syn = (_next_seqno == 0);

// 将数据进行封装
size_t segment_payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, fill_space);
section.payload() = _stream.read(segment_payload_size);

// 空闲窗口中至少要留有一位序号的位置才能将当前数据包添加FIN(bytes_in_flight的也会占用窗口)
if (_stream.eof() && fill_space > section.length_in_sequence_space()) {
section.header().fin = true;
}

// 如果这个报文啥都没有,或者FIN报文已经发送了,就没必要发送新的数据段了
if (section.length_in_sequence_space() == 0 || _next_seqno == _stream.bytes_written() + 2) {
return;
}

fill_space -= section.length_in_sequence_space();

_send_segment(section);
}
}

bytes_in_flight()

这个感觉可能是看起来最简单的一个函数了,因为用了_ackno来记录已经确认过的报文,同时_next_seqno又代表的是将要发送的数据流位置,因此只需要将_next_seqno - _ackno返回的就是正在发送中的数据长度了。(最开始想实现的时候还在考虑要不要在每次fill_windowack_received的时候添加计数器。。)

1
uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _ackno; }

ack_received()

确认报文主要需要的逻辑有以下四个部分:

  1. 只处理有效并且正确的ackno。如果ackno有效,记录acknowindow_size用以fill_window()来进行报文的封装
  2. 记录ack报文中包含的窗口大小
  3. 如果曾经的报文已经确认过,则报文已经送达,将送达的报文从缓冲区中弹出,如果所有的报文都被弹出了,则关闭RTO计时器
  4. 如果接受到了对方这时的窗口又有了空闲大小,则使用fill_window()来填充新的空报文
对于第一个逻辑

对于判断ackno是否是正确的ackno,只需要判断ackno是否处于已经记录的_ackno_next_seqno之间,如果在这个区间之外,意味着要么是老的ackno,要么是确认了不存在的数据,需要进行短路丢弃,逻辑如下

1
2
3
4
5
uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);
// 如果接收到对方发送的确认序号大于自己的下一个序号或者小于自己的已经被确认序号,说明接收到的确认序号是错误的
if (abs_ackno < _ackno || abs_ackno > _next_seqno) {
return;
}
对于第二个逻辑

在接受到了窗口大小之后只需要直接将其记录

1
2
// 记录窗口大小
_window_size = window_size;
对于第三、四个逻辑

这部分都是属于对于超时重传的处理,其中主要需要实现的是对缓冲区确认后的报文进行弹出,同时弹出所有报文后取消对RTO的占用,初始化超时重传的等待时间并记录当前的时间。

其中弹出操作只有在_ackno确认的是第一个报文对应的seqnolength的时候才进行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 用于判断是否重置计时器
bool has_reset = false;

// 当缓冲区内的报文已经被ackno确认,则将已经确认的报文进行丢弃
while (not _cache.empty() &&
_cache.front().header().seqno.raw_value() + _cache.front().length_in_sequence_space() <= ackno.raw_value()) {
if (not has_reset) {
// 有效的确认报文到达,重置计时器
_rto_timer.reset(_initial_retransmission_timeout);
has_reset = true;
}
_cache.pop();
}

if (_cache.empty()) {
// 所有数据包都被确认了,所以暂停计时器
_rto_timer.stop();
}

// 如果剩余的窗口还有空间,就填入内容
fill_window();
最后总的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t abs_ackno = unwrap(ackno, _isn, _next_seqno);
// 如果接收到对方发送的确认序号大于自己的下一个序号或者小于自己的已经被确认序号,说明接收到的确认序号是错误的
if (abs_ackno < _ackno || abs_ackno > _next_seqno) {
return;
}
_ackno = abs_ackno;

// 记录窗口大小
_window_size = window_size;

// 用于判断是否重置计时器
bool has_reset = false;

// 当缓冲区内的报文已经被ackno确认,则将已经确认的报文进行丢弃
while (not _cache.empty() &&
_cache.front().header().seqno.raw_value() + _cache.front().length_in_sequence_space() <= ackno.raw_value()) {
if (not has_reset) {
// 有效的确认报文到达,重置计时器
_rto_timer.reset(_initial_retransmission_timeout);
has_reset = true;
}
_cache.pop();
}

if (_cache.empty()) {
// 所有数据包都被确认了,所以暂停计时器
_rto_timer.stop();
}

// 如果剩余的窗口还有空间,就填入内容
fill_window();
}

tick()

该函数主要的作用是推动时间流动,并且判断是否触发超时重传,如果触发了超时重传首先将计时器更新到当前时间。然后当对方窗口不繁忙的情况下(window_size非零)触发了重传就把下次重传的等待时间翻倍,并且记录一次重连;如果对方窗口正处于繁忙期(window_size为零),则不翻倍连接时间。然后再将缓冲区内第一个发送的报文进行重新发送。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
// 更新当前时间
_rto_timer.update(ms_since_last_tick);

// 检测是否超时
if ((not _rto_timer.is_timeout())) {
return;
}
// 如果上一个收到的报文中,窗口大小不是零,但是依旧超时,说明是网络堵塞,执行慢启动
if (_window_size != 0) {
_rto_timer.slow_start();
}

// 重传次数小于最大重传次数,就重传
if (_rto_timer.rto_count() <= TCPConfig::MAX_RETX_ATTEMPTS) {
// 发送缓冲区中的第一个报文段
_segments_out.push(_cache.front());
_rto_timer.restart();
}
}

consecutive_retransmissions()

这个函数就是直接返回次数的,直接返回_rto_timer.rto_count();的大小即可。


CS144-Lab3 计算机网络:TCP Sender的实现
https://halc.top/p/73e1b791
作者
HalcyonAzure
发布于
2022年12月3日
许可协议