nal: Prevent looping until the stack successfully connects to remote
* `NetworkStack::connect()`: * Add timeout for connection attempt * Now returns the socket at TCP ESTABLISHED or CLOSED states, or after connection timeout * Split `NetworkStack::update()` into `update()` (for controlling the clock) and `poll()` (for polling the smoltcp EthernetInterface) * Also remove option `auto_time_update`; the main application is responsible for what values `embedded_time::clock::Clock::try_now()` should return
This commit is contained in:
parent
6506562c3a
commit
d9e50bbcb6
166
src/nal.rs
166
src/nal.rs
|
@ -1,5 +1,5 @@
|
||||||
use core::cell::RefCell;
|
use core::cell::RefCell;
|
||||||
use core::convert::TryFrom;
|
use core::convert::TryInto;
|
||||||
use heapless::{consts, Vec};
|
use heapless::{consts, Vec};
|
||||||
use embedded_nal as nal;
|
use embedded_nal as nal;
|
||||||
use nal::nb;
|
use nal::nb;
|
||||||
|
@ -40,7 +40,8 @@ where
|
||||||
unused_handles: RefCell<Vec<net::socket::SocketHandle, consts::U16>>,
|
unused_handles: RefCell<Vec<net::socket::SocketHandle, consts::U16>>,
|
||||||
time_ms: RefCell<u32>,
|
time_ms: RefCell<u32>,
|
||||||
last_update_instant: RefCell<Option<time::Instant<IntClock>>>,
|
last_update_instant: RefCell<Option<time::Instant<IntClock>>>,
|
||||||
clock: IntClock
|
clock: IntClock,
|
||||||
|
connection_timeout_ms: u32,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a, SPI, NSS, IntClock> NetworkStack<'a, SPI, NSS, IntClock>
|
impl<'a, SPI, NSS, IntClock> NetworkStack<'a, SPI, NSS, IntClock>
|
||||||
|
@ -49,7 +50,12 @@ where
|
||||||
NSS: OutputPin,
|
NSS: OutputPin,
|
||||||
IntClock: time::Clock<T = u32>,
|
IntClock: time::Clock<T = u32>,
|
||||||
{
|
{
|
||||||
pub fn new(interface: NetworkInterface<SPI, NSS>, sockets: net::socket::SocketSet<'a>, clock: IntClock) -> Self {
|
pub fn new(
|
||||||
|
interface: NetworkInterface<SPI, NSS>,
|
||||||
|
sockets: net::socket::SocketSet<'a>,
|
||||||
|
clock: IntClock,
|
||||||
|
connection_timeout_ms: u32,
|
||||||
|
) -> Self {
|
||||||
let mut unused_handles: Vec<net::socket::SocketHandle, consts::U16> = Vec::new();
|
let mut unused_handles: Vec<net::socket::SocketHandle, consts::U16> = Vec::new();
|
||||||
for socket in sockets.iter() {
|
for socket in sockets.iter() {
|
||||||
unused_handles.push(socket.handle()).unwrap();
|
unused_handles.push(socket.handle()).unwrap();
|
||||||
|
@ -62,34 +68,55 @@ where
|
||||||
time_ms: RefCell::new(0),
|
time_ms: RefCell::new(0),
|
||||||
last_update_instant: RefCell::new(None),
|
last_update_instant: RefCell::new(None),
|
||||||
clock,
|
clock,
|
||||||
|
connection_timeout_ms,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Include auto_time_update to allow Instant::now() to not be called
|
// Initiate or advance the timer, and return the duration in ms as u32.
|
||||||
// Instant::now() is not safe to call in `init()` context
|
fn update(&self) -> Result<u32, NetworkError> {
|
||||||
pub fn update(&self, auto_time_update: bool) -> Result<bool, NetworkError> {
|
let mut duration_ms: u32 = 0;
|
||||||
if auto_time_update {
|
// Check if it is the first time the stack has updated the time itself
|
||||||
// Check if it is the first time the stack has updated the time itself
|
let now = match *self.last_update_instant.borrow() {
|
||||||
let now = match *self.last_update_instant.borrow() {
|
// If it is the first time, do not advance time
|
||||||
// If it is the first time, do not advance time
|
// Simply store the current instant to initiate time updating
|
||||||
// Simply store the current instant to initiate time updating
|
None => self.clock.try_now().map_err(|_| NetworkError::TimeFault)?,
|
||||||
None => self.clock.try_now().map_err(|_| NetworkError::TimeFault)?,
|
// If it was updated before, advance time and update last_update_instant
|
||||||
// If it was updated before, advance time and update last_update_instant
|
Some(instant) => {
|
||||||
Some(instant) => {
|
// Calculate elapsed time
|
||||||
// Calculate elapsed time
|
let now = self.clock.try_now().map_err(|_| NetworkError::TimeFault)?;
|
||||||
let now = self.clock.try_now().map_err(|_| NetworkError::TimeFault)?;
|
let mut duration = now.checked_duration_since(&instant);
|
||||||
let duration = now.checked_duration_since(&instant).ok_or(NetworkError::TimeFault)?;
|
// Normally, the wrapping clock should produce a valid duration.
|
||||||
let duration_ms = time::duration::Milliseconds::<u32>::try_from(duration).map_err(|_| NetworkError::TimeFault)?;
|
// However, if `now` is earlier than `instant` (e.g. because the main
|
||||||
// Adjust duration into ms (note: decimal point truncated)
|
// application cannot get a valid epoch time during initialisation,
|
||||||
self.advance_time(*duration_ms.integer());
|
// we should still produce a duration that is just 1ms.
|
||||||
now
|
if duration.is_none() {
|
||||||
|
self.time_ms.replace(0);
|
||||||
|
duration = Some(Milliseconds(1_u32)
|
||||||
|
.to_generic::<u32>(IntClock::SCALING_FACTOR)
|
||||||
|
.map_err(|_| NetworkError::TimeFault)?);
|
||||||
}
|
}
|
||||||
};
|
let duration_ms_time: Milliseconds<u32> = duration.unwrap().try_into()
|
||||||
self.last_update_instant.replace(Some(now));
|
.map_err(|_| NetworkError::TimeFault)?;
|
||||||
}
|
duration_ms = *duration_ms_time.integer();
|
||||||
|
// Adjust duration into ms (note: decimal point truncated)
|
||||||
|
self.advance_time(duration_ms);
|
||||||
|
now
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.last_update_instant.replace(Some(now));
|
||||||
|
Ok(duration_ms)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn advance_time(&self, duration_ms: u32) {
|
||||||
|
let time = self.time_ms.borrow().wrapping_add(duration_ms);
|
||||||
|
self.time_ms.replace(time);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Poll on the smoltcp interface
|
||||||
|
fn poll(&self) -> Result<bool, NetworkError> {
|
||||||
match self.network_interface.borrow_mut().poll(
|
match self.network_interface.borrow_mut().poll(
|
||||||
&mut self.sockets.borrow_mut(),
|
&mut self.sockets.borrow_mut(),
|
||||||
net::time::Instant::from_millis(*self.time_ms.borrow() as i64),
|
net::time::Instant::from_millis(*self.time_ms.borrow() as u32),
|
||||||
) {
|
) {
|
||||||
Ok(changed) => Ok(!changed),
|
Ok(changed) => Ok(!changed),
|
||||||
Err(_e) => {
|
Err(_e) => {
|
||||||
|
@ -98,11 +125,6 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn advance_time(&self, duration: u32) {
|
|
||||||
let time = self.time_ms.try_borrow().unwrap().wrapping_add(duration);
|
|
||||||
self.time_ms.replace(time);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_ephemeral_port(&self) -> u16 {
|
fn get_ephemeral_port(&self) -> u16 {
|
||||||
// Get the next ephemeral port
|
// Get the next ephemeral port
|
||||||
let current_port = self.next_port.borrow().clone();
|
let current_port = self.next_port.borrow().clone();
|
||||||
|
@ -132,20 +154,27 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ideally connect is only to be performed in `init()` of `main.rs`
|
|
||||||
// Calling `Instant::now()` of `rtic::cyccnt` would face correctness issue during `init()`
|
|
||||||
fn connect(
|
fn connect(
|
||||||
&self,
|
&self,
|
||||||
socket: Self::TcpSocket,
|
socket: Self::TcpSocket,
|
||||||
remote: nal::SocketAddr,
|
remote: nal::SocketAddr,
|
||||||
) -> Result<Self::TcpSocket, Self::Error> {
|
) -> Result<Self::TcpSocket, Self::Error> {
|
||||||
let address = {
|
{
|
||||||
|
// If the socket has already been connected, ignore the connection
|
||||||
|
// request silently.
|
||||||
let mut sockets = self.sockets.borrow_mut();
|
let mut sockets = self.sockets.borrow_mut();
|
||||||
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket);
|
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket);
|
||||||
// If we're already in the process of connecting, ignore the request silently.
|
if internal_socket.state() == net::socket::TcpState::Established {
|
||||||
if internal_socket.is_open() {
|
return Ok(socket)
|
||||||
return Ok(socket);
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let mut sockets = self.sockets.borrow_mut();
|
||||||
|
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket);
|
||||||
|
// abort() instead of close() prevents TcpSocket::connect() from
|
||||||
|
// raising an error
|
||||||
|
internal_socket.abort();
|
||||||
match remote.ip() {
|
match remote.ip() {
|
||||||
nal::IpAddr::V4(addr) => {
|
nal::IpAddr::V4(addr) => {
|
||||||
let address =
|
let address =
|
||||||
|
@ -154,45 +183,56 @@ where
|
||||||
.connect((address, remote.port()), self.get_ephemeral_port())
|
.connect((address, remote.port()), self.get_ephemeral_port())
|
||||||
.map_err(|_| NetworkError::ConnectionFailure)?;
|
.map_err(|_| NetworkError::ConnectionFailure)?;
|
||||||
net::wire::IpAddress::Ipv4(address)
|
net::wire::IpAddress::Ipv4(address)
|
||||||
}
|
},
|
||||||
nal::IpAddr::V6(addr) => {
|
nal::IpAddr::V6(addr) => {
|
||||||
let address = net::wire::Ipv6Address::from_parts(&addr.segments()[..]);
|
let address =
|
||||||
internal_socket.connect((address, remote.port()), self.get_ephemeral_port())
|
net::wire::Ipv6Address::from_parts(&addr.segments()[..]);
|
||||||
|
internal_socket
|
||||||
|
.connect((address, remote.port()), self.get_ephemeral_port())
|
||||||
.map_err(|_| NetworkError::ConnectionFailure)?;
|
.map_err(|_| NetworkError::ConnectionFailure)?;
|
||||||
net::wire::IpAddress::Ipv6(address)
|
net::wire::IpAddress::Ipv6(address)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
// Blocking connect
|
|
||||||
|
// Loop to wait until the socket is staying established or closed,
|
||||||
|
// or the connection attempt has timed out.
|
||||||
|
let mut timeout_ms: u32 = 0;
|
||||||
loop {
|
loop {
|
||||||
match self.is_connected(&socket) {
|
{
|
||||||
Ok(true) => break,
|
let mut sockets = self.sockets.borrow_mut();
|
||||||
_ => {
|
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket);
|
||||||
let mut sockets = self.sockets.borrow_mut();
|
// TCP state at ESTABLISHED means there is connection, so
|
||||||
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(socket);
|
// simply return the socket.
|
||||||
// If the connect got ACK->RST, it will end up in Closed TCP state
|
if internal_socket.state() == net::socket::TcpState::Established {
|
||||||
// Perform reconnection in this case
|
return Ok(socket)
|
||||||
if internal_socket.state() == net::socket::TcpState::Closed {
|
}
|
||||||
internal_socket.close();
|
// TCP state at CLOSED implies that the remote rejected connection;
|
||||||
internal_socket
|
// In this case, abort the connection, and then return the socket
|
||||||
.connect((address, remote.port()), self.get_ephemeral_port())
|
// for re-connection in the future.
|
||||||
.map_err(|_| NetworkError::ConnectionFailure)?;
|
if internal_socket.state() == net::socket::TcpState::Closed {
|
||||||
}
|
internal_socket.abort();
|
||||||
|
// TODO: Return Err(), but would require changes in quartiq/minimq
|
||||||
|
return Ok(socket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Avoid using Instant::now() and Advance time manually
|
// Any TCP states other than CLOSED and ESTABLISHED are considered
|
||||||
self.update(false)?;
|
// "transient", so this function should keep waiting and let smoltcp poll
|
||||||
{
|
// (e.g. for handling echo reqeust/reply) at the same time.
|
||||||
self.advance_time(1);
|
timeout_ms += self.update()?;
|
||||||
|
self.poll()?;
|
||||||
|
// Time out, and return the socket for re-connection in the future.
|
||||||
|
if timeout_ms > self.connection_timeout_ms {
|
||||||
|
// TODO: Return Err(), but would require changes in quartiq/minimq
|
||||||
|
return Ok(socket)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(socket)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_connected(&self, socket: &Self::TcpSocket) -> Result<bool, Self::Error> {
|
fn is_connected(&self, socket: &Self::TcpSocket) -> Result<bool, Self::Error> {
|
||||||
let mut sockets = self.sockets.borrow_mut();
|
let mut sockets = self.sockets.borrow_mut();
|
||||||
let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket);
|
let internal_socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket);
|
||||||
Ok(socket.may_send() && socket.may_recv())
|
Ok(internal_socket.state() == net::socket::TcpState::Established)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write(&self, socket: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result<usize, Self::Error> {
|
fn write(&self, socket: &mut Self::TcpSocket, buffer: &[u8]) -> nb::Result<usize, Self::Error> {
|
||||||
|
@ -208,7 +248,8 @@ where
|
||||||
Ok(num_bytes) => {
|
Ok(num_bytes) => {
|
||||||
// In case the buffer is filled up, push bytes into ethernet driver
|
// In case the buffer is filled up, push bytes into ethernet driver
|
||||||
if num_bytes != non_queued_bytes.len() {
|
if num_bytes != non_queued_bytes.len() {
|
||||||
self.update(true)?;
|
self.update()?;
|
||||||
|
self.poll()?;
|
||||||
}
|
}
|
||||||
// Process the unwritten bytes again, if any
|
// Process the unwritten bytes again, if any
|
||||||
non_queued_bytes = &non_queued_bytes[num_bytes..]
|
non_queued_bytes = &non_queued_bytes[num_bytes..]
|
||||||
|
@ -225,7 +266,8 @@ where
|
||||||
buffer: &mut [u8],
|
buffer: &mut [u8],
|
||||||
) -> nb::Result<usize, Self::Error> {
|
) -> nb::Result<usize, Self::Error> {
|
||||||
// Enqueue received bytes into the TCP socket buffer
|
// Enqueue received bytes into the TCP socket buffer
|
||||||
self.update(true)?;
|
self.update()?;
|
||||||
|
self.poll()?;
|
||||||
let mut sockets = self.sockets.borrow_mut();
|
let mut sockets = self.sockets.borrow_mut();
|
||||||
let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket);
|
let socket: &mut net::socket::TcpSocket = &mut *sockets.get(*socket);
|
||||||
let result = socket.recv_slice(buffer);
|
let result = socket.recv_slice(buffer);
|
||||||
|
|
Loading…
Reference in New Issue