Minimal Implementation of TCP Selective Acknowledgement

Closes: #266
Approved by: whitequark
This commit is contained in:
jhwgh1968 2018-12-31 14:45:20 -06:00 committed by Homu
parent 04c9518dd2
commit e867832214
2 changed files with 340 additions and 7 deletions

View File

@ -234,10 +234,14 @@ pub struct TcpSocket<'a> {
remote_win_len: usize,
/// The receive window scaling factor for remotes which support RFC 1323, None if unsupported.
remote_win_scale: Option<u8>,
/// Whether or not the remote supports selective ACK as described in RFC 2018.
remote_has_sack: bool,
/// The maximum number of data octets that the remote side may receive.
remote_mss: usize,
/// The timestamp of the last packet received.
remote_last_ts: Option<Instant>,
/// The sequence number of the last packet received, used for sACK
local_rx_last_seq: Option<TcpSeqNumber>,
/// The ACK number of the last packet received.
local_rx_last_ack: Option<TcpSeqNumber>,
/// The number of packets received directly after
@ -286,9 +290,11 @@ impl<'a> TcpSocket<'a> {
remote_win_len: 0,
remote_win_shift: rx_cap_log2.saturating_sub(16) as u8,
remote_win_scale: None,
remote_has_sack: false,
remote_mss: DEFAULT_MSS,
remote_last_ts: None,
local_rx_last_ack: None,
local_rx_last_seq: None,
local_rx_dup_acks: 0,
}
}
@ -800,6 +806,8 @@ impl<'a> TcpSocket<'a> {
window_len: 0,
window_scale: None,
max_seg_size: None,
sack_permitted: false,
sack_ranges: [None, None, None],
payload: &[]
};
let ip_reply_repr = IpRepr::Unspecified {
@ -829,7 +837,7 @@ impl<'a> TcpSocket<'a> {
}
fn ack_reply(&mut self, ip_repr: &IpRepr, repr: &TcpRepr) -> (IpRepr, TcpRepr<'static>) {
let (ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr);
let (mut ip_reply_repr, mut reply_repr) = Self::reply(ip_repr, repr);
// From RFC 793:
// [...] an empty acknowledgment segment containing the current send-sequence number
@ -844,6 +852,42 @@ impl<'a> TcpSocket<'a> {
reply_repr.window_len = self.scaled_window();
self.remote_last_win = reply_repr.window_len;
// If the remote supports selective acknowledgement, add the option to the outgoing
// segment.
if self.remote_has_sack {
net_debug!("sending sACK option with current assembler ranges");
// RFC 2018: The first SACK block (i.e., the one immediately following the kind and
// length fields in the option) MUST specify the contiguous block of data containing
// the segment which triggered this ACK, unless that segment advanced the
// Acknowledgment Number field in the header.
reply_repr.sack_ranges[0] = None;
if let Some(last_seg_seq) = self.local_rx_last_seq.map(|s| s.0 as u32) {
reply_repr.sack_ranges[0] = self.assembler.iter_data(
reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0))
.map(|(left, right)| (left as u32, right as u32))
.skip_while(|(left, right)| *left > last_seg_seq || *right < last_seg_seq)
.next();
}
if reply_repr.sack_ranges[0].is_none() {
// The matching segment was removed from the assembler, meaning the acknowledgement
// number has advanced, or there was no previous sACK.
//
// While the RFC says we SHOULD keep a list of reported sACK ranges, and iterate
// through those, that is currently infeasible. Instead, we offer the range with
// the lowest sequence number (if one exists) to hint at what segments would
// most quickly advance the acknowledgement number.
reply_repr.sack_ranges[0] = self.assembler.iter_data(
reply_repr.ack_number.map(|s| s.0 as usize).unwrap_or(0))
.map(|(left, right)| (left as u32, right as u32))
.next();
}
}
// Since the sACK option may have changed the length of the payload, update that.
ip_reply_repr.set_payload_len(reply_repr.buffer_len());
(ip_reply_repr, reply_repr)
}
@ -975,6 +1019,7 @@ impl<'a> TcpSocket<'a> {
if segment_in_window {
// We've checked that segment_start >= window_start above.
payload_offset = (segment_start - window_start) as usize;
self.local_rx_last_seq = Some(repr.seq_number);
} else {
// If we're in the TIME-WAIT state, restart the TIME-WAIT timeout, since
// the remote end may not have realized we've closed the connection.
@ -1056,6 +1101,7 @@ impl<'a> TcpSocket<'a> {
self.local_seq_no = TcpSeqNumber(-repr.seq_number.0);
self.remote_seq_no = repr.seq_number + 1;
self.remote_last_seq = self.local_seq_no;
self.remote_has_sack = repr.sack_permitted;
if let Some(max_seg_size) = repr.max_seg_size {
self.remote_mss = max_seg_size as usize
}
@ -1422,6 +1468,8 @@ impl<'a> TcpSocket<'a> {
window_len: self.scaled_window(),
window_scale: None,
max_seg_size: None,
sack_permitted: false,
sack_ranges: [None, None, None],
payload: &[]
};
@ -1442,7 +1490,9 @@ impl<'a> TcpSocket<'a> {
if self.state == State::SynSent {
repr.ack_number = None;
repr.window_scale = Some(self.remote_win_shift);
repr.sack_permitted = true;
} else {
repr.sack_permitted = self.remote_has_sack;
repr.window_scale = self.remote_win_scale.map(
|_| self.remote_win_shift);
}
@ -1641,6 +1691,8 @@ mod test {
seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)),
window_len: 256, window_scale: None,
max_seg_size: None,
sack_permitted: false,
sack_ranges: [None, None, None],
payload: &[]
};
const _RECV_IP_TEMPL: IpRepr = IpRepr::Unspecified {
@ -1654,6 +1706,8 @@ mod test {
seq_number: TcpSeqNumber(0), ack_number: Some(TcpSeqNumber(0)),
window_len: 64, window_scale: None,
max_seg_size: None,
sack_permitted: false,
sack_ranges: [None, None, None],
payload: &[]
};
@ -1795,8 +1849,11 @@ mod test {
TcpSocket::new(rx_buffer, tx_buffer)
}
fn socket_syn_received() -> TcpSocket<'static> {
let mut s = socket();
fn socket_syn_received_with_buffer_sizes(
tx_len: usize,
rx_len: usize
) -> TcpSocket<'static> {
let mut s = socket_with_buffer_sizes(tx_len, rx_len);
s.state = State::SynReceived;
s.local_endpoint = LOCAL_END;
s.remote_endpoint = REMOTE_END;
@ -1807,6 +1864,10 @@ mod test {
s
}
fn socket_syn_received() -> TcpSocket<'static> {
    // Default fixture: the SYN-RECEIVED socket built with transmit and receive buffer
    // lengths of 64 each.
    let (tx_len, rx_len) = (64, 64);
    socket_syn_received_with_buffer_sizes(tx_len, rx_len)
}
fn socket_syn_sent() -> TcpSocket<'static> {
let mut s = socket();
s.state = State::SynSent;
@ -1817,8 +1878,8 @@ mod test {
s
}
fn socket_established() -> TcpSocket<'static> {
let mut s = socket_syn_received();
fn socket_established_with_buffer_sizes(tx_len: usize, rx_len: usize) -> TcpSocket<'static> {
let mut s = socket_syn_received_with_buffer_sizes(tx_len, rx_len);
s.state = State::Established;
s.local_seq_no = LOCAL_SEQ + 1;
s.remote_last_seq = LOCAL_SEQ + 1;
@ -1827,6 +1888,10 @@ mod test {
s
}
fn socket_established() -> TcpSocket<'static> {
    // Default fixture: the ESTABLISHED socket built with transmit and receive buffer
    // lengths of 64 each.
    let (tx_len, rx_len) = (64, 64);
    socket_established_with_buffer_sizes(tx_len, rx_len)
}
fn socket_fin_wait_1() -> TcpSocket<'static> {
let mut s = socket_established();
s.state = State::FinWait1;
@ -1936,6 +2001,44 @@ mod test {
s
}
#[test]
fn test_listen_sack_option() {
    // A listening socket must record whether the incoming SYN offered selective ACK, and
    // the segment it sends in reply must carry the same `sack_permitted` value. Both the
    // "not offered" and the "offered" case are exercised on a fresh socket.
    for &offered in &[false, true] {
        let mut s = socket_listen();
        send!(s, TcpRepr {
            control: TcpControl::Syn,
            seq_number: REMOTE_SEQ,
            ack_number: None,
            sack_permitted: offered,
            ..SEND_TEMPL
        });
        assert!(s.remote_has_sack == offered);
        recv!(s, [TcpRepr {
            control: TcpControl::Syn,
            seq_number: LOCAL_SEQ,
            ack_number: Some(REMOTE_SEQ + 1),
            max_seg_size: Some(BASE_MSS),
            sack_permitted: offered,
            ..RECV_TEMPL
        }]);
    }
}
#[test]
fn test_listen_syn_win_scale_buffers() {
for (buffer_size, shift_amt) in &[
@ -2213,6 +2316,7 @@ mod test {
ack_number: None,
max_seg_size: Some(BASE_MSS),
window_scale: Some(0),
sack_permitted: true,
..RECV_TEMPL
}]);
send!(s, TcpRepr {
@ -2270,6 +2374,7 @@ mod test {
ack_number: None,
max_seg_size: Some(BASE_MSS),
window_scale: Some(0),
sack_permitted: true,
..RECV_TEMPL
}]);
send!(s, TcpRepr {
@ -2358,6 +2463,7 @@ mod test {
max_seg_size: Some(BASE_MSS),
window_scale: Some(*shift_amt),
window_len: cmp::min(*buffer_size >> *shift_amt, 65535) as u16,
sack_permitted: true,
..RECV_TEMPL
}]);
}
@ -2385,6 +2491,88 @@ mod test {
assert_eq!(s.rx_buffer.dequeue_many(6), &b"abcdef"[..]);
}
fn setup_rfc2018_cases() -> (TcpSocket<'static>, Vec<u8>) {
// This is a utility function used by the tests for RFC 2018 cases. It configures a socket
// in a particular way suitable for those cases.
//
// RFC 2018: Assume the left window edge is 5000 and that the data transmitter sends [...]
// segments, each containing 500 data bytes.
let mut s = socket_established_with_buffer_sizes(4000, 4000);
s.remote_has_sack = true;
// create a segment that is 500 bytes long
let mut segment: Vec<u8> = Vec::with_capacity(500);
// move the last ack to 5000 by sending ten of them
for _ in 0..50 { segment.extend_from_slice(b"abcdefghij") }
for offset in (0..5000).step_by(500) {
send!(s, TcpRepr {
seq_number: REMOTE_SEQ + 1 + offset,
ack_number: Some(LOCAL_SEQ + 1),
payload: &segment,
..SEND_TEMPL
});
recv!(s, [TcpRepr {
seq_number: LOCAL_SEQ + 1,
ack_number: Some(REMOTE_SEQ + 1 + offset + 500),
window_len: 3500,
..RECV_TEMPL
}]);
s.recv(|data| {
assert_eq!(data.len(), 500);
assert_eq!(data, segment.as_slice());
(500, ())
}).unwrap();
}
assert_eq!(s.remote_last_win, 3500);
(s, segment)
}
#[test]
fn test_established_rfc2018_cases() {
    // This test case verifies the exact scenarios described on pages 8-9 of RFC 2018. Please
    // ensure its behavior does not deviate from those scenarios.
    let (mut s, segment) = setup_rfc2018_cases();

    // RFC 2018:
    //
    // Case 2: The first segment is dropped but the remaining 7 are received.
    //
    // Upon receiving each of the last seven packets, the data receiver will return a TCP ACK
    // segment that acknowledges sequence number 5000 and contains a SACK option specifying one
    // block of queued data:
    //
    // Triggering ACK Left Edge Right Edge
    // Segment
    //
    // 5000 (lost)
    // 5500 5000 5500 6000
    // 6000 5000 5500 6500
    // 6500 5000 5500 7000
    // 7000 5000 5500 7500
    // 7500 5000 5500 8000
    // 8000 5000 5500 8500
    // 8500 5000 5500 9000
    //
    // Note: the loop below sends the segments at offsets 500 through 3000 only, i.e. the
    // first six triggering segments of the table.

    // The left edge of the reported block never moves: it is where the first queued
    // out-of-order segment starts.
    let sack_left = REMOTE_SEQ.0 as u32 + 1 + 5500;
    for offset in (500..3500).step_by(500) {
        // The right edge grows by one segment length with every segment received.
        let sack_right = sack_left + offset as u32;
        send!(s, TcpRepr {
            seq_number: REMOTE_SEQ + 1 + offset + 5000,
            ack_number: Some(LOCAL_SEQ + 1),
            payload: &segment,
            ..SEND_TEMPL
        }, Ok(Some(TcpRepr {
            seq_number: LOCAL_SEQ + 1,
            ack_number: Some(REMOTE_SEQ + 1 + 5000),
            window_len: 4000,
            sack_ranges: [Some((sack_left, sack_right)), None, None],
            ..RECV_TEMPL
        })));
    }
}
#[test]
fn test_established_sliding_window_recv() {
let mut s = socket_established();
@ -3987,6 +4175,7 @@ mod test {
ack_number: None,
max_seg_size: Some(BASE_MSS),
window_scale: Some(0),
sack_permitted: true,
..RECV_TEMPL
}));
assert_eq!(s.state, State::SynSent);

