diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index 7ddfbbd..d9e8078 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -234,10 +234,14 @@ pub struct TcpSocket<'a> { remote_win_len: usize, /// The receive window scaling factor for remotes which support RFC 1323, None if unsupported. remote_win_scale: Option, + /// Whether or not the remote supports selective ACK as described in RFC 2018. + remote_has_sack: bool, /// The maximum number of data octets that the remote side may receive. remote_mss: usize, /// The timestamp of the last packet received. remote_last_ts: Option, + /// The sequence number of the last packet recived, used for sACK + local_rx_last_seq: Option, /// The ACK number of the last packet recived. local_rx_last_ack: Option, /// The number of packets recived directly after @@ -286,9 +290,11 @@ impl<'a> TcpSocket<'a> { remote_win_len: 0, remote_win_shift: rx_cap_log2.saturating_sub(16) as u8, remote_win_scale: None, + remote_has_sack: false, remote_mss: DEFAULT_MSS, remote_last_ts: None, local_rx_last_ack: None, + local_rx_last_seq: None, local_rx_dup_acks: 0, } } @@ -800,6 +806,8 @@ impl<'a> TcpSocket<'a> { window_len: 0, window_scale: None, max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], payload: &[] }; let ip_reply_repr = IpRepr::Unspecified { @@ -829,7 +837,7 @@ impl<'a> TcpSocket<'a> { } fn ack_reply(&mut self, ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) { - let (ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr); + let (mut ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr); // From RFC 793: // [...] an empty acknowledgment segment containing the current send-sequence number @@ -844,6 +852,42 @@ impl<'a> TcpSocket<'a> { reply_repr.window_len = self.scaled_window(); self.remote_last_win = reply_repr.window_len; + // If the remote supports selective acknowledgement, add the option to the outgoing + // segment. + if self.remote_has_sack { + net_debug!("sending sACK option with current assembler ranges"); + + // RFC 2018: The first SACK block (i.e., the one immediately following the kind and + // length fields in the option) MUST specify the contiguous block of data containing + // the segment which triggered this ACK, unless that segment advanced the + // Acknowledgment Number field in the header. + reply_repr.sack_ranges[0] = None; + + if let Some(last_seg_seq) = self.local_rx_last_seq.map(|s| s.0 as u32) { + reply_repr.sack_ranges[0] = self.assembler.iter_data( + reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0)) + .map(|(left, right)| (left as u32, right as u32)) + .skip_while(|(left, right)| *left > last_seg_seq || *right < last_seg_seq) + .next(); + } + + if reply_repr.sack_ranges[0].is_none() { + // The matching segment was removed from the assembler, meaning the acknowledgement + // number has advanced, or there was no previous sACK. + // + // While the RFC says we SHOULD keep a list of reported sACK ranges, and iterate + // through those, that is currently infeasable. Instead, we offer the range with + // the lowest sequence number (if one exists) to hint at what segments would + // most quickly advance the acknowledgement number. + reply_repr.sack_ranges[0] = self.assembler.iter_data( + reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0)) + .map(|(left, right)| (left as u32, right as u32)) + .next(); + } + } + + // Since the sACK option may have changed the length of the payload, update that. + ip_reply_repr.set_payload_len(reply_repr.buffer_len()); (ip_reply_repr, reply_repr) } @@ -975,6 +1019,7 @@ impl<'a> TcpSocket<'a> { if segment_in_window { // We've checked that segment_start >= window_start above. payload_offset = (segment_start - window_start) as usize; + self.local_rx_last_seq = Some(repr.seq_number); } else { // If we're in the TIME-WAIT state, restart the TIME-WAIT timeout, since // the remote end may not have realized we've closed the connection. @@ -1056,6 +1101,7 @@ impl<'a> TcpSocket<'a> { self.local_seq_no = TcpSeqNumber(-repr.seq_number.0); self.remote_seq_no = repr.seq_number + 1; self.remote_last_seq = self.local_seq_no; + self.remote_has_sack = repr.sack_permitted; if let Some(max_seg_size) = repr.max_seg_size { self.remote_mss = max_seg_size as usize } @@ -1422,6 +1468,8 @@ impl<'a> TcpSocket<'a> { window_len: self.scaled_window(), window_scale: None, max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], payload: &[] }; @@ -1442,7 +1490,9 @@ impl<'a> TcpSocket<'a> { if self.state == State::SynSent { repr.ack_number = None; repr.window_scale = Some(self.remote_win_shift); + repr.sack_permitted = true; } else { + repr.sack_permitted = self.remote_has_sack; repr.window_scale = self.remote_win_scale.map( |_| self.remote_win_shift); } @@ -1641,6 +1691,8 @@ mod test { seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)), window_len: 256, window_scale: None, max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], payload: &[] }; const _RECV_IP_TEMPL: IpRepr = IpRepr::Unspecified { @@ -1654,6 +1706,8 @@ mod test { seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)), window_len: 64, window_scale: None, max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], payload: &[] }; @@ -1795,8 +1849,11 @@ mod test { TcpSocket::new(rx_buffer, tx_buffer) } - fn socket_syn_received() -> TcpSocket<'static> { - let mut s = socket(); + fn socket_syn_received_with_buffer_sizes( + tx_len: usize, + rx_len: usize + ) -> TcpSocket<'static> { + let mut s = socket_with_buffer_sizes(tx_len, rx_len); s.state = State::SynReceived; s.local_endpoint = LOCAL_END; s.remote_endpoint = REMOTE_END; @@ -1807,6 +1864,10 @@ mod test { s } + fn socket_syn_received() -> TcpSocket<'static> { + socket_syn_received_with_buffer_sizes(64, 64) + } + fn socket_syn_sent() -> TcpSocket<'static> { let mut s = socket(); s.state = State::SynSent; @@ -1817,8 +1878,8 @@ mod test { s } - fn socket_established() -> TcpSocket<'static> { - let mut s = socket_syn_received(); + fn socket_established_with_buffer_sizes(tx_len: usize, rx_len: usize) -> TcpSocket<'static> { + let mut s = socket_syn_received_with_buffer_sizes(tx_len, rx_len); s.state = State::Established; s.local_seq_no = LOCAL_SEQ + 1; s.remote_last_seq = LOCAL_SEQ + 1; @@ -1827,6 +1888,10 @@ mod test { s } + fn socket_established() -> TcpSocket<'static> { + socket_established_with_buffer_sizes(64, 64) + } + fn socket_fin_wait_1() -> TcpSocket<'static> { let mut s = socket_established(); s.state = State::FinWait1; @@ -1936,6 +2001,44 @@ mod test { s } + #[test] + fn test_listen_sack_option() { + let mut s = socket_listen(); + send!(s, TcpRepr { + control: TcpControl::Syn, + seq_number: REMOTE_SEQ, + ack_number: None, + sack_permitted: false, + ..SEND_TEMPL + }); + assert!(!s.remote_has_sack); + recv!(s, [TcpRepr { + control: TcpControl::Syn, + seq_number: LOCAL_SEQ, + ack_number: Some(REMOTE_SEQ + 1), + max_seg_size: Some(BASE_MSS), + ..RECV_TEMPL + }]); + + let mut s = socket_listen(); + send!(s, TcpRepr { + control: TcpControl::Syn, + seq_number: REMOTE_SEQ, + ack_number: None, + sack_permitted: true, + ..SEND_TEMPL + }); + assert!(s.remote_has_sack); + recv!(s, [TcpRepr { + control: TcpControl::Syn, + seq_number: LOCAL_SEQ, + ack_number: Some(REMOTE_SEQ + 1), + max_seg_size: Some(BASE_MSS), + sack_permitted: true, + ..RECV_TEMPL + }]); + } + #[test] fn test_listen_syn_win_scale_buffers() { for (buffer_size, shift_amt) in &[ @@ -2213,6 +2316,7 @@ mod test { ack_number: None, max_seg_size: Some(BASE_MSS), window_scale: Some(0), + sack_permitted: true, ..RECV_TEMPL }]); send!(s, TcpRepr { @@ -2270,6 +2374,7 @@ mod test { ack_number: None, max_seg_size: Some(BASE_MSS), window_scale: Some(0), + sack_permitted: true, ..RECV_TEMPL }]); send!(s, TcpRepr { @@ -2358,6 +2463,7 @@ mod test { max_seg_size: Some(BASE_MSS), window_scale: Some(*shift_amt), window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16, + sack_permitted: true, ..RECV_TEMPL }]); } @@ -2385,6 +2491,88 @@ mod test { assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]); } + fn setup_rfc2018_cases() -> (TcpSocket<'static>, Vec) { + // This is a utility function used by the tests for RFC 2018 cases. It configures a socket + // in a particular way suitable for those cases. + // + // RFC 2018: Assume the left window edge is 5000 and that the data transmitter sends [...] + // segments, each containing 500 data bytes. + let mut s = socket_established_with_buffer_sizes(4000, 4000); + s.remote_has_sack = true; + + // create a segment that is 500 bytes long + let mut segment: Vec = Vec::with_capacity(500); + + // move the last ack to 5000 by sending ten of them + for _ in 0..50 { segment.extend_from_slice(b"abcdefghij") } + for offset in (0..5000).step_by(500) { + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1 + offset, + ack_number: Some(LOCAL_SEQ + 1), + payload: &segment, + ..SEND_TEMPL + }); + recv!(s, [TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + offset + 500), + window_len: 3500, + ..RECV_TEMPL + }]); + s.recv(|data| { + assert_eq!(data.len(), 500); + assert_eq!(data, segment.as_slice()); + (500, ()) + }).unwrap(); + } + assert_eq!(s.remote_last_win, 3500); + (s, segment) + } + + #[test] + fn test_established_rfc2018_cases() { + // This test case verifies the exact scenarios described on pages 8-9 of RFC 2018. Please + // ensure its behavior does not deviate from those scenarios. + + let (mut s, segment) = setup_rfc2018_cases(); + // RFC 2018: + // + // Case 2: The first segment is dropped but the remaining 7 are received. + // + // Upon receiving each of the last seven packets, the data receiver will return a TCP ACK + // segment that acknowledges sequence number 5000 and contains a SACK option specifying one + // block of queued data: + // + // Triggering ACK Left Edge Right Edge + // Segment + // + // 5000 (lost) + // 5500 5000 5500 6000 + // 6000 5000 5500 6500 + // 6500 5000 5500 7000 + // 7000 5000 5500 7500 + // 7500 5000 5500 8000 + // 8000 5000 5500 8500 + // 8500 5000 5500 9000 + // + for offset in (500..3500).step_by(500) { + send!(s, TcpRepr { + seq_number: REMOTE_SEQ + 1 + offset + 5000, + ack_number: Some(LOCAL_SEQ + 1), + payload: &segment, + ..SEND_TEMPL + }, Ok(Some(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1 + 5000), + window_len: 4000, + sack_ranges: [ + Some((REMOTE_SEQ.0 as u32 + 1 + 5500, + REMOTE_SEQ.0 as u32 + 1 + 5500 + offset as u32)), + None, None], + ..RECV_TEMPL + }))); + } + } + #[test] fn test_established_sliding_window_recv() { let mut s = socket_established(); @@ -3987,6 +4175,7 @@ mod test { ack_number: None, max_seg_size: Some(BASE_MSS), window_scale: Some(0), + sack_permitted: true, ..RECV_TEMPL })); assert_eq!(s.state, State::SynSent); diff --git a/src/wire/tcp.rs b/src/wire/tcp.rs index 7b7fe85..c239729 100644 --- a/src/wire/tcp.rs +++ b/src/wire/tcp.rs @@ -103,6 +103,8 @@ mod field { pub const OPT_NOP: u8 = 0x01; pub const OPT_MSS: u8 = 0x02; pub const OPT_WS: u8 = 0x03; + pub const OPT_SACKPERM: u8 = 0x04; + pub const OPT_SACKRNG: u8 = 0x05; } impl> Packet { @@ -288,6 +290,44 @@ impl> Packet { length } + /// Returns whether the selective acknowledgement SYN flag is set or not. + pub fn selective_ack_permitted(&self) -> Result { + let data = self.buffer.as_ref(); + let mut options = &data[field::OPTIONS(self.header_len())]; + while options.len() > 0 { + let (next_options, option) = TcpOption::parse(options)?; + match option { + TcpOption::SackPermitted => { + return Ok(true); + }, + _ => {}, + } + options = next_options; + } + Ok(false) + } + + /// Return the selective acknowledgement ranges, if any. If there are none in the packet, an + /// array of ``None`` values will be returned. + /// + pub fn selective_ack_ranges<'s>( + &'s self + ) -> Result<[Option<(u32, u32)>; 3]> { + let data = self.buffer.as_ref(); + let mut options = &data[field::OPTIONS(self.header_len())]; + while options.len() > 0 { + let (next_options, option) = TcpOption::parse(options)?; + match option { + TcpOption::SackRange(slice) => { + return Ok(slice); + }, + _ => {}, + } + options = next_options; + } + Ok([None, None, None]) + } + /// Validate the packet checksum. /// /// # Panics @@ -523,6 +563,8 @@ pub enum TcpOption<'a> { NoOperation, MaxSegmentSize(u16), WindowScale(u8), + SackPermitted, + SackRange([Option<(u32, u32)>; 3]), Unknown { kind: u8, data: &'a [u8] } } @@ -553,6 +595,47 @@ impl<'a> TcpOption<'a> { option = TcpOption::WindowScale(data[0]), (field::OPT_WS, _) => return Err(Error::Malformed), + (field::OPT_SACKPERM, 2) => + option = TcpOption::SackPermitted, + (field::OPT_SACKPERM, _) => + return Err(Error::Malformed), + (field::OPT_SACKRNG, n) => { + if n < 10 || (n-2) % 8 != 0 { + return Err(Error::Malformed) + } + if n > 26 { + // It's possible for a remote to send 4 SACK blocks, but extremely rare. + // Better to "lose" that 4th block and save the extra RAM and CPU + // cycles in the vastly more common case. + // + // RFC 2018: SACK option that specifies n blocks will have a length of + // 8*n+2 bytes, so the 40 bytes available for TCP options can specify a + // maximum of 4 blocks. It is expected that SACK will often be used in + // conjunction with the Timestamp option used for RTTM [...] thus a + // maximum of 3 SACK blocks will be allowed in this case. + net_debug!("sACK with >3 blocks, truncating to 3"); + } + let mut sack_ranges: [Option<(u32, u32)>; 3] = [None; 3]; + + // RFC 2018: Each contiguous block of data queued at the data receiver is + // defined in the SACK option by two 32-bit unsigned integers in network + // byte order[...] + sack_ranges.iter_mut().enumerate().for_each(|(i, nmut)| { + let left = i * 8; + *nmut = if left < data.len() { + let mid = left + 4; + let right = mid + 4; + let range_left = NetworkEndian::read_u32( + &data[left..mid]); + let range_right = NetworkEndian::read_u32( + &data[mid..right]); + Some((range_left, range_right)) + } else { + None + }; + }); + option = TcpOption::SackRange(sack_ranges); + }, (_, _) => option = TcpOption::Unknown { kind: kind, data: data } } @@ -567,6 +650,8 @@ impl<'a> TcpOption<'a> { &TcpOption::NoOperation => 1, &TcpOption::MaxSegmentSize(_) => 4, &TcpOption::WindowScale(_) => 3, + &TcpOption::SackPermitted => 2, + &TcpOption::SackRange(s) => s.iter().filter(|s| s.is_some()).count() * 8 + 2, &TcpOption::Unknown { data, .. } => 2 + data.len() } } @@ -600,6 +685,18 @@ impl<'a> TcpOption<'a> { buffer[0] = field::OPT_WS; buffer[2] = value; } + &TcpOption::SackPermitted => { + buffer[0] = field::OPT_SACKPERM; + } + &TcpOption::SackRange(slice) => { + buffer[0] = field::OPT_SACKRNG; + slice.iter().filter(|s| s.is_some()).enumerate().for_each(|(i, s)| { + let (first, second) = *s.as_ref().unwrap(); + let pos = i * 8 + 2; + NetworkEndian::write_u32(&mut buffer[pos..], first); + NetworkEndian::write_u32(&mut buffer[pos+4..], second); + }); + } &TcpOption::Unknown { kind, data: provided } => { buffer[0] = kind; buffer[2..].copy_from_slice(provided) @@ -650,6 +747,8 @@ pub struct Repr<'a> { pub window_len: u16, pub window_scale: Option, pub max_seg_size: Option, + pub sack_permitted: bool, + pub sack_ranges: [Option<(u32, u32)>; 3], pub payload: &'a [u8] } @@ -688,6 +787,8 @@ impl<'a> Repr<'a> { let mut max_seg_size = None; let mut window_scale = None; let mut options = packet.options(); + let mut sack_permitted = false; + let mut sack_ranges = [None, None, None]; while options.len() > 0 { let (next_options, option) = TcpOption::parse(options)?; match option { @@ -706,7 +807,11 @@ impl<'a> Repr<'a> { } else { Some(value) }; - } + }, + TcpOption::SackPermitted => + sack_permitted = true, + TcpOption::SackRange(slice) => + sack_ranges = slice, _ => (), } options = next_options; @@ -721,6 +826,8 @@ impl<'a> Repr<'a> { window_len: packet.window_len(), window_scale: window_scale, max_seg_size: max_seg_size, + sack_permitted: sack_permitted, + sack_ranges: sack_ranges, payload: packet.payload() }) } @@ -737,6 +844,15 @@ impl<'a> Repr<'a> { if self.window_scale.is_some() { length += 3 } + if self.sack_permitted { + length += 2; + } + let sack_range_len: usize = self.sack_ranges.iter().map( + |o| o.map(|_| 8).unwrap_or(0) + ).sum(); + if sack_range_len > 0 { + length += sack_range_len + 2; + } if length % 4 != 0 { length += 4 - length % 4; } @@ -783,6 +899,12 @@ impl<'a> Repr<'a> { if let Some(value) = self.max_seg_size { let tmp = options; options = TcpOption::MaxSegmentSize(value).emit(tmp); } + if self.sack_permitted { + let tmp = options; options = TcpOption::SackPermitted.emit(tmp); + } else if self.ack_number.is_some() && self.sack_ranges.iter().any(|s| s.is_some()) { + let tmp = options; options = TcpOption::SackRange(self.sack_ranges).emit(tmp); + } + if options.len() > 0 { TcpOption::EndOfList.emit(options); } @@ -850,6 +972,10 @@ impl<'a, T: AsRef<[u8]> + ?Sized> fmt::Display for Packet<&'a T> { write!(f, " mss={}", value)?, TcpOption::WindowScale(value) => write!(f, " ws={}", value)?, + TcpOption::SackPermitted => + write!(f, " sACK")?, + TcpOption::SackRange(slice) => + write!(f, " sACKr{:?}", slice)?, // debug print conveniently includes the []s TcpOption::Unknown { kind, .. } => write!(f, " opt({})", kind)?, } @@ -1008,6 +1134,8 @@ mod test { window_scale: None, control: Control::Syn, max_seg_size: None, + sack_permitted: false, + sack_ranges: [None, None, None], payload: &PAYLOAD_BYTES } } @@ -1041,7 +1169,7 @@ mod test { macro_rules! assert_option_parses { ($opt:expr, $data:expr) => ({ assert_eq!(TcpOption::parse($data), Ok((&[][..], $opt))); - let buffer = &mut [0; 20][..$opt.buffer_len()]; + let buffer = &mut [0; 40][..$opt.buffer_len()]; assert_eq!($opt.emit(buffer), &mut []); assert_eq!(&*buffer, $data); }) @@ -1057,6 +1185,22 @@ mod test { &[0x02, 0x04, 0x05, 0xdc]); assert_option_parses!(TcpOption::WindowScale(12), &[0x03, 0x03, 0x0c]); + assert_option_parses!(TcpOption::SackPermitted, + &[0x4, 0x02]); + assert_option_parses!(TcpOption::SackRange([Some((500, 1500)), None, None]), + &[0x05, 0x0a, + 0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x05, 0xdc]); + assert_option_parses!(TcpOption::SackRange([Some((875, 1225)), Some((1500, 2500)), None]), + &[0x05, 0x12, + 0x00, 0x00, 0x03, 0x6b, 0x00, 0x00, 0x04, 0xc9, + 0x00, 0x00, 0x05, 0xdc, 0x00, 0x00, 0x09, 0xc4]); + assert_option_parses!(TcpOption::SackRange([Some((875000, 1225000)), + Some((1500000, 2500000)), + Some((876543210, 876654320))]), + &[0x05, 0x1a, + 0x00, 0x0d, 0x59, 0xf8, 0x00, 0x12, 0xb1, 0x28, + 0x00, 0x16, 0xe3, 0x60, 0x00, 0x26, 0x25, 0xa0, + 0x34, 0x3e, 0xfc, 0xea, 0x34, 0x40, 0xae, 0xf0]); assert_option_parses!(TcpOption::Unknown { kind: 12, data: &[1, 2, 3][..] }, &[0x0c, 0x05, 0x01, 0x02, 0x03]) }