计算机网络:TCP Sender的实现

本文最后更新于:2022年12月7日 晚上

TCP Sender

需要实现的主要逻辑

  1. 追踪Receiver返回的windows_size(可接受的剩余容量)和ackno(已经确认接收的字符位置)
  2. 只要数据来了就直接对数据进行封装并发送,只有在窗口被消耗为零的情况下才停止发送
  3. 将没有被acknowledge的数据包存储起来,在超时的时候进行发送

实现细节

  1. 对于超时重传的时间判断,使用已经提供的tick()函数,每次调用的时候传入多少时间就消耗了多少时间
  2. 超时重传的默认基准值会以成员变量的形式在TCPSender中进行初始化
  3. TCPSegment中有一个_segments_out的成员,只需要向这个queuepush一个TCPSegment就相当于将这个数据段发送了

代码实现

额外定义成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 计时器
size_t _current_tick = 0;
size_t _sent_tick = 0;
size_t _rto_tick = 0;

// 记录确认的_ackno
size_t _ackno = 0;

// 记录窗口大小,并标记是否为空窗口
size_t _window_size = 1;
bool _is_zero = false;

// 正在被RTO记录的TCP段
bool _rto_trigger = false;

// 超时重传的次数
size_t _rto_count = 0;

// 缓存队列
std::queue<TCPSegment> _cache{};

额外定义函数

额外定义的函数主要作用为将已经封装好的TCP报文进行发送,如果在发送的时候检测到RTO重传计时器并没有工作,则发送的同时激活重传计时器。同时在发送了报文后减少_window_size的空闲部分,并将下一个“待发送的字符序号”后移和报文长度相同的量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TCPSender::_send_segment(const TCPSegment &seg) {
if (not _rto_trigger) {
// 如果当前没有报文开始RTO计时的话,则启动RTO
_rto_tick = _initial_retransmission_timeout;
_sent_tick = _current_tick;
_rto_trigger = true;
}
// 当前报文需要占用的长度
const size_t seg_len = seg.length_in_sequence_space();
_window_size -= seg_len;
_next_seqno += seg_len;
_cache.push(seg);
_segments_out.push(seg);
}

fill_window()

对于需要封装的报文,大致可以分为三类,一类是最开始用于建立连接的SYN报文,一类是携带数据的PAYLOAD报文,最后一类是用于发送结束连接的挥手FIN报文。在该方法中主要的难点就是通过对目前已经确认的acknonext_seqno等数据来判断当前需要封装的报文具体是哪一类

SYN报文

SYN报文的判断很简单,因为发送SYN的话无非是打开连接的建立者A自己,又或者是收到了A发来报文的B返回一个携带ACKSYN报文进行确认。而对于A和B来说,由于SYN报文都是他们自己发送的第一个报文,因此在封装的过程中,他们的“下一个发送序列号”_next_seqno显而易见的应该为零。大致逻辑代码如下

1
2
3
4
bool is_syn = (_next_seqno == 0);

// 判断是否为SYN后的确认报文
section.header().syn = is_syn;
PAYLOAD报文

对于含有内容的报文,主要的工作就是对payload长度的合理切割,对此只需要在TCPConfig::MAX_PAYLOAD_SIZE和当前剩余_window_size中取最小值并从_stream当中读入。

1
2
size_t segment_payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, _window_size);
section.payload() = _stream.read(segment_payload_size);
FIN报文

_stream发送完毕的时候发送一个携带FIN的报文,告知对方我方已经发送完毕。由于FIN本身需要消耗一个序列号,因此发送前需检查当前window_size在处理完所有字符后是否还有空间。代码如下:

1
2
3
4
// 如果要发送FIN的话,窗口内至少还要剩余一个字符(bytes_in_flight的也会占用窗口)
if (is_fin && _window_size > (section.length_in_sequence_space() + bytes_in_flight())) {
section.header().fin = true;
}
发送过滤

在标记完了FIN之后,如果这个报文依旧不占用序列号,则说明这个报文不是TCP Sender处理的部分;又或者此时在FIN已经发送的基础上,重复发送了一个FIN,这时多的FIN应该被抛弃

1
2
3
4
// 空字符报的报文或错误溢出的报文不应该由`TCP Sender`进行发送
if (section.length_in_sequence_space() == 0 || _next_seqno == _stream.bytes_written() + 2) {
return;
}
最后总的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
void TCPSender::fill_window() {
while (_window_size != 0) {
TCPSegment section;

section.header().seqno = next_seqno();

bool is_syn = (_next_seqno == 0);
bool is_fin = _stream.input_ended();

// 判断是否为SYN后确认报文
section.header().syn = is_syn;

size_t segment_payload_size = min(TCPConfig::MAX_PAYLOAD_SIZE, _window_size);
section.payload() = _stream.read(segment_payload_size);

// 如果要发送FIN的话,窗口内至少还要剩余一个字符(bytes_in_flight的也会占用窗口)
if (is_fin && _window_size > (section.length_in_sequence_space() + bytes_in_flight())) {
section.header().fin = true;
}

// 空字符报的报文或错误溢出的报文不应该由`TCP Sender`进行发送
if (section.length_in_sequence_space() == 0 || _next_seqno == _stream.bytes_written() + 2) {
return;
}

_send_segment(section);
}
}

