From 88016e675ffda29ee9375c29b02bba78b439cc36 Mon Sep 17 00:00:00 2001 From: jhwgh1968 Date: Sun, 19 Aug 2018 15:32:27 -0500 Subject: [PATCH] Fully implement TCP Window Scaling Closes: #253 Approved by: whitequark --- README.md | 2 +- benches/bench.rs | 1 + src/socket/tcp.rs | 198 +++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 189 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index c19c5f9..e121bd6 100644 --- a/README.md +++ b/README.md @@ -99,13 +99,13 @@ The TCP protocol is supported over IPv4, and server and client TCP sockets are a * Header checksum is generated and validated. * Maximum segment size is negotiated. + * Window scaling is negotiated. * Multiple packets are transmitted without waiting for an acknowledgement. * Reassembly of out-of-order segments is supported, with no more than 4 gaps in sequence space. * Keep-alive packets may be sent at a configurable interval. * Retransmission timeout starts at a fixed interval of 100 ms and doubles every time. * Time-wait timeout has a fixed interval of 10 s. * User timeout has a configurable interval. - * Window scaling is **not** supported, and the maximum buffer size is 65536. * Selective acknowledgements are **not** implemented. * Delayed acknowledgements are **not** implemented. * Silly window syndrome avoidance is **not** implemented. diff --git a/benches/bench.rs b/benches/bench.rs index a1fa436..4aed8ce 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -39,6 +39,7 @@ mod wire { window_len: 0x0123, control: TcpControl::Syn, max_seg_size: None, + window_scale: None, payload: &PAYLOAD_BYTES }; let mut bytes = vec![0xa5; repr.buffer_len()]; diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 2acdea7..7ddfbbd 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -2,7 +2,7 @@ // the parts of RFC 1122 that discuss TCP. Consult RFC 7414 when implementing // a new feature. -use core::{cmp, fmt}; +use core::{cmp, fmt, mem}; use {Error, Result}; use phy::DeviceCapabilities; @@ -226,6 +226,9 @@ pub struct TcpSocket<'a> { remote_last_ack: Option, /// The last window length sent. remote_last_win: u16, + /// The sending window scaling factor advertised to remotes which support RFC 1323. + /// It is zero if the window <= 64KiB and/or the remote does not support it. + remote_win_shift: u8, /// The speculative remote window size. /// I.e. the actual remote window size minus the count of in-flight octets. remote_win_len: usize, @@ -245,14 +248,22 @@ pub struct TcpSocket<'a> { const DEFAULT_MSS: usize = 536; impl<'a> TcpSocket<'a> { + #[allow(unused_comparisons)] // small usize platforms always pass rx_capacity check /// Create a socket using the given buffers. pub fn new(rx_buffer: T, tx_buffer: T) -> TcpSocket<'a> where T: Into> { let (rx_buffer, tx_buffer) = (rx_buffer.into(), tx_buffer.into()); - if rx_buffer.capacity() > ::max_value() as usize { - panic!("buffers larger than {} require window scaling, which is not implemented", - ::max_value()) + let rx_capacity = rx_buffer.capacity(); + + // From RFC 1323: + // [...] the above constraints imply that 2 * the max window size must be less + // than 2**31 [...] Thus, the shift count must be limited to 14 (which allows + // windows of 2**30 = 1 Gbyte). + if rx_capacity > (1 << 30) { + panic!("receiving buffer too large, cannot exceed 1 GiB") } + let rx_cap_log2 = mem::size_of::() * 8 - + rx_capacity.leading_zeros() as usize; TcpSocket { meta: SocketMeta::default(), @@ -273,6 +284,7 @@ impl<'a> TcpSocket<'a> { remote_last_ack: None, remote_last_win: 0, remote_win_len: 0, + remote_win_shift: rx_cap_log2.saturating_sub(16) as u8, remote_win_scale: None, remote_mss: DEFAULT_MSS, remote_last_ts: None, @@ -294,6 +306,16 @@ impl<'a> TcpSocket<'a> { self.timeout } + /// Return the current window field value, including scaling according to RFC 1323. + /// + /// Used in internal calculations as well as packet generation. + /// + #[inline] + fn scaled_window(&self) -> u16 { + cmp::min(self.rx_buffer.window() >> self.remote_win_shift as usize, + (1 << 16) - 1) as u16 + } + /// Set the timeout duration. /// /// A socket with a timeout duration set will abort the connection if either of the following @@ -383,6 +405,9 @@ impl<'a> TcpSocket<'a> { } fn reset(&mut self) { + let rx_cap_log2 = mem::size_of::() * 8 - + self.rx_buffer.capacity().leading_zeros() as usize; + self.state = State::Closed; self.timer = Timer::default(); self.assembler = Assembler::new(self.rx_buffer.capacity()); @@ -401,6 +426,7 @@ impl<'a> TcpSocket<'a> { self.remote_last_win = 0; self.remote_win_len = 0; self.remote_win_scale = None; + self.remote_win_shift = rx_cap_log2.saturating_sub(16) as u8; self.remote_mss = DEFAULT_MSS; self.remote_last_ts = None; } @@ -811,7 +837,11 @@ impl<'a> TcpSocket<'a> { // to be received. reply_repr.seq_number = self.remote_last_seq; reply_repr.ack_number = self.remote_last_ack; - reply_repr.window_len = self.rx_buffer.window() as u16; + + // From RFC 1323: + // The window field [...] of every outgoing segment, with the exception of SYN + // segments, is right-shifted by [advertised scale value] bits[...] + reply_repr.window_len = self.scaled_window(); self.remote_last_win = reply_repr.window_len; (ip_reply_repr, reply_repr) @@ -1030,6 +1060,10 @@ impl<'a> TcpSocket<'a> { self.remote_mss = max_seg_size as usize } self.remote_win_scale = repr.window_scale; + // No window scaling means don't do any window shifting + if self.remote_win_scale.is_none() { + self.remote_win_shift = 0; + } self.set_state(State::SynReceived); self.timer.set_for_idle(timestamp, self.keep_alive); } @@ -1296,7 +1330,8 @@ impl<'a> TcpSocket<'a> { } fn window_to_update(&self) -> bool { - self.rx_buffer.window() as u16 > self.remote_last_win + (self.rx_buffer.window() >> self.remote_win_shift) as u16 > + self.remote_last_win } pub(crate) fn dispatch(&mut self, timestamp: Instant, caps: &DeviceCapabilities, @@ -1384,7 +1419,7 @@ impl<'a> TcpSocket<'a> { control: TcpControl::None, seq_number: self.remote_last_seq, ack_number: Some(self.remote_seq_no + self.rx_buffer.len()), - window_len: self.rx_buffer.window() as u16, + window_len: self.scaled_window(), window_scale: None, max_seg_size: None, payload: &[] @@ -1406,9 +1441,10 @@ impl<'a> TcpSocket<'a> { repr.control = TcpControl::Syn; if self.state == State::SynSent { repr.ack_number = None; - repr.window_scale = Some(0); + repr.window_scale = Some(self.remote_win_shift); } else { - repr.window_scale = self.remote_win_scale.map(|_| 0); + repr.window_scale = self.remote_win_scale.map( + |_| self.remote_win_shift); } } @@ -1578,6 +1614,7 @@ impl<'a> fmt::Write for TcpSocket<'a> { #[cfg(test)] mod test { use core::i32; + use std::vec::Vec; use wire::{IpAddress, IpRepr, IpCidr}; use wire::ip::test::{MOCK_IP_ADDR_1, MOCK_IP_ADDR_2, MOCK_IP_ADDR_3, MOCK_UNSPECIFIED}; use super::*; @@ -1746,11 +1783,15 @@ mod test { } fn socket() -> TcpSocket<'static> { + socket_with_buffer_sizes(64, 64) + } + + fn socket_with_buffer_sizes(tx_len: usize, rx_len: usize) -> TcpSocket<'static> { #[cfg(feature = "log")] init_logger(); - let rx_buffer = SocketBuffer::new(vec![0; 64]); - let tx_buffer = SocketBuffer::new(vec![0; 64]); + let rx_buffer = SocketBuffer::new(vec![0; rx_len]); + let tx_buffer = SocketBuffer::new(vec![0; tx_len]); TcpSocket::new(rx_buffer, tx_buffer) } @@ -1895,6 +1936,46 @@ mod test { s } + #[test] + fn test_listen_syn_win_scale_buffers() { + for (buffer_size, shift_amt) in &[ + (64, 0), + (128, 0), + (1024, 0), + (65535, 0), + (65536, 1), + (65537, 1), + (131071, 1), + (131072, 2), + (524287, 3), + (524288, 4), + (655350, 4), + (1048576, 5), + ] { + let mut s = socket_with_buffer_sizes(64, *buffer_size); + s.state = State::Listen; + s.local_endpoint = IpEndpoint::new(IpAddress::default(), LOCAL_PORT); + assert_eq!(s.remote_win_shift, *shift_amt); + send!(s, TcpRepr { + control: TcpControl::Syn, + seq_number: REMOTE_SEQ, + ack_number: None, + window_scale: Some(0), + ..SEND_TEMPL + }); + assert_eq!(s.remote_win_shift, *shift_amt); + recv!(s, [TcpRepr { + control: TcpControl::Syn, + seq_number: LOCAL_SEQ, + ack_number: Some(REMOTE_SEQ + 1), + max_seg_size: Some(BASE_MSS), + window_scale: Some(*shift_amt), + window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16, + ..RECV_TEMPL + }]); + } + } + #[test] fn test_listen_sanity() { let mut s = socket(); @@ -2064,6 +2145,38 @@ mod test { assert_eq!(s.remote_win_scale, None); } + #[test] + fn test_syn_received_window_scaling() { + for scale in 0..14 { + let mut s = socket_listen(); + send!(s, TcpRepr { + control: TcpControl::Syn, + seq_number: REMOTE_SEQ, + ack_number: None, + window_scale: Some(scale), + ..SEND_TEMPL + }); + assert_eq!(s.state(), State::SynReceived); + assert_eq!(s.local_endpoint(), LOCAL_END); + assert_eq!(s.remote_endpoint(), REMOTE_END); + recv!(s, [TcpRepr { + control: TcpControl::Syn, + seq_number: LOCAL_SEQ, + ack_number: Some(REMOTE_SEQ + 1), + max_seg_size: Some(BASE_MSS), + window_scale: Some(0), + ..RECV_TEMPL + }]); + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + window_scale: None, + ..SEND_TEMPL + }); + assert_eq!(s.remote_win_scale, Some(scale)); + } + } + #[test] fn test_syn_received_close() { let mut s = socket_syn_received(); @@ -2220,6 +2333,36 @@ mod test { assert_eq!(s.state, State::Closed); } + #[test] + fn test_syn_sent_win_scale_buffers() { + for (buffer_size, shift_amt) in &[ + (64, 0), + (128, 0), + (1024, 0), + (65535, 0), + (65536, 1), + (65537, 1), + (131071, 1), + (131072, 2), + (524287, 3), + (524288, 4), + (655350, 4), + (1048576, 5), + ] { + let mut s = socket_with_buffer_sizes(64, *buffer_size); + assert_eq!(s.remote_win_shift, *shift_amt); + s.connect(REMOTE_END, LOCAL_END).unwrap(); + recv!(s, [TcpRepr { + control: TcpControl::Syn, + ack_number: None, + max_seg_size: Some(BASE_MSS), + window_scale: Some(*shift_amt), + window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16, + ..RECV_TEMPL + }]); + } + } + // =========================================================================================// // Tests for the ESTABLISHED state. // =========================================================================================// @@ -2242,6 +2385,39 @@ mod test { assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]); } + #[test] + fn test_established_sliding_window_recv() { + let mut s = socket_established(); + // Update our scaling parameters for a TCP with a scaled buffer. + assert_eq!(s.rx_buffer.len(), 0); + s.rx_buffer = SocketBuffer::new(vec![0; 262143]); + s.assembler = Assembler::new(s.rx_buffer.capacity()); + s.remote_win_scale = Some(0); + s.remote_last_win = 65535; + s.remote_win_shift = 2; + + // Create a TCP segment that will mostly fill an IP frame. + let mut segment: Vec = Vec::with_capacity(1400); + for _ in 0..100 { segment.extend_from_slice(b"abcdefghijklmn") } + assert_eq!(segment.len(), 1400); + + // Send the frame + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1, + ack_number: Some(LOCAL_SEQ + 1), + payload: &segment, + ..SEND_TEMPL + }); + + // Ensure that the received window size is shifted right by 2. + recv!(s, [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 1400), + window_len: 65185, + ..RECV_TEMPL + }]); + } + #[test] fn test_established_send() { let mut s = socket_established();