View File

@ -103,6 +103,8 @@ mod field {
pub const OPT_NOP: u8 = 0x01;
pub const OPT_MSS: u8 = 0x02;
pub const OPT_WS: u8 = 0x03;
pub const OPT_SACKPERM: u8 = 0x04;
pub const OPT_SACKRNG: u8 = 0x05;
}
impl<T: AsRef<[u8]>> Packet<T> {
@ -288,6 +290,44 @@ impl<T: AsRef<[u8]>> Packet<T> {
length
}
/// Returns whether the packet carries the "selective acknowledgement permitted" option.
///
/// The option area is parsed from the start. `Ok(true)` is returned as soon as a
/// `TcpOption::SackPermitted` is found, and `Ok(false)` if the options run out without one.
/// Any error reported by `TcpOption::parse` is propagated.
pub fn selective_ack_permitted(&self) -> Result<bool> {
    let bytes = self.buffer.as_ref();
    let mut remaining = &bytes[field::OPTIONS(self.header_len())];
    loop {
        if remaining.is_empty() {
            return Ok(false);
        }
        let (rest, parsed) = TcpOption::parse(remaining)?;
        if let TcpOption::SackPermitted = parsed {
            return Ok(true);
        }
        remaining = rest;
    }
}
/// Return the selective acknowledgement ranges, if any.
///
/// The ranges of the first `TcpOption::SackRange` found among the options are returned.
/// If there are none in the packet, an array of ``None`` values is returned instead.
/// Any error reported by `TcpOption::parse` is propagated.
pub fn selective_ack_ranges<'s>(
    &'s self
) -> Result<[Option<(u32, u32)>; 3]> {
    let bytes = self.buffer.as_ref();
    let mut remaining = &bytes[field::OPTIONS(self.header_len())];
    loop {
        if remaining.is_empty() {
            return Ok([None, None, None]);
        }
        let (rest, parsed) = TcpOption::parse(remaining)?;
        if let TcpOption::SackRange(ranges) = parsed {
            return Ok(ranges);
        }
        remaining = rest;
    }
}
/// Validate the packet checksum.
///
/// # Panics
@ -523,6 +563,8 @@ pub enum TcpOption<'a> {
NoOperation,
MaxSegmentSize(u16),
WindowScale(u8),
SackPermitted,
SackRange([Option<(u32, u32)>; 3]),
Unknown { kind: u8, data: &'a [u8] }
}
@ -553,6 +595,47 @@ impl<'a> TcpOption<'a> {
option = TcpOption::WindowScale(data[0]),
(field::OPT_WS, _) =>
return Err(Error::Malformed),
(field::OPT_SACKPERM, 2) =>
option = TcpOption::SackPermitted,
(field::OPT_SACKPERM, _) =>
return Err(Error::Malformed),
(field::OPT_SACKRNG, n) => {
if n < 10 || (n-2) % 8 != 0 {
return Err(Error::Malformed)
}
if n > 26 {
// It's possible for a remote to send 4 SACK blocks, but extremely rare.
// Better to "lose" that 4th block and save the extra RAM and CPU
// cycles in the vastly more common case.
//
// RFC 2018: SACK option that specifies n blocks will have a length of
// 8*n+2 bytes, so the 40 bytes available for TCP options can specify a
// maximum of 4 blocks. It is expected that SACK will often be used in
// conjunction with the Timestamp option used for RTTM [...] thus a
// maximum of 3 SACK blocks will be allowed in this case.
net_debug!("sACK with >3 blocks, truncating to 3");
}
let mut sack_ranges: [Option<(u32, u32)>; 3] = [None; 3];
// RFC 2018: Each contiguous block of data queued at the data receiver is
// defined in the SACK option by two 32-bit unsigned integers in network
// byte order[...]
sack_ranges.iter_mut().enumerate().for_each(|(i, nmut)| {
let left = i * 8;
*nmut = if left < data.len() {
let mid = left + 4;
let right = mid + 4;
let range_left = NetworkEndian::read_u32(
&data[left..mid]);
let range_right = NetworkEndian::read_u32(
&data[mid..right]);
Some((range_left, range_right))
} else {
None
};
});
option = TcpOption::SackRange(sack_ranges);
},
(_, _) =>
option = TcpOption::Unknown { kind: kind, data: data }
}
@ -567,6 +650,8 @@ impl<'a> TcpOption<'a> {
&TcpOption::NoOperation => 1,
&TcpOption::MaxSegmentSize(_) => 4,
&TcpOption::WindowScale(_) => 3,
&TcpOption::SackPermitted => 2,
&TcpOption::SackRange(s) => s.iter().filter(|s| s.is_some()).count() * 8 + 2,
&TcpOption::Unknown { data, .. } => 2 + data.len()
}
}
@ -600,6 +685,18 @@ impl<'a> TcpOption<'a> {
buffer[0] = field::OPT_WS;
buffer[2] = value;
}
&TcpOption::SackPermitted => {
buffer[0] = field::OPT_SACKPERM;
}
&TcpOption::SackRange(slice) => {
buffer[0] = field::OPT_SACKRNG;
slice.iter().filter(|s| s.is_some()).enumerate().for_each(|(i, s)| {
let (first, second) = *s.as_ref().unwrap();
let pos = i * 8 + 2;
NetworkEndian::write_u32(&mut buffer[pos..], first);
NetworkEndian::write_u32(&mut buffer[pos+4..], second);
});
}
&TcpOption::Unknown { kind, data: provided } => {
buffer[0] = kind;
buffer[2..].copy_from_slice(provided)
@ -650,6 +747,8 @@ pub struct Repr<'a> {
pub window_len: u16,
pub window_scale: Option<u8>,
pub max_seg_size: Option<u16>,
pub sack_permitted: bool,
pub sack_ranges: [Option<(u32, u32)>; 3],
pub payload: &'a [u8]
}
@ -688,6 +787,8 @@ impl<'a> Repr<'a> {
let mut max_seg_size = None;
let mut window_scale = None;
let mut options = packet.options();
let mut sack_permitted = false;
let mut sack_ranges = [None, None, None];
while options.len() > 0 {
let (next_options, option) = TcpOption::parse(options)?;
match option {
@ -706,7 +807,11 @@ impl<'a> Repr<'a> {
} else {
Some(value)
};
}
},
TcpOption::SackPermitted =>
sack_permitted = true,
TcpOption::SackRange(slice) =>
sack_ranges = slice,
_ => (),
}
options = next_options;
@ -721,6 +826,8 @@ impl<'a> Repr<'a> {
window_len: packet.window_len(),
window_scale: window_scale,
max_seg_size: max_seg_size,
sack_permitted: sack_permitted,
sack_ranges: sack_ranges,
payload: packet.payload()
})
}
@ -737,6 +844,15 @@ impl<'a> Repr<'a> {
if self.window_scale.is_some() {
length += 3
}
if self.sack_permitted {
length += 2;
}
let sack_range_len: usize = self.sack_ranges.iter().map(
|o| o.map(|_| 8).unwrap_or(0)
).sum();
if sack_range_len > 0 {
length += sack_range_len + 2;
}
if length % 4 != 0 {
length += 4 - length % 4;
}
@ -783,6 +899,12 @@ impl<'a> Repr<'a> {
if let Some(value) = self.max_seg_size {
let tmp = options; options = TcpOption::MaxSegmentSize(value).emit(tmp);
}
if self.sack_permitted {
let tmp = options; options = TcpOption::SackPermitted.emit(tmp);
} else if self.ack_number.is_some() && self.sack_ranges.iter().any(|s| s.is_some()) {
let tmp = options; options = TcpOption::SackRange(self.sack_ranges).emit(tmp);
}
if options.len() > 0 {
TcpOption::EndOfList.emit(options);
}
@ -850,6 +972,10 @@ impl<'a, T: AsRef<[u8]> + ?Sized> fmt::Display for Packet<&'a T> {
write!(f, " mss={}", value)?,
TcpOption::WindowScale(value) =>
write!(f, " ws={}", value)?,
TcpOption::SackPermitted =>
write!(f, " sACK")?,
TcpOption::SackRange(slice) =>
write!(f, " sACKr{:?}", slice)?, // debug print conveniently includes the []s
TcpOption::Unknown { kind, .. } =>
write!(f, " opt({})", kind)?,
}
@ -1008,6 +1134,8 @@ mod test {
window_scale: None,
control: Control::Syn,
max_seg_size: None,
sack_permitted: false,
sack_ranges: [None, None, None],
payload: &PAYLOAD_BYTES
}
}
@ -1041,7 +1169,7 @@ mod test {
macro_rules! assert_option_parses {
($opt:expr, $data:expr) => ({
assert_eq!(TcpOption::parse($data), Ok((&[][..], $opt)));
let buffer = &mut [0; 20][..$opt.buffer_len()];
let buffer = &mut [0; 40][..$opt.buffer_len()];
assert_eq!($opt.emit(buffer), &mut []);
assert_eq!(&*buffer, $data);
})
@ -1057,6 +1185,22 @@ mod test {
&[0x02, 0x04, 0x05, 0xdc]);
assert_option_parses!(TcpOption::WindowScale(12),
&[0x03, 0x03, 0x0c]);
assert_option_parses!(TcpOption::SackPermitted,
&[0x4, 0x02]);
assert_option_parses!(TcpOption::SackRange([Some((500, 1500)), None, None]),
&[0x05, 0x0a,
0x00, 0x00, 0x01, 0xf4, 0x00, 0x00, 0x05, 0xdc]);
assert_option_parses!(TcpOption::SackRange([Some((875, 1225)), Some((1500, 2500)), None]),
&[0x05, 0x12,
0x00, 0x00, 0x03, 0x6b, 0x00, 0x00, 0x04, 0xc9,
0x00, 0x00, 0x05, 0xdc, 0x00, 0x00, 0x09, 0xc4]);
assert_option_parses!(TcpOption::SackRange([Some((875000, 1225000)),
Some((1500000, 2500000)),
Some((876543210, 876654320))]),
&[0x05, 0x1a,
0x00, 0x0d, 0x59, 0xf8, 0x00, 0x12, 0xb1, 0x28,
0x00, 0x16, 0xe3, 0x60, 0x00, 0x26, 0x25, 0xa0,
0x34, 0x3e, 0xfc, 0xea, 0x34, 0x40, 0xae, 0xf0]);
assert_option_parses!(TcpOption::Unknown { kind: 12, data: &[1, 2, 3][..] },
&[0x0c, 0x05, 0x01, 0x02, 0x03])
}