TCP Sender 需要实现的主要逻辑追踪Receiver
返回的windows_size
(可接受的剩余容量)和ackno
(已经确认接收的字符位置) 只要数据来了就直接对数据进行封装并发送,只有在窗口被消耗为零的情况下才停止发送 将没有被acknowledge
的数据包存储起来,在超时的时候进行发送 实现细节对于超时重传的时间判断,使用已经提供的tick()
函数,每次调用的时候传入多少时间就消耗了多少时间 超时重传的默认基准值会以成员变量的形式在TCPSender
中进行初始化 在TCPSegment
中有一个_segments_out
的成员,只需要向这个queue
内push
一个TCPSegment
就相当于将这个数据段发送了 代码实现 额外定义成员对于计时器的部分,为了方便抽象管理,我这里选择直接创建一个类来进行封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 class TCPTimer { private : size_t _tick_passed = 0 ; size_t _rto_timeout = 0 ; unsigned int _rto_count = 0 ; bool _is_running{false }; public : void reset (const uint16_t retx_timeout) { _rto_count = 0 ; _rto_timeout = retx_timeout; _tick_passed = 0 ; } void run () { _is_running = true ; } void stop () { _is_running = false ; } bool is_running () const { return _is_running; } unsigned int rto_count () const { return _rto_count; } void slow_start () { _rto_count++; _rto_timeout *= 2 ; } void update (const size_t ms_since_last_tick) { _tick_passed += ms_since_last_tick; } bool is_timeout () const { return _is_running && _tick_passed >= _rto_timeout; } void restart () { _tick_passed = 0 ; } };
在private
的部分定义则如下:
1 2 3 4 5 6 7 8 9 10 11 TCPTimer _rto_timer{};size_t _ackno = 0 ;size_t _window_size = 1 ; std::queue<TCPSegment> _cache{};
额外定义函数额外定义的函数主要作用为将已经封装好的TCP
报文进行发送,如果在发送的时候检测到RTO
重传计时器并没有工作,则发送的同时激活重传计时器。同时在发送了报文后对seqno序号进行消耗,移动_next_seqno
指针
1 2 3 4 5 6 7 8 9 10 11 12 void TCPSender::_send_segment(const TCPSegment &seg) { const size_t seg_len = seg.length_in_sequence_space (); _next_seqno += seg_len; _cache.push (seg); _segments_out.push (seg); if (not _rto_timer.is_running ()) { _rto_timer.run (); _rto_timer.reset (_initial_retransmission_timeout); } }
fill_window()对于需要封装的报文,大致可以分为三类,一类是最开始用于建立连接的SYN
报文,一类是携带数据的PAYLOAD
报文,最后一类是用于发送结束连接的挥手FIN
报文。在该方法中主要的难点就是通过对目前已经确认的ackno
和next_seqno
等数据来判断当前需要封装的报文具体是哪一类,以及根据还未接收到的数据以及零窗口本身的机制来判断空闲的窗口大小
fill_space窗口大小首先,为了防止出现对方当前空闲窗口已满,而sender就一直啥也不发的情况出现,因此在接受到的窗口大小是0的时候,要将其改为1,来避免零窗口堵塞。同时由于部分数据还在传输的路上,这一部分的数据也需要被减掉,从而得到最后的空闲大小fill_space。
1 2 size_t fill_space = _window_size ? _window_size : 1 ; fill_space -= bytes_in_flight ();
SYN报文SYN
报文的判断很简单,因为发送SYN
的话无非是打开连接的建立者A自己,又或者是收到了A发来报文的B返回一个携带ACK
的SYN
报文进行确认。而对于A和B来说,由于SYN
报文都是他们自己发送的第一个报文,因此在封装的过程中,他们的“下一个发送序列号”_next_seqno
显而易见的应该为零。大致逻辑代码如下
1 2 section.header ().syn = (_next_seqno == 0 );
PAYLOAD报文对于含有内容的报文,主要的工作就是对payload
长度的合理切割,对此只需要在TCPConfig::MAX_PAYLOAD_SIZE
和当前剩余``中取最小值并从_stream
当中读入。
1 2 3 size_t segment_payload_size = min (TCPConfig::MAX_PAYLOAD_SIZE, fill_space); section.payload () = _stream.read (segment_payload_size);
FIN报文在_stream
发送完毕,并且被我方全部接受了的时候发送一个携带FIN
的报文,告知对方我方已经发送完毕。由于FIN
本身需要消耗一个序列号,因此发送前需检查当前数据段是否还有一个空位来放FIN
1 2 3 4 if (_stream.eof () && fill_space > section.length_in_sequence_space ()) { section.header ().fin = true ; }
发送过滤在标记完了FIN
之后,如果这个报文依旧不占用序列号,则说明这个报文不是TCP Sender
处理的部分;又或者此时在FIN
已经发送的基础上,重复发送了一个FIN
,这时多的FIN
应该被抛弃
1 2 3 4 if (section.length_in_sequence_space () == 0 || _next_seqno == _stream.bytes_written () + 2 ) { return ; }
最后总的代码如下 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void TCPSender::fill_window () { size_t fill_space = _window_size ? _window_size : 1 ; fill_space -= bytes_in_flight (); while (fill_space > 0 ) { TCPSegment section; section.header ().seqno = next_seqno (); section.header ().syn = (_next_seqno == 0 ); size_t segment_payload_size = min (TCPConfig::MAX_PAYLOAD_SIZE, fill_space); section.payload () = _stream.read (segment_payload_size); if (_stream.eof () && fill_space > section.length_in_sequence_space ()) { section.header ().fin = true ; } if (section.length_in_sequence_space () == 0 || _next_seqno == _stream.bytes_written () + 2 ) { return ; } fill_space -= section.length_in_sequence_space (); _send_segment(section); } }
bytes_in_flight()这个感觉可能是看起来最简单的一个函数了,因为用了_ackno
来记录已经确认过的报文,同时_next_seqno
又代表的是将要发送的数据流位置,因此只需要将_next_seqno - _ackno
返回的就是正在发送中的数据长度了。(最开始想实现的时候还在考虑要不要在每次fill_window
和ack_received
的时候添加计数器。。)
1 uint64_t TCPSender::bytes_in_flight () const { return _next_seqno - _ackno; }
ack_received()确认报文主要需要的逻辑有以下四个部分:
只处理有效并且正确的ackno
。如果ackno
有效,记录ackno
和window_size
用以fill_window()
来进行报文的封装 记录ack报文中包含的窗口大小 如果曾经的报文已经确认过,则报文已经送达,将送达的报文从缓冲区中弹出,如果所有的报文都被弹出了,则关闭RTO计时器 如果接受到了对方这时的窗口又有了空闲大小,则使用fill_window()
来填充新的空报文 对于第一个逻辑对于判断ackno
是否是正确的ackno
,只需要判断ackno
是否处于已经记录的_ackno
和_next_seqno
之间,如果在这个区间之外,意味着要么是老的ackno
,要么是确认了不存在的数据,需要进行短路丢弃,逻辑如下
1 2 3 4 5 uint64_t abs_ackno = unwrap (ackno, _isn, _next_seqno);if (abs_ackno < _ackno || abs_ackno > _next_seqno) { return ; }
对于第二个逻辑在接受到了窗口大小之后只需要直接将其记录
1 2 _window_size = window_size;
对于第三、四个逻辑这部分都是属于对于超时重传的处理,其中主要需要实现的是对缓冲区确认后的报文进行弹出,同时弹出所有报文后取消对RTO
的占用,初始化超时重传的等待时间并记录当前的时间。
其中弹出操作只有在_ackno
确认的是第一个报文对应的seqno
和length
的时候才进行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 bool has_reset = false ;while (not _cache.empty () && _cache.front ().header ().seqno.raw_value () + _cache.front ().length_in_sequence_space () <= ackno.raw_value ()) { if (not has_reset) { _rto_timer.reset (_initial_retransmission_timeout); has_reset = true ; } _cache.pop (); }if (_cache.empty ()) { _rto_timer.stop (); }fill_window ();
最后总的代码如下 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 void TCPSender::ack_received (const WrappingInt32 ackno, const uint16_t window_size) { uint64_t abs_ackno = unwrap (ackno, _isn, _next_seqno); if (abs_ackno < _ackno || abs_ackno > _next_seqno) { return ; } _ackno = abs_ackno; _window_size = window_size; bool has_reset = false ; while (not _cache.empty () && _cache.front ().header ().seqno.raw_value () + _cache.front ().length_in_sequence_space () <= ackno.raw_value ()) { if (not has_reset) { _rto_timer.reset (_initial_retransmission_timeout); has_reset = true ; } _cache.pop (); } if (_cache.empty ()) { _rto_timer.stop (); } fill_window (); }
tick()该函数主要的作用是推动时间流动,并且判断是否触发超时重传,如果触发了超时重传首先将计时器更新到当前时间。然后当对方窗口不繁忙的情况下(window_size非零)触发了重传就把下次重传的等待时间翻倍,并且记录一次重连;如果对方窗口正处于繁忙期(window_size为零),则不翻倍连接时间。然后再将缓冲区内第一个发送的报文进行重新发送。代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void TCPSender::tick (const size_t ms_since_last_tick) { _rto_timer.update (ms_since_last_tick); if ((not _rto_timer.is_timeout ())) { return ; } if (_window_size != 0 ) { _rto_timer.slow_start (); } if (_rto_timer.rto_count () <= TCPConfig::MAX_RETX_ATTEMPTS) { _segments_out.push (_cache.front ()); _rto_timer.restart (); } }
consecutive_retransmissions()这个函数就是直接返回次数的,直接返回_rto_timer.rto_count();
的大小即可。