Merge pull request #404 from smoltcp-rs/delayed-ack

tcp: add Delayed ACK
v0.7.x
Dario Nieuwenhuis 2021-01-07 00:26:01 +01:00 committed by GitHub
commit 878c04250b
2 changed files with 192 additions and 16 deletions

View File

@ -71,16 +71,6 @@ pub(crate) enum PollAt {
Ingress,
}
impl PollAt {
#[cfg(feature = "socket-tcp")]
fn is_ingress(&self) -> bool {
match *self {
PollAt::Ingress => true,
_ => false,
}
}
}
/// A network socket.
///
/// This enumeration abstracts the various types of sockets based on the IP protocol.

View File

@ -162,6 +162,7 @@ enum Timer {
}
}
const ACK_DELAY_DEFAULT: Duration = Duration { millis: 10 };
const CLOSE_DELAY: Duration = Duration { millis: 10_000 };
impl Default for Timer {
@ -341,6 +342,12 @@ pub struct TcpSocket<'a> {
/// each other which have the same ACK number.
local_rx_dup_acks: u8,
/// Duration for Delayed ACK. If None no ACKs will be delayed.
ack_delay: Option<Duration>,
/// Delayed ack timer. If set, packets containing exclusively
/// ACK or window updates (ie, no data) won't be sent until expiry.
ack_delay_until: Option<Instant>,
#[cfg(feature = "async")]
rx_waker: WakerRegistration,
#[cfg(feature = "async")]
@ -397,6 +404,8 @@ impl<'a> TcpSocket<'a> {
local_rx_last_ack: None,
local_rx_last_seq: None,
local_rx_dup_acks: 0,
ack_delay: Some(ACK_DELAY_DEFAULT),
ack_delay_until: None,
#[cfg(feature = "async")]
rx_waker: WakerRegistration::new(),
@ -453,6 +462,13 @@ impl<'a> TcpSocket<'a> {
self.timeout
}
/// Return the ACK delay duration.
///
/// See also the [set_ack_delay](#method.set_ack_delay) method.
pub fn ack_delay(&self) -> Option<Duration> {
self.ack_delay
}
/// Return the current window field value, including scaling according to RFC 1323.
///
/// Used in internal calculations as well as packet generation.
@ -478,6 +494,13 @@ impl<'a> TcpSocket<'a> {
self.timeout = duration
}
/// Set the ACK delay duration.
///
/// By default, the ACK delay is set to 10ms.
pub fn set_ack_delay(&mut self, duration: Option<Duration>) {
self.ack_delay = duration
}
/// Return the keep-alive interval.
///
/// See also the [set_keep_alive](#method.set_keep_alive) method.
@ -578,6 +601,8 @@ impl<'a> TcpSocket<'a> {
self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8;
self.remote_mss = DEFAULT_MSS;
self.remote_last_ts = None;
self.ack_delay = Some(ACK_DELAY_DEFAULT);
self.ack_delay_until = None;
#[cfg(feature = "async")]
{
@ -1541,6 +1566,30 @@ impl<'a> TcpSocket<'a> {
self.assembler);
}
// Handle delayed acks
if let Some(ack_delay) = self.ack_delay {
if self.ack_to_transmit() || self.window_to_update() {
self.ack_delay_until = match self.ack_delay_until {
None => {
net_trace!("{}:{}:{}: starting delayed ack timer",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);
Some(timestamp + ack_delay)
}
// RFC1122 says "in a stream of full-sized segments there SHOULD be an ACK
// for at least every second segment".
// For now, we send an ACK every second received packet, full-sized or not.
Some(_) => {
net_trace!("{}:{}:{}: delayed ack timer already started, forcing expiry",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);
None
}
};
}
}
// Per RFC 5681, we should send an immediate ACK when either:
// 1) an out-of-order segment is received, or
// 2) a segment arrives that fills in all or part of a gap in sequence space.
@ -1590,6 +1639,13 @@ impl<'a> TcpSocket<'a> {
can_data || can_fin
}
fn delayed_ack_expired(&self, timestamp: Instant) -> bool {
match self.ack_delay_until {
None => true,
Some(t) => t <= timestamp,
}
}
fn ack_to_transmit(&self) -> bool {
if let Some(remote_last_ack) = self.remote_last_ack {
remote_last_ack < self.remote_seq_no + self.rx_buffer.len()
@ -1644,11 +1700,11 @@ impl<'a> TcpSocket<'a> {
// If we have data to transmit and it fits into partner's window, do it.
net_trace!("{}:{}:{}: outgoing segment will send data or flags",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
} else if self.ack_to_transmit() {
} else if self.ack_to_transmit() && self.delayed_ack_expired(timestamp) {
// If we have data to acknowledge, do it.
net_trace!("{}:{}:{}: outgoing segment will acknowledge",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
} else if self.window_to_update() {
} else if self.window_to_update() && self.delayed_ack_expired(timestamp) {
// If we have window length increase to advertise, do it.
net_trace!("{}:{}:{}: outgoing segment will update window",
self.meta.handle, self.local_endpoint, self.remote_endpoint);
@ -1812,6 +1868,15 @@ impl<'a> TcpSocket<'a> {
// the keep-alive timer.
self.timer.rewind_keep_alive(timestamp, self.keep_alive);
// Reset delayed-ack timer
if self.ack_delay_until.is_some() {
net_trace!("{}:{}:{}: stop delayed ack timer",
self.meta.handle, self.local_endpoint, self.remote_endpoint
);
self.ack_delay_until = None;
}
// Leave the rest of the state intact if sending a keep-alive packet, since those
// carry a fake segment.
if is_keep_alive { return Ok(()) }
@ -1851,10 +1916,17 @@ impl<'a> TcpSocket<'a> {
} else if self.state == State::Closed {
// Socket was aborted, we have an RST packet to transmit.
PollAt::Now
} else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() {
} else if self.seq_to_transmit() {
// We have a data or flag packet to transmit.
PollAt::Now
} else {
let want_ack = self.ack_to_transmit() || self.window_to_update();
let delayed_ack_poll_at = match (want_ack, self.ack_delay_until) {
(false, _) => PollAt::Ingress,
(true, None) => PollAt::Now,
(true, Some(t)) => PollAt::Time(t),
};
let timeout_poll_at = match (self.remote_last_ts, self.timeout) {
// If we're transmitting or retransmitting data, we need to poll at the moment
// when the timeout would expire.
@ -1864,9 +1936,8 @@ impl<'a> TcpSocket<'a> {
};
// We wait for the earliest of our timers to fire.
*[self.timer.poll_at(), timeout_poll_at]
*[self.timer.poll_at(), timeout_poll_at, delayed_ack_poll_at]
.iter()
.filter(|x| !x.is_ingress())
.min().unwrap_or(&PollAt::Ingress)
}
}
@ -2076,7 +2147,9 @@ mod test {
let rx_buffer = SocketBuffer::new(vec![0; rx_len]);
let tx_buffer = SocketBuffer::new(vec![0; tx_len]);
TcpSocket::new(rx_buffer, tx_buffer)
let mut socket = TcpSocket::new(rx_buffer, tx_buffer);
socket.set_ack_delay(None);
socket
}
fn socket_syn_received_with_buffer_sizes(
@ -5084,6 +5157,119 @@ mod test {
assert_eq!(s.recv(|_| (0, ())), Err(Error::Illegal));
}
// =========================================================================================//
// Tests for delayed ACK
// =========================================================================================//
#[test]
fn test_delayed_ack() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});
// No ACK is immediately sent.
recv!(s, Err(Error::Exhausted));
// After 10ms, it is sent.
recv!(s, time 11, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
window_len: 61,
..RECV_TEMPL
}));
}
#[test]
fn test_delayed_ack_win() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});
// Reading the data off the buffer should cause a window update.
s.recv(|data| {
assert_eq!(data, b"abc");
(3, ())
}).unwrap();
// However, no ACK or window update is immediately sent.
recv!(s, Err(Error::Exhausted));
// After 10ms, it is sent.
recv!(s, time 11, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
..RECV_TEMPL
}));
}
#[test]
fn test_delayed_ack_reply() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});
s.recv(|data| {
assert_eq!(data, b"abc");
(3, ())
}).unwrap();
s.send_slice(&b"xyz"[..]).unwrap();
// Writing data to the socket causes ACK to not be delayed,
// because it is immediately sent with the data.
recv!(s, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 3),
payload: &b"xyz"[..],
..RECV_TEMPL
}));
}
#[test]
fn test_delayed_ack_every_second_packet() {
let mut s = socket_established();
s.set_ack_delay(Some(ACK_DELAY_DEFAULT));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"abc"[..],
..SEND_TEMPL
});
// No ACK is immediately sent.
recv!(s, Err(Error::Exhausted));
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1 + 3,
ack_number: Some(LOCAL_SEQ + 1),
payload: &b"def"[..],
..SEND_TEMPL
});
// Every 2nd packet, ACK is sent without delay.
recv!(s, Ok(TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + 6),
window_len: 58,
..RECV_TEMPL
}));
}
// =========================================================================================//
// Tests for packet filtering.
// =========================================================================================//