bytes_in_flight()

这个感觉可能是看起来最简单的一个函数了,因为用了_ackno来记录已经确认过的报文,同时_next_seqno又代表的是将要发送的数据流位置,因此只需要将_next_seqno - _ackno返回的就是正在发送中的数据长度了。(最开始想实现的时候还在考虑要不要在每次fill_windowack_received的时候添加计数器。。)

1
uint64_t TCPSender::bytes_in_flight() const { return _next_seqno - _ackno; }

ack_received()

确认报文主要需要的逻辑有以下四个部分:

  1. 只处理有效并且正确的ackno。如果ackno有效,记录acknowindow_size用以fill_window()来进行报文的封装
  2. 如果传入的window_size不为空,则正常设置_window_size。否则代表接收方繁忙,此时将window_size视为窗口大小为1进行数据发送,同时后续超时重传时间也不翻倍。
  3. 将记录连接是否可靠的超时重传次数计数器清零,返回ack代表接收方和我方依旧存在通信连接
  4. 如果曾经的报文已经确认过,则报文已经送达,将送达的报文从缓冲区中弹出
对于第一个逻辑

对于判断ackno是否是正确的ackno,只需要判断ackno是否处于已经记录的_ackno_next_seqno之间,如果在这个区间之外,意味着要么是老的ackno,要么是确认了不存在的数据,需要进行短路丢弃,逻辑如下

1
2
3
4
5
uint64_t unwrap_ackno = unwrap(ackno, _isn, _next_seqno);
if (unwrap_ackno > _next_seqno || unwrap_ackno < _ackno) {
return;
}
_ackno = unwrap_ackno;
对于第二个逻辑

为了区分_window_size到底是本来传入的就是1,还是说传入的0而视为1,这里采用了一个单独的flag:_is_zero来对窗口进行记录,以便于后续tick()函数针对non-zero window的超时重传加以区分。代码如下

1
2
3
4
// 记录是否为空窗口
_is_zero = (window_size == 0);
// 将0视为1
_window_size = window_size ? window_size : 1;
对于第三、四个逻辑

这部分都是属于对于超时重传的处理,其中主要需要实现的是对缓冲区确认后的报文进行弹出,同时弹出后取消对RTO的占用,初始化超时重传的等待时间并记录当前的时间。

其中弹出操作只有在_ackno确认的是第一个报文对应的seqnolength的时候才进行

1
2
3
4
5
6
7
8
_rto_count = 0;
if (not _cache.empty() &&
_cache.front().header().seqno.raw_value() + _cache.front().length_in_sequence_space() == ackno.raw_value()) {
_cache.pop();
_rto_trigger = false;
_sent_tick = _current_tick;
_rto_tick = _initial_retransmission_timeout;
}
最后总的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
uint64_t unwrap_ackno = unwrap(ackno, _isn, _next_seqno);
if (unwrap_ackno > _next_seqno || unwrap_ackno < _ackno) {
return;
}
_ackno = unwrap_ackno;

// 记录是否为空窗口
_is_zero = (window_size == 0);
// 将0视为1
_window_size = window_size ? window_size : 1;

// 重传计数器清零
_rto_count = 0;
if (not _cache.empty() &&
_cache.front().header().seqno.raw_value() + _cache.front().length_in_sequence_space() == ackno.raw_value()) {
_cache.pop();
_rto_trigger = false;
_sent_tick = _current_tick;
_rto_tick = _initial_retransmission_timeout;
}
}

tick()

该函数主要的作用是推动时间流动,并且判断是否触发超时重传,如果触发了超时重传首先将计时器更新到当前时间。然后当对方窗口不繁忙的情况下(window_size非零)触发了重传就把下次重传的等待时间翻倍,并且记录一次重连;如果对方窗口正处于繁忙期(window_size为零),则不翻倍连接时间。然后再将缓冲区内第一个发送的报文进行重新发送。代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
_current_tick += ms_since_last_tick;
if (_current_tick - _sent_tick < _rto_tick || _cache.empty()) {
return;
}
_sent_tick = _current_tick;
if (not _is_zero) {
_rto_count++;
_rto_tick *= 2;
}
_segments_out.push(_cache.front());
}

consecutive_retransmissions()

这个函数就是直接返回次数的,直接返回_rto_count的大小即可。


计算机网络:TCP Sender的实现
https://halc.top/p/73e1b791
作者
HalcyonAzure
发布于
2022年12月3日
许可协议