From 2ea2b7ff6b91e3ea2fc75d9dc9c9059a65fded35 Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 4 Jan 2021 00:52:17 +0100 Subject: [PATCH] tcp: add Delayed ACK --- src/socket/mod.rs | 10 --- src/socket/tcp.rs | 198 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 192 insertions(+), 16 deletions(-) diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 24afb5a..964e9b9 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -71,16 +71,6 @@ pub(crate) enum PollAt { Ingress, } -impl PollAt { - #[cfg(feature = "socket-tcp")] - fn is_ingress(&self) -> bool { - match *self { - PollAt::Ingress => true, - _ => false, - } - } -} - /// A network socket. /// /// This enumeration abstracts the various types of sockets based on the IP protocol. diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 0d2d571..38b3173 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -162,6 +162,7 @@ enum Timer { } } +const ACK_DELAY_DEFAULT: Duration = Duration { millis: 10 }; const CLOSE_DELAY: Duration = Duration { millis: 10_000 }; impl Default for Timer { @@ -341,6 +342,12 @@ pub struct TcpSocket<'a> { /// each other which have the same ACK number. local_rx_dup_acks: u8, + /// Duration for Delayed ACK. If None no ACKs will be delayed. + ack_delay: Option, + /// Delayed ack timer. If set, packets containing exclusively + /// ACK or window updates (ie, no data) won't be sent until expiry. + ack_delay_until: Option, + #[cfg(feature = "async")] rx_waker: WakerRegistration, #[cfg(feature = "async")] @@ -397,6 +404,8 @@ impl<'a> TcpSocket<'a> { local_rx_last_ack: None, local_rx_last_seq: None, local_rx_dup_acks: 0, + ack_delay: Some(ACK_DELAY_DEFAULT), + ack_delay_until: None, #[cfg(feature = "async")] rx_waker: WakerRegistration::new(), @@ -453,6 +462,13 @@ impl<'a> TcpSocket<'a> { self.timeout } + /// Return the ACK delay duration. + /// + /// See also the [set_ack_delay](#method.set_ack_delay) method. + pub fn ack_delay(&self) -> Option { + self.ack_delay + } + /// Return the current window field value, including scaling according to RFC 1323. /// /// Used in internal calculations as well as packet generation. @@ -478,6 +494,13 @@ impl<'a> TcpSocket<'a> { self.timeout = duration } + /// Set the ACK delay duration. + /// + /// By default, the ACK delay is set to 10ms. + pub fn set_ack_delay(&mut self, duration: Option) { + self.ack_delay = duration + } + /// Return the keep-alive interval. /// /// See also the [set_keep_alive](#method.set_keep_alive) method. @@ -578,6 +601,8 @@ impl<'a> TcpSocket<'a> { self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; self.remote_mss = DEFAULT_MSS; self.remote_last_ts = None; + self.ack_delay = Some(ACK_DELAY_DEFAULT); + self.ack_delay_until = None; #[cfg(feature = "async")] { @@ -1541,6 +1566,30 @@ impl<'a> TcpSocket<'a> { self.assembler); } + // Handle delayed acks + if let Some(ack_delay) = self.ack_delay { + if self.ack_to_transmit() || self.window_to_update() { + self.ack_delay_until = match self.ack_delay_until { + None => { + net_trace!("{}:{}:{}: starting delayed ack timer", + self.meta.handle, self.local_endpoint, self.remote_endpoint + ); + + Some(timestamp + ack_delay) + } + // RFC1122 says "in a stream of full-sized segments there SHOULD be an ACK + // for at least every second segment". + // For now, we send an ACK every second received packet, full-sized or not. + Some(_) => { + net_trace!("{}:{}:{}: delayed ack timer already started, forcing expiry", + self.meta.handle, self.local_endpoint, self.remote_endpoint + ); + None + } + }; + } + } + // Per RFC 5681, we should send an immediate ACK when either: // 1) an out-of-order segment is received, or // 2) a segment arrives that fills in all or part of a gap in sequence space. @@ -1590,6 +1639,13 @@ impl<'a> TcpSocket<'a> { can_data || can_fin } + fn delayed_ack_expired(&self, timestamp: Instant) -> bool { + match self.ack_delay_until { + None => true, + Some(t) => t <= timestamp, + } + } + fn ack_to_transmit(&self) -> bool { if let Some(remote_last_ack) = self.remote_last_ack { remote_last_ack < self.remote_seq_no + self.rx_buffer.len() @@ -1644,11 +1700,11 @@ impl<'a> TcpSocket<'a> { // If we have data to transmit and it fits into partner's window, do it. net_trace!("{}:{}:{}: outgoing segment will send data or flags", self.meta.handle, self.local_endpoint, self.remote_endpoint); - } else if self.ack_to_transmit() { + } else if self.ack_to_transmit() && self.delayed_ack_expired(timestamp) { // If we have data to acknowledge, do it. net_trace!("{}:{}:{}: outgoing segment will acknowledge", self.meta.handle, self.local_endpoint, self.remote_endpoint); - } else if self.window_to_update() { + } else if self.window_to_update() && self.delayed_ack_expired(timestamp) { // If we have window length increase to advertise, do it. net_trace!("{}:{}:{}: outgoing segment will update window", self.meta.handle, self.local_endpoint, self.remote_endpoint); @@ -1812,6 +1868,15 @@ impl<'a> TcpSocket<'a> { // the keep-alive timer. self.timer.rewind_keep_alive(timestamp, self.keep_alive); + // Reset delayed-ack timer + if self.ack_delay_until.is_some() { + net_trace!("{}:{}:{}: stop delayed ack timer", + self.meta.handle, self.local_endpoint, self.remote_endpoint + ); + + self.ack_delay_until = None; + } + // Leave the rest of the state intact if sending a keep-alive packet, since those // carry a fake segment. if is_keep_alive { return Ok(()) } @@ -1851,10 +1916,17 @@ impl<'a> TcpSocket<'a> { } else if self.state == State::Closed { // Socket was aborted, we have an RST packet to transmit. PollAt::Now - } else if self.seq_to_transmit() || self.ack_to_transmit() || self.window_to_update() { + } else if self.seq_to_transmit() { // We have a data or flag packet to transmit. PollAt::Now } else { + let want_ack = self.ack_to_transmit() || self.window_to_update(); + let delayed_ack_poll_at = match (want_ack, self.ack_delay_until) { + (false, _) => PollAt::Ingress, + (true, None) => PollAt::Now, + (true, Some(t)) => PollAt::Time(t), + }; + let timeout_poll_at = match (self.remote_last_ts, self.timeout) { // If we're transmitting or retransmitting data, we need to poll at the moment // when the timeout would expire. @@ -1864,9 +1936,8 @@ impl<'a> TcpSocket<'a> { }; // We wait for the earliest of our timers to fire. - *[self.timer.poll_at(), timeout_poll_at] + *[self.timer.poll_at(), timeout_poll_at, delayed_ack_poll_at] .iter() - .filter(|x| !x.is_ingress()) .min().unwrap_or(&PollAt::Ingress) } } @@ -2076,7 +2147,9 @@ mod test { let rx_buffer = SocketBuffer::new(vec![0; rx_len]); let tx_buffer = SocketBuffer::new(vec![0; tx_len]); - TcpSocket::new(rx_buffer, tx_buffer) + let mut socket = TcpSocket::new(rx_buffer, tx_buffer); + socket.set_ack_delay(None); + socket } fn socket_syn_received_with_buffer_sizes( @@ -5084,6 +5157,119 @@ mod test { assert_eq!(s.recv(|_| (0, ())), Err(Error::Illegal)); } + // =========================================================================================// + // Tests for delayed ACK + // =========================================================================================// + + #[test] + fn test_delayed_ack() { + let mut s = socket_established(); + s.set_ack_delay(Some(ACK_DELAY_DEFAULT)); + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"abc"[..], + ..SEND_TEMPL + }); + + // No ACK is immediately sent. + recv!(s, Err(Error::Exhausted)); + + // After 10ms, it is sent. + recv!(s, time 11, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 3), + window_len: 61, + ..RECV_TEMPL + })); + } + + #[test] + fn test_delayed_ack_win() { + let mut s = socket_established(); + s.set_ack_delay(Some(ACK_DELAY_DEFAULT)); + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"abc"[..], + ..SEND_TEMPL + }); + + // Reading the data off the buffer should cause a window update. + s.recv(|data| { + assert_eq!(data, b"abc"); + (3, ()) + }).unwrap(); + + // However, no ACK or window update is immediately sent. + recv!(s, Err(Error::Exhausted)); + + // After 10ms, it is sent. + recv!(s, time 11, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 3), + ..RECV_TEMPL + })); + } + + #[test] + fn test_delayed_ack_reply() { + let mut s = socket_established(); + s.set_ack_delay(Some(ACK_DELAY_DEFAULT)); + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"abc"[..], + ..SEND_TEMPL + }); + + s.recv(|data| { + assert_eq!(data, b"abc"); + (3, ()) + }).unwrap(); + + s.send_slice(&b"xyz"[..]).unwrap(); + + // Writing data to the socket causes ACK to not be delayed, + // because it is immediately sent with the data. + recv!(s, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 3), + payload: &b"xyz"[..], + ..RECV_TEMPL + })); + } + + #[test] + fn test_delayed_ack_every_second_packet() { + let mut s = socket_established(); + s.set_ack_delay(Some(ACK_DELAY_DEFAULT)); + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"abc"[..], + ..SEND_TEMPL + }); + + // No ACK is immediately sent. + recv!(s, Err(Error::Exhausted)); + + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1 + 3, + ack_number: Some(LOCAL_SEQ + 1), + payload: &b"def"[..], + ..SEND_TEMPL + }); + + // Every 2nd packet, ACK is sent without delay. + recv!(s, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 6), + window_len: 58, + ..RECV_TEMPL + })); + } + // =========================================================================================// // Tests for packet filtering. // =========================================================================================//