Lab 3: The TCP Sender
- Lab Guidance: Checkpoint 3: the TCP sender
- Lab Code: https://github.com/peakcrosser7/sponge/tree/lab3-startercode
要点
- 跟踪哪些——称为未完成段(outstanding segments)
- 包含数据的发送()段(无论是第一次还是重传), 如果计时器没有启动,则启动
- 时,
fill_window()
函数应 时一样 - 的报文段(
send_empty_segment()
发送的报文段) - 发送报文段
_segments_out
队列表示发送报文段 - 初始接收窗口大小为 1
- 当报文段占用的所有序列号都小于
ackno
它被视为完全确认.
思路
计时器实现
首先介绍用于 TCPSender
计时使用的类别 Timer
, 实际上,实现相对简单, 这里就不赘述了.
TCPSender
新成员变量
_bytes_in_flight
: 发送了多少序列号的数据,但尚未确认_window_size
: 接收方的窗口大小, 初始时为 1._ackno
: 接收方获得的(相对)确认号, 初始为 0._sending_ending
: 标志发送是否结束, 发送带FIN
标志位的报文段后会被放置true
, 之后,即使有空间发送新数据(sending_space!=0
)新数据发送新的数据._outstanding_segments
: 用于存储已发送但未确认报文段的队列. 使用的数据结构是队列(std::queue<TCPSegment>
). 因为序列号最小的报文段会在超时重发时发送, 因此,选定的数据结构需要满足两个条件, 一个是能够, 另一个是. 由于任务指导说明不需要考虑报文段的部分确认和多个报文段的合并, 所以很容易想到可以用std::map
存储报文段. 但需要注意的是, 在每次调用fill_window()
由最新数据组成的报文段只会发送, 此时还需要记录_outstanding_segments
中, 不需要额外的操作, 其中, 因此,队列可以直接用来记录, ._timer
: 上述计时器实例._retransmission_timeout
: TCP 连接当前超时时间, 即 RTO_consecutive_retransmissions
: 连续超时重发次数.
TCPSender
方法实现
TCPSender
基本上按照任务指导, 其中 fill_window()
和 ack_recevied()
要注意的核心方法有很多, 以下说明.
fill_window()
根据任务指导, fill_window()
主要任务尽可能完成 _stream_in
中读数据并封装成报文段发送. 在此过程中,应设置报文段第一部分的标志位、序列号等. 首先要保证有发送新数据的空间和未完成的发送. 其中 sending_space
计算方法如下: 相对确认号 窗口大小 - 相对于下一个序列号. 具体表示可见下图. 需要注意.
sending_space
根据发送的新数据占用的序列号减少. 值得一提的是, 在从 _stream_in
中读数据时, 要在最大值 TCPConfig::MAX_PAYLOAD_SIZE
和当前 sending_space
中取较小者, 避免超过接收方的窗口尺寸. 而 sending_space
在 SYN
设置标志位时也可能减少 1. 考虑到最外层是循环的原因,接收方的窗口很大, 即 sending_space
值大到超过 TCPConfig::MAX_PAYLOAD_SIZE
, 在字节流中发送足够的数据, 此时不应只读一段报文, 反而要多读. _sending_ending
标志位置是必要的, 因为窗户的大小是可能的 sending_space
很大, 但此时没有数据发送, 需要设置 FIN
标志位, 但标志位只占 1 个序列号, 因此, 显然是不合理的. 若报文段长度为 0, 即没有负载数据, 且没有设置 SYN
和 FIN
标志位, 此时不应发送报文段, 因此,应直接返回. 另一点需要注意的是, . 即 sending_space
大小与最终可读字节数据一致, , 因此,此时无法设置 FIN
, 而是. 综上, 对于 FIN
设置标志位的条件, 不仅要 _stream_in
为 true
, 还要保证 sending_space
非 0.
ack_received()
根据任务指导, ack_received()
函数主要是确认已发送的报文段, 同时,获取接收方窗口大小信息. 首先要注意的是, 若, 理论上,这是不可能的, 函数应直接退出, . 下一步是试图从记录未确认的报中记录下来 _outstanding_segments
确认队列首中, 并将确认的报文段删除. (即任务指导#3.1.7中所述), 因为只要有数据从 _outstanding_segments
中移除, 这意味着已经确认了新的数据重复确认的报文段已经不在了 _outstanding_segments
中).
代码
libsponge/tcp_sender.hh
#
ifndef
SPONGE_LIBSPONGE_TCP_SENDER_HH
#define SPONGE_LIBSPONGE_TCP_SENDER_HH
#include "byte_stream.hh"
#include "tcp_config.hh"
#include "tcp_segment.hh"
#include "wrapping_integers.hh"
#include <functional>
#include <queue>
//! \brief The timer used for TCPSender
class Timer {
private:
//! total elapsed time
size_t _ticks{
0};
//! timer is started or not
bool _started{
false};
public:
//! check if the timer has expired and update `_ticks`
//! \param[in] timeout the timeout time, i.e. RTO
//! \return true if timer has expired
bool expired(const size_t ms_since_last_tick, const unsigned timeout) {
// The timer will only expire if it is started
return _started && ((_ticks += ms_since_last_tick) >= timeout);
}
//! \return true if timer is started
bool started() const {
return _started; }
//! stop the timer
void stop() {
_started = false; }
//! start the timer
void start() {
_ticks = 0;
_started = true;
}
};
//! \brief The "sender" part of a TCP implementation.
//! Accepts a ByteStream, divides it up into segments and sends the
//! segments, keeps track of which segments are still in-flight,
//! maintains the Retransmission Timer, and retransmits in-flight
//! segments if the retransmission timer expires.
class TCPSender {
private:
//! our initial sequence number, the number for our SYN.
WrappingInt32 _isn;
//! outbound queue of segments that the TCPSender wants sent
std::queue<TCPSegment> _segments_out{
};
//! retransmission timer for the connection
const unsigned int _initial_retransmission_timeout;
//! outgoing stream of bytes that have not yet been sent
ByteStream _stream;
//! the (absolute) sequence number for the next byte to be sent
uint64_t _next_seqno{
0};
//! the sequence numbers occupied by segments sent but not yet acknowledged
size_t _bytes_in_flight{
0};
//! the receiver's window size
size_t _window_size{
1};
//! the (absolute) acknowledge sequence number from receiver
uint64_t _ackno{
0};
//! flag indicating that FIN flag has been set and sender cannot send any new byte
bool _sending_ending{
false};
//! the queue storing the outstanding segments
std::queue<TCPSegment> _outstanding_segments{
};
//! the timer for this TCPSender
Timer _timer{
};
//! current retransmission timeout
unsigned _retransmission_timeout;
//! the number of consecutive retransmissions
unsigned _consecutive_retransmissions{
0};
public:
//! Initialize a TCPSender
TCPSender(const size_t capacity = TCPConfig::DEFAULT_CAPACITY,
const uint16_t retx_timeout = TCPConfig::TIMEOUT_DFLT,
const std::optional<WrappingInt32> fixed_isn = {
});
//! \name "Input" interface for the writer
//!@{
ByteStream &stream_in() {
return _stream; }
const ByteStream &stream_in() const {
return _stream; }
//!@}
//! \name Methods that can cause the TCPSender to send a segment
//!@{
//! \brief A new acknowledgment was received
void ack_received(const WrappingInt32 ackno, const uint16_t window_size);
//! \brief Generate an empty-payload segment (useful for creating empty ACK segments)
void send_empty_segment();
//! \brief create and send segments to fill as much of the window as possible
void fill_window();
//! \brief Notifies the TCPSender of the passage of time
void tick(const size_t ms_since_last_tick);
//!@}
//! \name Accessors
//!@{
//! \brief How many sequence numbers are occupied by segments sent but not yet acknowledged?
//! \note count is in "sequence space," i.e. SYN and FIN each count for one byte
//! (see TCPSegment::length_in_sequence_space())
size_t bytes_in_flight() const;
//! \brief Number of consecutive retransmissions that have occurred in a row
unsigned int consecutive_retransmissions() const;
//! \brief TCPSegments that the TCPSender has enqueued for transmission.
//! \note These must be dequeued and sent by the TCPConnection,
//! which will need to fill in the fields that are set by the TCPReceiver
//! (ackno and window size) before sending.
std::queue<TCPSegment> &segments_out() {
return _segments_out; }
//!@}
//! \name What is the next sequence number? (used for testing)
//!@{
//! \brief absolute seqno for the next byte to be sent
uint64_t next_seqno_absolute() const {
return _next_seqno; }
//! \brief relative seqno for the next byte to be sent
WrappingInt32 next_seqno() const {
return wrap(_next_seqno, _isn); }
//!@}
};
#endif // SPONGE_LIBSPONGE_TCP_SENDER_HH
libsponge/tcp_sender.cc
#include "tcp_sender.hh"
#include "tcp_config.hh"
#include <random>
// Dummy implementation of a TCP sender
// For Lab 3, please replace with a real implementation that passes the
// automated checks run by `make check_lab3`.
template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {
}
using namespace std;
//! \param[in] capacity the capacity of the outgoing byte stream
//! \param[in] retx_timeout the initial amount of time to wait before retransmitting the oldest outstanding segment
//! \param[in] fixed_isn the Initial Sequence Number to use, if set (otherwise uses a random ISN)
TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn)
: _isn(fixed_isn.value_or(WrappingInt32{
random_device()()}))
, _initial_retransmission_timeout{
retx_timeout}
, _stream(capacity)
, _retransmission_timeout(retx_timeout) {
}
uint64_t TCPSender::bytes_in_flight() const {
return _bytes_in_flight; }
void TCPSender::fill_window() {
// if `_sending_end` has been set, the sender shouldn't send any new bytes
if (_sending_ending) {
return;
}
// if the window size is 0, it should act like the window size is 1
size_t sending_space = _ackno + (_window_size != 0 ? _window_size : 1) - next_seqno_absolute();
// have the sending space and not get to sending ending
while (sending_space > 0 && !_sending_ending) {
TCPSegment segment;
TCPHeader &header = segment.header();
if (next_seqno_absolute() == 0) {
header.syn = true;
--sending_space;
}
header.seqno = next_seqno();
Buffer &buffer = segment.payload();
buffer = stream_in().read(min(sending_space, TCPConfig::MAX_PAYLOAD_SIZE));
// don't add FIN if this would make the segment exceed the receiver's window
sending_space -= buffer.size();
if (stream_in().eof() && sending_space > 0) {
header.fin = true;
--sending_space;
// set `_sending_ending` true, so that sender will never send any new bytes
_sending_ending = true;
}
size_t len = segment.length_in_sequence_space();
if (len == 0) {
return;
}
segments_out().emplace(segment);
if (!_timer.started()) {
_timer.start();
}
_outstanding_segments.emplace(segment);
_next_seqno += len;
_bytes_in_flight += len;
}
}
//! \param ackno The remote receiver's ackno (acknowledgment number)
//! \param window_size The remote receiver's advertised window size
void TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) {
_ackno = unwrap(ackno, _isn, next_seqno_absolute());
// impossible ackno (beyond next seqno) should be ignored
if (_ackno > next_seqno_absolute()) {
return;
}
_window_size = window_size;
// the flag indicating that if new data has been acknowledged
bool has_new = false;
while (!_outstanding_segments.empty()) {
TCPSegment segment = _outstanding_segments.front();
size_t len = segment.length_in_sequence_space();
uint64_t seqno = unwrap(segment.header().seqno, _isn, next_seqno_absolute());
// the segment is not fully acknowledged, should stop
if (seqno + len > _ackno) {
break;
}
_outstanding_segments.pop();
_bytes_in_flight -= len;
has_new = true;
}
if (has_new) {
_retransmission_timeout = _initial_retransmission_timeout;
if (!_outstanding_segments.empty()) {
_timer.start();
} else {
_timer.stop();
}
_consecutive_retransmissions = 0;
}
}
//! \param[in] ms_since_last_tick the number of milliseconds since the last call to this method
void TCPSender::tick(const size_t ms_since_last_tick) {
if (!_timer.expired(ms_since_last_tick, _retransmission_timeout)) {
return;
}
segments_out().push(_outstanding_segments.front());
if (_window_size != 0) {
++_consecutive_retransmissions;
_retransmission_timeout <<= 1;
}
_timer.start();
}
unsigned int TCPSender::consecutive_retransmissions() const {
return _consecutive_retransmissions; }
void TCPSender::send_empty_segment() {
TCPSegment segment;
segment.header().seqno = next_seqno();
segments_out().emplace(segment);
}
遇到问题
- Test #11-SYN acked test: 解决: 出现上述问题是因为在没有负载数据时直接退出, 而未考虑有
SYN
或FIN
标志位被设置的情况, 如下图所示代码. 以及在没有负载没有标志位设置时不应该发送报文段. 因此, 正确的做法应该是在segment.length_in_sequence_space()
为 0 时才可以退出不发送报文段. - Test #14-Piggyback FIN in segment when space is available: 解决: 出现上述问题的原因在
fill_window()
函数中, 错误地在字节流读取数据stream_in().read()
之前对FIN
标志位进行设置. 正确的做法应该是先读取数据, 再设置FIN
标志位, 因为. - Test #16-FIN acked test: 解决: 笔者出现上述原因在于
FIN
标志位设置时未使用_sending_ending
标志位, 而是选择将sending_space
置 0. 这样可以保证在fill_window()
中之后发送 1 个带有FIN
的报文段, 但同样会出现上述错误. 原因是ack_received()
函数后会调用fill_window()
, 此时根据接收方的窗口大小会计算出新的(非0的)sending_space
, 进而便会又多发一个FIN
报文段. 正确的做法便是利用_sending_ending
标志位在发送新报文段时添加新的条件, 这样即便ack_received()
对sending_space
产生影响也不会导致重复FIN
报文段. - Test #17-Don’t add FIN if this would make the segment exceed the receiver’s window:
解决: 出现上述错误的原因即在窗口大小不够时, 应不设置 FIN
标志位, 在有窗口时再发送一个带 FIN
标志位的报文段. 具体分析在上述 #思路.fill_window()
中已经说明, 在此不多赘述.
测试
在 build
目录下执行 make
后执行 make check_lab3
: