diff --git a/README.md b/README.md index 0415c14..9865a5d 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,8 @@ The TCP protocol is supported over IPv4. * TCP header checksum is supported. * Multiple packets will be transmitted without waiting for an acknowledgement. + * Lost packets will be retransmitted with exponential backoff, starting at + a fixed delay of 100 ms. * TCP urgent pointer is **not** supported; any urgent octets will be received alongside data octets. * Reassembly of out-of-order segments is **not** supported. diff --git a/examples/server.rs b/examples/server.rs index a132b32..38989a5 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -43,11 +43,11 @@ fn main() { .format(move |record: &LogRecord| { let elapsed = Instant::now().duration_since(startup_time); if record.target().starts_with("smoltcp::") { - format!("\x1b[0m[{:6}.{:03}ms] ({}): {}\x1b[0m", + format!("\x1b[0m[{:6}.{:03}s] ({}): {}\x1b[0m", elapsed.as_secs(), elapsed.subsec_nanos() / 1000000, record.target().replace("smoltcp::", ""), record.args()) } else { - format!("\x1b[32m[{:6}.{:03}ms] ({}): {}\x1b[0m", + format!("\x1b[32m[{:6}.{:03}s] ({}): {}\x1b[0m", elapsed.as_secs(), elapsed.subsec_nanos() / 1000000, record.target(), record.args()) } @@ -169,7 +169,10 @@ fn main() { } } - match iface.poll() { + let timestamp = Instant::now().duration_since(startup_time); + let timestamp_ms = (timestamp.as_secs() * 1000) + + (timestamp.subsec_nanos() / 1000000) as u64; + match iface.poll(timestamp_ms) { Ok(()) => (), Err(e) => debug!("poll error: {}", e) } diff --git a/src/iface/ethernet.rs b/src/iface/ethernet.rs index c6038a5..528f864 100644 --- a/src/iface/ethernet.rs +++ b/src/iface/ethernet.rs @@ -112,7 +112,9 @@ impl<'a, 'b: 'a, } /// Receive and process a packet, if available, and then transmit a packet, if necessary. - pub fn poll(&mut self) -> Result<(), Error> { + /// + /// The timestamp is a monotonically increasing number of milliseconds. + pub fn poll(&mut self, timestamp: u64) -> Result<(), Error> { enum Response<'a> { Nop, Arp(ArpRepr), @@ -122,7 +124,7 @@ impl<'a, 'b: 'a, // First, transmit any outgoing packets. loop { - if try!(self.emit()) { break } + if try!(self.emit(timestamp)) { break } } // Now, receive any incoming packets. @@ -214,7 +216,7 @@ impl<'a, 'b: 'a, let mut handled = false; for socket in self.sockets.borrow_mut() { let ip_repr = IpRepr::Ipv4(ipv4_repr); - match socket.process(&ip_repr, ipv4_packet.payload()) { + match socket.process(timestamp, &ip_repr, ipv4_packet.payload()) { Ok(()) => { // The packet was valid and handled by socket. handled = true; @@ -355,7 +357,7 @@ impl<'a, 'b: 'a, } } - fn emit(&mut self) -> Result { + fn emit(&mut self, timestamp: u64) -> Result { // Borrow checker is being overly careful around closures, so we have // to hack around that. let src_hardware_addr = self.hardware_addr; @@ -365,7 +367,7 @@ impl<'a, 'b: 'a, let mut nothing_to_transmit = true; for socket in self.sockets.borrow_mut() { - let result = socket.dispatch(&mut |repr, payload| { + let result = socket.dispatch(timestamp, &mut |repr, payload| { let repr = try!(repr.lower(src_protocol_addrs)); let dst_hardware_addr = diff --git a/src/socket/mod.rs b/src/socket/mod.rs index 6fb5756..52485ba 100644 --- a/src/socket/mod.rs +++ b/src/socket/mod.rs @@ -52,12 +52,13 @@ impl<'a, 'b> Socket<'a, 'b> { /// is returned. /// /// This function is used internally by the networking stack. - pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> { + pub fn process(&mut self, timestamp: u64, ip_repr: &IpRepr, + payload: &[u8]) -> Result<(), Error> { match self { &mut Socket::Udp(ref mut socket) => - socket.process(ip_repr, payload), + socket.process(timestamp, ip_repr, payload), &mut Socket::Tcp(ref mut socket) => - socket.process(ip_repr, payload), + socket.process(timestamp, ip_repr, payload), &mut Socket::__Nonexhaustive => unreachable!() } } @@ -69,13 +70,13 @@ impl<'a, 'b> Socket<'a, 'b> { /// is returned. /// /// This function is used internally by the networking stack. - pub fn dispatch(&mut self, emit: &mut F) -> Result + pub fn dispatch(&mut self, timestamp: u64, emit: &mut F) -> Result where F: FnMut(&IpRepr, &IpPayload) -> Result { match self { &mut Socket::Udp(ref mut socket) => - socket.dispatch(emit), + socket.dispatch(timestamp, emit), &mut Socket::Tcp(ref mut socket) => - socket.dispatch(emit), + socket.dispatch(timestamp, emit), &mut Socket::__Nonexhaustive => unreachable!() } } diff --git a/src/socket/tcp.rs b/src/socket/tcp.rs index d0148bc..fad681e 100644 --- a/src/socket/tcp.rs +++ b/src/socket/tcp.rs @@ -146,22 +146,53 @@ impl fmt::Display for State { #[derive(Debug)] struct Retransmit { - sent: bool // FIXME + resend_at: u64, + delay: u64 } impl Retransmit { fn new() -> Retransmit { - Retransmit { sent: false } + Retransmit { resend_at: 0, delay: 0 } } fn reset(&mut self) { - self.sent = false + self.resend_at = 0; + self.delay = 0; } - fn check(&mut self) -> bool { - let result = !self.sent; - self.sent = true; - result + fn may_send_old(&mut self, timestamp: u64) -> bool { + if self.delay == 0 { + // We haven't transmitted anything yet. + false + } else if timestamp < self.resend_at { + // We may not retransmit yet. + false + } else { + // We may retransmit! + true + } + } + + fn may_send_new(&mut self, timestamp: u64) -> bool { + if self.delay == 0 { + // We've something new to transmit, do it unconditionally. + self.delay = 100; // ms + self.resend_at = timestamp + self.delay; + true + } else { + false + } + } + + fn commit(&mut self, timestamp: u64) { + if self.delay == 0 { + self.delay = 100; // ms + self.resend_at = timestamp + self.delay; + } else if timestamp >= self.resend_at { + net_trace!("retransmitting after a {}ms delay", self.delay); + self.resend_at = timestamp + self.delay; + self.delay *= 2; + } } } @@ -373,6 +404,7 @@ impl<'a> TcpSocket<'a> { net_trace!("tcp:{}:{}: tx buffer: enqueueing {} octets (now {})", self.local_endpoint, self.remote_endpoint, buffer.len(), old_length + buffer.len()); + self.retransmit.reset(); } Ok(buffer) } @@ -445,7 +477,8 @@ impl<'a> TcpSocket<'a> { } /// See [Socket::process](enum.Socket.html#method.process). - pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> { + pub fn process(&mut self, _timestamp: u64, ip_repr: &IpRepr, + payload: &[u8]) -> Result<(), Error> { if ip_repr.protocol() != IpProtocol::Tcp { return Err(Error::Rejected) } let packet = try!(TcpPacket::new(payload)); @@ -545,6 +578,7 @@ impl<'a> TcpSocket<'a> { // If we've seen this sequence number already but the remote end is not aware // of that, make sure we send the acknowledgement again. self.remote_last_ack = next_remote_seq - 1; + self.retransmit.reset(); return Ok(()) } } @@ -679,7 +713,7 @@ impl<'a> TcpSocket<'a> { } /// See [Socket::dispatch](enum.Socket.html#method.dispatch). - pub fn dispatch(&mut self, emit: &mut F) -> Result + pub fn dispatch(&mut self, timestamp: u64, emit: &mut F) -> Result where F: FnMut(&IpRepr, &IpPayload) -> Result { if self.remote_endpoint.is_unspecified() { return Err(Error::Exhausted) } @@ -698,6 +732,17 @@ impl<'a> TcpSocket<'a> { payload: &[] }; + if self.retransmit.may_send_old(timestamp) { + // The retransmit timer has expired, so assume all in-flight packets that + // have not been acknowledged are lost. + self.remote_last_seq = self.local_seq_no; + } else if self.retransmit.may_send_new(timestamp) { + // The retransmit timer has reset, and we can send something new. + } else { + // We don't have anything to send at this time. + return Err(Error::Exhausted) + } + let mut should_send = false; match self.state { // We never transmit anything in the CLOSED, LISTEN, or FIN-WAIT-2 states. @@ -707,8 +752,6 @@ impl<'a> TcpSocket<'a> { // We transmit a SYN|ACK in the SYN-RECEIVED state. State::SynReceived => { - if !self.retransmit.check() { return Err(Error::Exhausted) } - repr.control = TcpControl::Syn; net_trace!("tcp:{}:{}: sending SYN|ACK", self.local_endpoint, self.remote_endpoint); @@ -717,8 +760,6 @@ impl<'a> TcpSocket<'a> { // We transmit a SYN in the SYN-SENT state. State::SynSent => { - if !self.retransmit.check() { return Err(Error::Exhausted) } - repr.control = TcpControl::Syn; repr.ack_number = None; net_trace!("tcp:{}:{}: sending SYN", @@ -728,55 +769,53 @@ impl<'a> TcpSocket<'a> { // We transmit data in the ESTABLISHED state, // ACK in CLOSE-WAIT, CLOSING, and TIME-WAIT states, - // FIN in FIN-WAIT-1 and LAST-ACK states. + // FIN in FIN-WAIT-1 and LAST-ACK states, + // but only if the receiver has a nonzero window. State::Established | State::CloseWait | State::Closing | State::TimeWait | - State::FinWait1 | State::LastAck => { - // See if we should send data to the remote end because: - let mut may_send = false; - // 1. the retransmit timer has expired or was reset, or... - if self.retransmit.check() { may_send = true } - // 2. we've got new data in the transmit buffer. - let remote_next_seq = self.local_seq_no + self.tx_buffer.len(); - if self.remote_last_seq != remote_next_seq { may_send = true } - - if self.tx_buffer.len() > 0 && self.remote_win_len > 0 && may_send { - // We can send something, so let's do that. - let mut size = self.tx_buffer.len(); - // Clamp to remote window length. - if size > self.remote_win_len { size = self.remote_win_len } - // Clamp to MSS. Currently we only support the default MSS value. - if size > 536 { size = 536 } - // Extract data from the buffer. This may return less than what we want, - // in case it's not possible to extract a contiguous slice. - let offset = self.remote_last_seq - self.local_seq_no; - let data = self.tx_buffer.peek(offset, size); - assert!(data.len() > 0); - // Send the extracted data. - net_trace!("tcp:{}:{}: tx buffer: peeking at {} octets (from {})", - self.local_endpoint, self.remote_endpoint, data.len(), offset); - repr.seq_number += offset; - repr.payload = data; - // Speculatively shrink the remote window. This will get updated the next - // time we receive a packet. - self.remote_win_len -= data.len(); - // Advance the in-flight sequence number. - self.remote_last_seq += data.len(); - should_send = true; - } - + State::FinWait1 | State::LastAck + if self.remote_win_len > 0 => { + // We can send something, so let's try doing that. + let mut size = self.tx_buffer.len(); + // Clamp to remote window length. + if size > self.remote_win_len { size = self.remote_win_len } + // Clamp to MSS. Currently we only support the default MSS value. + if size > 536 { size = 536 } + // Extract data from the buffer. This may return less than what we want, + // in case it's not possible to extract a contiguous slice. + let offset = self.remote_last_seq - self.local_seq_no; + let data = self.tx_buffer.peek(offset, size); match self.state { - State::FinWait1 | State::LastAck if may_send => { + _ if data.len() > 0 => { + // Send the extracted data. + net_trace!("tcp:{}:{}: tx buffer: peeking at {} octets (from {})", + self.local_endpoint, self.remote_endpoint, + data.len(), offset); + repr.seq_number += offset; + repr.payload = data; + // Speculatively shrink the remote window. This will get updated + // the next time we receive a packet. + self.remote_win_len -= data.len(); + // Advance the in-flight sequence number. + self.remote_last_seq += data.len(); + should_send = true; + } + State::FinWait1 | State::LastAck => { // We should notify the other side that we've closed the transmit half // of the connection. net_trace!("tcp:{}:{}: sending FIN|ACK", self.local_endpoint, self.remote_endpoint); repr.control = TcpControl::Fin; should_send = true; - }, + } _ => () } } + + // We don't transmit anything (except ACKs) if the receiver has a zero window. + State::Established | + State::CloseWait | State::Closing | State::TimeWait | + State::FinWait1 | State::LastAck => () } let ack_number = self.remote_seq_no + self.rx_buffer.len(); @@ -788,6 +827,8 @@ impl<'a> TcpSocket<'a> { } if should_send { + self.retransmit.commit(timestamp); + repr.ack_number = Some(ack_number); self.remote_last_ack = ack_number; @@ -842,6 +883,26 @@ mod test { assert_eq!(buffer.peek(3, 8), &b"bar"[..]); } + #[test] + fn test_retransmit_may_send() { + fn may_send(r: &mut Retransmit, t: u64) -> (bool, bool) { + (r.may_send_old(t), r.may_send_new(t)) + } + + let mut r = Retransmit::new(); + assert_eq!(may_send(&mut r, 1000), (false, true)); + r.commit(1000); + assert_eq!(may_send(&mut r, 1000), (false, false)); + assert_eq!(may_send(&mut r, 1050), (false, false)); + assert_eq!(may_send(&mut r, 1101), (true, false)); + r.commit(1101); + assert_eq!(may_send(&mut r, 1150), (false, false)); + assert_eq!(may_send(&mut r, 1200), (false, false)); + assert_eq!(may_send(&mut r, 1301), (true, false)); + r.reset(); + assert_eq!(may_send(&mut r, 1350), (false, true)); + } + const LOCAL_IP: IpAddress = IpAddress::Ipv4(Ipv4Address([10, 0, 0, 1])); const REMOTE_IP: IpAddress = IpAddress::Ipv4(Ipv4Address([10, 0, 0, 2])); const LOCAL_PORT: u16 = 80; @@ -864,7 +925,7 @@ mod test { window_len: 64, payload: &[] }; - fn send(socket: &mut TcpSocket, repr: &TcpRepr) -> Result<(), Error> { + fn send(socket: &mut TcpSocket, timestamp: u64, repr: &TcpRepr) -> Result<(), Error> { trace!("send: {}", repr); let mut buffer = vec![0; repr.buffer_len()]; let mut packet = TcpPacket::new(&mut buffer).unwrap(); @@ -874,13 +935,13 @@ mod test { dst_addr: LOCAL_IP, protocol: IpProtocol::Tcp }; - socket.process(&ip_repr, &packet.into_inner()[..]) + socket.process(timestamp, &ip_repr, &packet.into_inner()[..]) } - fn recv(socket: &mut TcpSocket, mut f: F) + fn recv(socket: &mut TcpSocket, timestamp: u64, mut f: F) where F: FnMut(Result) { let mut buffer = vec![]; - let result = socket.dispatch(&mut |ip_repr, payload| { + let result = socket.dispatch(timestamp, &mut |ip_repr, payload| { assert_eq!(ip_repr.protocol(), IpProtocol::Tcp); assert_eq!(ip_repr.src_addr(), LOCAL_IP); assert_eq!(ip_repr.dst_addr(), REMOTE_IP); @@ -900,11 +961,14 @@ mod test { } macro_rules! send { - ($socket:ident, [$( $repr:expr )*]) => ({ - $( send!($socket, $repr, Ok(())); )* - }); + ($socket:ident, $repr:expr) => + (send!($socket, time 0, $repr)); ($socket:ident, $repr:expr, $result:expr) => - (assert_eq!(send(&mut $socket, &$repr), $result)) + (send!($socket, time 0, $repr, $result)); + ($socket:ident, time $time:expr, $repr:expr) => + (send!($socket, time 0, $repr, Ok(()))); + ($socket:ident, time $time:expr, $repr:expr, $result:expr) => + (assert_eq!(send(&mut $socket, $time, &$repr), $result)); } macro_rules! recv { @@ -913,7 +977,9 @@ mod test { recv!($socket, Err(Error::Exhausted)) }); ($socket:ident, $result:expr) => - (recv(&mut $socket, |repr| assert_eq!(repr, $result))) + (recv!($socket, time 0, $result)); + ($socket:ident, time $time:expr, $result:expr) => + (recv(&mut $socket, $time, |repr| assert_eq!(repr, $result))); } fn init_logger() { @@ -997,12 +1063,12 @@ mod test { #[test] fn test_listen_rst() { let mut s = socket_listen(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Rst, seq_number: REMOTE_SEQ, ack_number: None, ..SEND_TEMPL - }]); + }); } #[test] @@ -1028,12 +1094,12 @@ mod test { #[test] fn test_syn_received_rst() { let mut s = socket_syn_received(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Rst, seq_number: REMOTE_SEQ, ack_number: Some(LOCAL_SEQ), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Listen); assert_eq!(s.local_endpoint, IpEndpoint::new(IpAddress::Unspecified, LOCAL_END.port)); assert_eq!(s.remote_endpoint, IpEndpoint::default()); @@ -1061,12 +1127,12 @@ mod test { #[test] fn test_syn_sent_rst() { let mut s = socket_syn_sent(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Rst, seq_number: REMOTE_SEQ, ack_number: Some(LOCAL_SEQ), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closed); } @@ -1118,12 +1184,12 @@ mod test { #[test] fn test_established_recv() { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), payload: &b"abcdef"[..], ..SEND_TEMPL - }]); + }); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1 + 6), @@ -1137,7 +1203,7 @@ mod test { fn test_established_send() { let mut s = socket_established(); // First roundtrip after establishing. - s.tx_buffer.enqueue_slice(b"abcdef"); + s.send_slice(b"abcdef").unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1), @@ -1145,39 +1211,39 @@ mod test { ..RECV_TEMPL }]); assert_eq!(s.tx_buffer.len(), 6); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 6), ..SEND_TEMPL - }]); + }); assert_eq!(s.tx_buffer.len(), 0); // Second roundtrip. - s.tx_buffer.enqueue_slice(b"foobar"); + s.send_slice(b"foobar").unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1 + 6, ack_number: Some(REMOTE_SEQ + 1), payload: &b"foobar"[..], ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 6 + 6), ..SEND_TEMPL - }]); + }); assert_eq!(s.tx_buffer.len(), 0); } #[test] fn test_established_send_no_ack_send() { let mut s = socket_established(); - s.tx_buffer.enqueue_slice(b"abcdef"); + s.send_slice(b"abcdef").unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1), payload: &b"abcdef"[..], ..RECV_TEMPL }]); - s.tx_buffer.enqueue_slice(b"foobar"); + s.send_slice(b"foobar").unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1 + 6, ack_number: Some(REMOTE_SEQ + 1), @@ -1191,7 +1257,7 @@ mod test { let mut s = socket_established(); s.remote_win_len = 16; // First roundtrip after establishing. - s.tx_buffer.enqueue_slice(&[0; 32][..]); + s.send_slice(&[0; 32][..]).unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1), @@ -1244,12 +1310,12 @@ mod test { #[test] fn test_established_fin() { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::CloseWait); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, @@ -1261,13 +1327,13 @@ mod test { #[test] fn test_established_send_fin() { let mut s = socket_established(); - s.tx_buffer.enqueue_slice(b"abcdef"); - send!(s, [TcpRepr { + s.send_slice(b"abcdef").unwrap(); + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::CloseWait); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, @@ -1280,24 +1346,24 @@ mod test { #[test] fn test_established_rst() { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Rst, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closed); } #[test] fn test_established_rst_no_ack() { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Rst, seq_number: REMOTE_SEQ + 1, ack_number: None, ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closed); } @@ -1326,11 +1392,11 @@ mod test { ack_number: Some(REMOTE_SEQ + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::FinWait2); } @@ -1343,12 +1409,12 @@ mod test { ack_number: Some(REMOTE_SEQ + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closing); } @@ -1372,12 +1438,12 @@ mod test { #[test] fn test_fin_wait_2_fin() { let mut s = socket_fin_wait_2(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::TimeWait); } @@ -1407,11 +1473,11 @@ mod test { ack_number: Some(REMOTE_SEQ + 1 + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1 + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::TimeWait); } @@ -1472,18 +1538,18 @@ mod test { #[test] fn test_close_wait_ack() { let mut s = socket_close_wait(); - s.tx_buffer.enqueue_slice(b"abcdef"); + s.send_slice(b"abcdef").unwrap(); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1 + 1), payload: &b"abcdef"[..], ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1 + 1, ack_number: Some(LOCAL_SEQ + 1 + 6), ..SEND_TEMPL - }]); + }); } #[test] @@ -1512,11 +1578,11 @@ mod test { ..RECV_TEMPL }]); assert_eq!(s.state, State::LastAck); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1 + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closed); } @@ -1540,12 +1606,12 @@ mod test { #[test] fn test_three_way_handshake() { let mut s = socket_listen(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Syn, seq_number: REMOTE_SEQ, ack_number: None, ..SEND_TEMPL - }]); + }); assert_eq!(s.state(), State::SynReceived); assert_eq!(s.local_endpoint(), LOCAL_END); assert_eq!(s.remote_endpoint(), REMOTE_END); @@ -1555,11 +1621,11 @@ mod test { ack_number: Some(REMOTE_SEQ + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state(), State::Established); assert_eq!(s.local_seq_no, LOCAL_SEQ + 1); assert_eq!(s.remote_seq_no, REMOTE_SEQ + 1); @@ -1568,12 +1634,12 @@ mod test { #[test] fn test_remote_close() { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::CloseWait); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, @@ -1588,11 +1654,11 @@ mod test { ack_number: Some(REMOTE_SEQ + 1 + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1 + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closed); } @@ -1607,18 +1673,18 @@ mod test { ack_number: Some(REMOTE_SEQ + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::FinWait2); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::TimeWait); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1 + 1, @@ -1638,12 +1704,12 @@ mod test { ack_number: Some(REMOTE_SEQ + 1), ..RECV_TEMPL }]); - send!(s, [TcpRepr { + send!(s, TcpRepr { control: TcpControl::Fin, seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::Closing); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, @@ -1651,11 +1717,11 @@ mod test { ..RECV_TEMPL }]); // ... at this point - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1 + 1, ack_number: Some(LOCAL_SEQ + 1 + 1), ..SEND_TEMPL - }]); + }); assert_eq!(s.state, State::TimeWait); recv!(s, []); } @@ -1665,12 +1731,12 @@ mod test { // =========================================================================================// fn socket_recved() -> TcpSocket<'static> { let mut s = socket_established(); - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), payload: &b"abcdef"[..], ..SEND_TEMPL - }]); + }); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1 + 6), @@ -1684,12 +1750,12 @@ mod test { fn test_duplicate_seq_ack() { let mut s = socket_recved(); // remote retransmission - send!(s, [TcpRepr { + send!(s, TcpRepr { seq_number: REMOTE_SEQ + 1, ack_number: Some(LOCAL_SEQ + 1), payload: &b"abcdef"[..], ..SEND_TEMPL - }]); + }); recv!(s, [TcpRepr { seq_number: LOCAL_SEQ + 1, ack_number: Some(REMOTE_SEQ + 1 + 6), @@ -1697,4 +1763,23 @@ mod test { ..RECV_TEMPL }]); } + + #[test] + fn test_data_retransmit() { + let mut s = socket_established(); + s.send_slice(b"abcdef").unwrap(); + recv!(s, time 1000, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"abcdef"[..], + ..RECV_TEMPL + })); + recv!(s, time 1050, Err(Error::Exhausted)); + recv!(s, time 1100, Ok(TcpRepr { + seq_number: LOCAL_SEQ + 1, + ack_number: Some(REMOTE_SEQ + 1), + payload: &b"abcdef"[..], + ..RECV_TEMPL + })); + } } diff --git a/src/socket/udp.rs b/src/socket/udp.rs index 5d852d5..af3aae5 100644 --- a/src/socket/udp.rs +++ b/src/socket/udp.rs @@ -168,7 +168,8 @@ impl<'a, 'b> UdpSocket<'a, 'b> { } /// See [Socket::process](enum.Socket.html#method.process). - pub fn process(&mut self, ip_repr: &IpRepr, payload: &[u8]) -> Result<(), Error> { + pub fn process(&mut self, _timestamp: u64, ip_repr: &IpRepr, + payload: &[u8]) -> Result<(), Error> { if ip_repr.protocol() != IpProtocol::Udp { return Err(Error::Rejected) } let packet = try!(UdpPacket::new(payload)); @@ -189,7 +190,7 @@ impl<'a, 'b> UdpSocket<'a, 'b> { } /// See [Socket::dispatch](enum.Socket.html#method.dispatch). - pub fn dispatch(&mut self, emit: &mut F) -> Result + pub fn dispatch(&mut self, _timestamp: u64, emit: &mut F) -> Result where F: FnMut(&IpRepr, &IpPayload) -> Result { let packet_buf = try!(self.tx_buffer.dequeue()); net_trace!("udp:{}:{}: sending {} octets",