From d9e50bbcb60ab403037d23dde40e90e00bbdac0a Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 5 Mar 2021 13:27:26 +0800 Subject: [PATCH 1/6] nal: Prevent looping until the stack successfully connects to remote * `NetworkStack::connect()`: * Add timeout for connection attempt * Now returns the socket at TCP ESTABLISHED or CLOSED states, or after connection timeout * Split `NetworkStack::update()` into `update()` (for controlling the clock) and `poll()` (for polling the smoltcp EthernetInterface) * Also remove option `auto_time_update`; the main application is responsible for what values `embedded_time::clock::Clock::try_now()` should return --- src/nal.rs | 166 +++++++++++++++++++++++++++++++++-------------------- 1 file changed, 104 insertions(+), 62 deletions(-) diff --git a/src/nal.rs b/src/nal.rs index 0cb71da..7a0ed9f 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -1,5 +1,5 @@ use core::cell::RefCell; -use core::convert::TryFrom; +use core::convert::TryInto; use heapless::{consts, Vec}; use embedded_nal as nal; use nal::nb; @@ -40,7 +40,8 @@ where unused_handles: RefCell>, time_ms: RefCell, last_update_instant: RefCell>>, - clock: IntClock + clock: IntClock, + connection_timeout_ms: u32, } impl<'a, SPI, NSS, IntClock> NetworkStack<'a, SPI, NSS, IntClock> @@ -49,7 +50,12 @@ where NSS: OutputPin, IntClock: time::Clock, { - pub fn new(interface: NetworkInterface, sockets: net::socket::SocketSet<'a>, clock: IntClock) -> Self { + pub fn new( + interface: NetworkInterface, + sockets: net::socket::SocketSet<'a>, + clock: IntClock, + connection_timeout_ms: u32, + ) -> Self { let mut unused_handles: Vec = Vec::new(); for socket in sockets.iter() { unused_handles.push(socket.handle()).unwrap(); @@ -62,34 +68,55 @@ where time_ms: RefCell::new(0), last_update_instant: RefCell::new(None), clock, + connection_timeout_ms, } } - // Include auto_time_update to allow Instant::now() to not be called - // Instant::now() is not safe to call in `init()` context - pub fn update(&self, auto_time_update: bool) -> Result { - if auto_time_update { - // Check if it is the first time the stack has updated the time itself - let now = match *self.last_update_instant.borrow() { - // If it is the first time, do not advance time - // Simply store the current instant to initiate time updating - None => self.clock.try_now().map_err(|_| NetworkError::TimeFault)?, - // If it was updated before, advance time and update last_update_instant - Some(instant) => { - // Calculate elapsed time - let now = self.clock.try_now().map_err(|_| NetworkError::TimeFault)?; - let duration = now.checked_duration_since(&instant).ok_or(NetworkError::TimeFault)?; - let duration_ms = time::duration::Milliseconds::::try_from(duration).map_err(|_| NetworkError::TimeFault)?; - // Adjust duration into ms (note: decimal point truncated) - self.advance_time(*duration_ms.integer()); - now + // Initiate or advance the timer, and return the duration in ms as u32. + fn update(&self) -> Result { + let mut duration_ms: u32 = 0; + // Check if it is the first time the stack has updated the time itself + let now = match *self.last_update_instant.borrow() { + // If it is the first time, do not advance time + // Simply store the current instant to initiate time updating + None => self.clock.try_now().map_err(|_| NetworkError::TimeFault)?, + // If it was updated before, advance time and update last_update_instant + Some(instant) => { + // Calculate elapsed time + let now = self.clock.try_now().map_err(|_| NetworkError::TimeFault)?; + let mut duration = now.checked_duration_since(&instant); + // Normally, the wrapping clock should produce a valid duration. + // However, if `now` is earlier than `instant` (e.g. because the main + // application cannot get a valid epoch time during initialisation, + // we should still produce a duration that is just 1ms. + if duration.is_none() { + self.time_ms.replace(0); + duration = Some(Milliseconds(1_u32) + .to_generic::(IntClock::SCALING_FACTOR) + .map_err(|_| NetworkError::TimeFault)?); } - }; - self.last_update_instant.replace(Some(now)); - } + let duration_ms_time: Milliseconds = duration.unwrap().try_into() + .map_err(|_| NetworkError::TimeFault)?; + duration_ms = *duration_ms_time.integer(); + // Adjust duration into ms (note: decimal point truncated) + self.advance_time(duration_ms); + now + } + }; + self.last_update_instant.replace(Some(now)); + Ok(duration_ms) + } + + fn advance_time(&self, duration_ms: u32) { + let time = self.time_ms.borrow().wrapping_add(duration_ms); + self.time_ms.replace(time); + } + + // Poll on the smoltcp interface + fn poll(&self) -> Result { match self.network_interface.borrow_mut().poll( &mut self.sockets.borrow_mut(), - net::time::Instant::from_millis(*self.time_ms.borrow() as i64), + net::time::Instant::from_millis(*self.time_ms.borrow() as u32), ) { Ok(changed) => Ok(!changed), Err(_e) => { @@ -98,11 +125,6 @@ where } } - pub fn advance_time(&self, duration: u32) { - let time = self.time_ms.try_borrow().unwrap().wrapping_add(duration); - self.time_ms.replace(time); - } - fn get_ephemeral_port(&self) -> u16 { // Get the next ephemeral port let current_port = self.next_port.borrow().clone(); @@ -132,20 +154,27 @@ where } } - // Ideally connect is only to be performed in `init()` of `main.rs` - // Calling `Instant::now()` of `rtic::cyccnt` would face correctness issue during `init()` fn connect( &self, socket: Self::TcpSocket, remote: nal::SocketAddr, ) -> Result { - let address = { + { + // If the socket has already been connected, ignore the connection + // request silently. let mut sockets = self.sockets.borrow_mut(); let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); - // If we're already in the process of connecting, ignore the request silently. - if internal_socket.is_open() { - return Ok(socket); + if internal_socket.state() == net::socket::TcpState::Established { + return Ok(socket) } + } + + { + let mut sockets = self.sockets.borrow_mut(); + let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); + // abort() instead of close() prevents TcpSocket::connect() from + // raising an error + internal_socket.abort(); match remote.ip() { nal::IpAddr::V4(addr) => { let address = @@ -154,45 +183,56 @@ where .connect((address, remote.port()), self.get_ephemeral_port()) .map_err(|_| NetworkError::ConnectionFailure)?; net::wire::IpAddress::Ipv4(address) - } + }, nal::IpAddr::V6(addr) => { - let address = net::wire::Ipv6Address::from_parts(&addr.segments()[..]); - internal_socket.connect((address, remote.port()), self.get_ephemeral_port()) + let address = + net::wire::Ipv6Address::from_parts(&addr.segments()[..]); + internal_socket + .connect((address, remote.port()), self.get_ephemeral_port()) .map_err(|_| NetworkError::ConnectionFailure)?; net::wire::IpAddress::Ipv6(address) } } }; - // Blocking connect + + // Loop to wait until the socket is staying established or closed, + // or the connection attempt has timed out. + let mut timeout_ms: u32 = 0; loop { - match self.is_connected(&socket) { - Ok(true) => break, - _ => { - let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); - // If the connect got ACK->RST, it will end up in Closed TCP state - // Perform reconnection in this case - if internal_socket.state() == net::socket::TcpState::Closed { - internal_socket.close(); - internal_socket - .connect((address, remote.port()), self.get_ephemeral_port()) - .map_err(|_| NetworkError::ConnectionFailure)?; - } + { + let mut sockets = self.sockets.borrow_mut(); + let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); + // TCP state at ESTABLISHED means there is connection, so + // simply return the socket. + if internal_socket.state() == net::socket::TcpState::Established { + return Ok(socket) + } + // TCP state at CLOSED implies that the remote rejected connection; + // In this case, abort the connection, and then return the socket + // for re-connection in the future. + if internal_socket.state() == net::socket::TcpState::Closed { + internal_socket.abort(); + // TODO: Return Err(), but would require changes in quartiq/minimq + return Ok(socket) } } - // Avoid using Instant::now() and Advance time manually - self.update(false)?; - { - self.advance_time(1); + // Any TCP states other than CLOSED and ESTABLISHED are considered + // "transient", so this function should keep waiting and let smoltcp poll + // (e.g. for handling echo reqeust/reply) at the same time. + timeout_ms += self.update()?; + self.poll()?; + // Time out, and return the socket for re-connection in the future. + if timeout_ms > self.connection_timeout_ms { + // TODO: Return Err(), but would require changes in quartiq/minimq + return Ok(socket) } } - Ok(socket) } fn is_connected(&self, socket: &Self::TcpSocket) -> Result { let mut sockets = self.sockets.borrow_mut(); - let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); - Ok(socket.may_send() && socket.may_recv()) + let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); + Ok(internal_socket.state() == net::socket::TcpState::Established) } fn write(&self, socket: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result { @@ -208,7 +248,8 @@ where Ok(num_bytes) => { // In case the buffer is filled up, push bytes into ethernet driver if num_bytes != non_queued_bytes.len() { - self.update(true)?; + self.update()?; + self.poll()?; } // Process the unwritten bytes again, if any non_queued_bytes = &non_queued_bytes[num_bytes..] @@ -225,7 +266,8 @@ where buffer: &mut [u8], ) -> nb::Result { // Enqueue received bytes into the TCP socket buffer - self.update(true)?; + self.update()?; + self.poll()?; let mut sockets = self.sockets.borrow_mut(); let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); let result = socket.recv_slice(buffer); -- 2.42.0 From 99899e6657a0fdc261652517bc5d7d4cb17193d8 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Thu, 11 Mar 2021 17:15:39 +0800 Subject: [PATCH 2/6] nal: Fix read/write not pushing erroneous socket back to the stack * Based on quartiq's minimq as of https://github.com/quartiq/minimq/commit/933687c2e4bc8a4d972de9a4d1508b0b554a8b38 * In minimq applications, a socket is expected to be returned when `nal::TcpStack::open()` is called * `MqttClient::read()`/`write()` takes away the TCP socket handle (wrapped as an `Option`) from its `RefCell`, and then calls `nal::TcpStack::read()`/`write()`; if NAL returns `nb::Error`, then the MQTT client will propagate and return the error, leaving `None` behind * Afterwards, when `MqttClient::socket_is_connected()` gets called (e.g. while polling the interface), it will detect that the socket handle is `None`, and attempt to call `nal::TcpStack::open()` * Since `open()` pops a socket from the array (`unused_handles`), when implementing this NAL the socket should have been pushed back to the stack, i.e. by `close()`; this prevents any future calls of `open()` from returning `NetworkError::NoSocket` due to emptiness of the array of socket handles --- src/nal.rs | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/nal.rs b/src/nal.rs index 7a0ed9f..5bb9fd3 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -236,6 +236,7 @@ where } fn write(&self, socket: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result { + let mut write_error = false; let mut non_queued_bytes = &buffer[..]; while non_queued_bytes.len() != 0 { let result = { @@ -254,9 +255,18 @@ where // Process the unwritten bytes again, if any non_queued_bytes = &non_queued_bytes[num_bytes..] } - Err(_) => return Err(nb::Error::Other(NetworkError::WriteFailure)), + Err(_) => { + write_error = true; + break; + } } } + if write_error { + // Close the socket to push it back to the array, for + // re-opening the socket in the future + self.close(*socket)?; + return Err(nb::Error::Other(NetworkError::WriteFailure)) + } Ok(buffer.len()) } @@ -268,13 +278,19 @@ where // Enqueue received bytes into the TCP socket buffer self.update()?; self.poll()?; - let mut sockets = self.sockets.borrow_mut(); - let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); - let result = socket.recv_slice(buffer); - match result { - Ok(num_bytes) => Ok(num_bytes), - Err(_) => Err(nb::Error::Other(NetworkError::ReadFailure)), + { + let mut sockets = self.sockets.borrow_mut(); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); + let result = socket.recv_slice(buffer); + match result { + Ok(num_bytes) => { return Ok(num_bytes) }, + Err(_) => {}, + } } + // Close the socket to push it back to the array, for + // re-opening the socket in the future + self.close(*socket)?; + Err(nb::Error::Other(NetworkError::ReadFailure)) } fn close(&self, socket: Self::TcpSocket) -> Result<(), Self::Error> { -- 2.42.0 From 66c3aa534f4b4a7628dae70700017164a9c5fc1d Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 12 Mar 2021 11:26:07 +0800 Subject: [PATCH 3/6] =?UTF-8?q?nal:=20socket=20(TcpSocket)=20=E2=86=92=20h?= =?UTF-8?q?andle;=20internal=5Fsocket=20=E2=86=92=20socket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/nal.rs | 60 +++++++++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/src/nal.rs b/src/nal.rs index 5bb9fd3..3ae4b76 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -146,8 +146,8 @@ where Some(handle) => { // Abort any active connections on the handle. let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); - internal_socket.abort(); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); + socket.abort(); Ok(handle) } None => Err(NetworkError::NoSocket), @@ -156,30 +156,30 @@ where fn connect( &self, - socket: Self::TcpSocket, + handle: Self::TcpSocket, remote: nal::SocketAddr, ) -> Result { { // If the socket has already been connected, ignore the connection // request silently. let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); - if internal_socket.state() == net::socket::TcpState::Established { - return Ok(socket) + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); + if socket.state() == net::socket::TcpState::Established { + return Ok(handle) } } { let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); // abort() instead of close() prevents TcpSocket::connect() from // raising an error - internal_socket.abort(); + socket.abort(); match remote.ip() { nal::IpAddr::V4(addr) => { let address = net::wire::Ipv4Address::from_bytes(&addr.octets()[..]); - internal_socket + socket .connect((address, remote.port()), self.get_ephemeral_port()) .map_err(|_| NetworkError::ConnectionFailure)?; net::wire::IpAddress::Ipv4(address) @@ -187,7 +187,7 @@ where nal::IpAddr::V6(addr) => { let address = net::wire::Ipv6Address::from_parts(&addr.segments()[..]); - internal_socket + socket .connect((address, remote.port()), self.get_ephemeral_port()) .map_err(|_| NetworkError::ConnectionFailure)?; net::wire::IpAddress::Ipv6(address) @@ -201,19 +201,19 @@ where loop { { let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); // TCP state at ESTABLISHED means there is connection, so // simply return the socket. - if internal_socket.state() == net::socket::TcpState::Established { - return Ok(socket) + if socket.state() == net::socket::TcpState::Established { + return Ok(handle) } // TCP state at CLOSED implies that the remote rejected connection; // In this case, abort the connection, and then return the socket // for re-connection in the future. - if internal_socket.state() == net::socket::TcpState::Closed { - internal_socket.abort(); + if socket.state() == net::socket::TcpState::Closed { + socket.abort(); // TODO: Return Err(), but would require changes in quartiq/minimq - return Ok(socket) + return Ok(handle) } } // Any TCP states other than CLOSED and ESTABLISHED are considered @@ -224,24 +224,24 @@ where // Time out, and return the socket for re-connection in the future. if timeout_ms > self.connection_timeout_ms { // TODO: Return Err(), but would require changes in quartiq/minimq - return Ok(socket) + return Ok(handle) } } } - fn is_connected(&self, socket: &Self::TcpSocket) -> Result { + fn is_connected(&self, handle: &Self::TcpSocket) -> Result { let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); - Ok(internal_socket.state() == net::socket::TcpState::Established) + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*handle); + Ok(socket.state() == net::socket::TcpState::Established) } - fn write(&self, socket: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result { + fn write(&self, handle: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result { let mut write_error = false; let mut non_queued_bytes = &buffer[..]; while non_queued_bytes.len() != 0 { let result = { let mut sockets = self.sockets.borrow_mut(); - let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*handle); let result = socket.send_slice(non_queued_bytes); result }; @@ -264,7 +264,7 @@ where if write_error { // Close the socket to push it back to the array, for // re-opening the socket in the future - self.close(*socket)?; + self.close(*handle)?; return Err(nb::Error::Other(NetworkError::WriteFailure)) } Ok(buffer.len()) @@ -272,7 +272,7 @@ where fn read( &self, - socket: &mut Self::TcpSocket, + handle: &mut Self::TcpSocket, buffer: &mut [u8], ) -> nb::Result { // Enqueue received bytes into the TCP socket buffer @@ -280,7 +280,7 @@ where self.poll()?; { let mut sockets = self.sockets.borrow_mut(); - let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*handle); let result = socket.recv_slice(buffer); match result { Ok(num_bytes) => { return Ok(num_bytes) }, @@ -289,15 +289,15 @@ where } // Close the socket to push it back to the array, for // re-opening the socket in the future - self.close(*socket)?; + self.close(*handle)?; Err(nb::Error::Other(NetworkError::ReadFailure)) } - fn close(&self, socket: Self::TcpSocket) -> Result<(), Self::Error> { + fn close(&self, handle: Self::TcpSocket) -> Result<(), Self::Error> { let mut sockets = self.sockets.borrow_mut(); - let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket); - internal_socket.close(); - self.unused_handles.borrow_mut().push(socket).unwrap(); + let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); + socket.close(); + self.unused_handles.borrow_mut().push(handle).unwrap(); Ok(()) } } \ No newline at end of file -- 2.42.0 From 6de19f43ccbf7ab60d3293e91c02c0c6f4a50064 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Fri, 12 Mar 2021 12:36:30 +0800 Subject: [PATCH 4/6] nal: Prevent pushing duplicate handles for the same TcpSocket --- src/nal.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/nal.rs b/src/nal.rs index 3ae4b76..5d23898 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -297,7 +297,10 @@ where let mut sockets = self.sockets.borrow_mut(); let socket: &mut net::socket::TcpSocket = &mut *sockets.get(handle); socket.close(); - self.unused_handles.borrow_mut().push(handle).unwrap(); + let mut unused_handles = self.unused_handles.borrow_mut(); + if unused_handles.iter().find(|&x| *x == handle).is_none() { + unused_handles.push(handle).unwrap(); + } Ok(()) } } \ No newline at end of file -- 2.42.0 From 2eadb652ff1d8e043736006aa74170dbb6d67f15 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Tue, 16 Mar 2021 10:25:23 +0800 Subject: [PATCH 5/6] nal: Fix infinite loop when TX buffer is full * For example, if the PHY linkup is down, instead of looping until resumption of the linkup, a write operation now closes the socket for re-connection in the future --- src/nal.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/nal.rs b/src/nal.rs index 5d23898..5648faf 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -247,6 +247,12 @@ where }; match result { Ok(num_bytes) => { + // If the buffer is completely filled, close the socket and + // return an error + if num_bytes == 0 { + write_error = true; + break; + } // In case the buffer is filled up, push bytes into ethernet driver if num_bytes != non_queued_bytes.len() { self.update()?; -- 2.42.0 From 232a08f11012fd749b68882140a8b9dc1a653363 Mon Sep 17 00:00:00 2001 From: Harry Ho Date: Wed, 17 Mar 2021 10:18:30 +0800 Subject: [PATCH 6/6] nal: Fix comments & styling --- src/nal.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/nal.rs b/src/nal.rs index 5648faf..798cd5f 100644 --- a/src/nal.rs +++ b/src/nal.rs @@ -194,7 +194,7 @@ where } } }; - + // Blocking connect // Loop to wait until the socket is staying established or closed, // or the connection attempt has timed out. let mut timeout_ms: u32 = 0; @@ -216,11 +216,13 @@ where return Ok(handle) } } + // Any TCP states other than CLOSED and ESTABLISHED are considered // "transient", so this function should keep waiting and let smoltcp poll // (e.g. for handling echo reqeust/reply) at the same time. timeout_ms += self.update()?; self.poll()?; + // Time out, and return the socket for re-connection in the future. if timeout_ms > self.connection_timeout_ms { // TODO: Return Err(), but would require changes in quartiq/minimq -- 2.42.0