From 24df52268e13f2ca1ed9756bfe7c57faa7c71c98 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 20 May 2022 12:56:00 +0800 Subject: [PATCH] moninj: restructure timeout stop logging errors if satellite is unavailable drtio: don't even send message if link is down --- src/runtime/src/moninj.rs | 22 +++++++++++++--------- src/runtime/src/rtio_mgt.rs | 3 +++ 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/runtime/src/moninj.rs b/src/runtime/src/moninj.rs index 78118a04..0a92b4bf 100644 --- a/src/runtime/src/moninj.rs +++ b/src/runtime/src/moninj.rs @@ -21,6 +21,7 @@ pub enum Error { NetworkError(smoltcp::Error), UnexpectedPattern, UnrecognizedPacket, + } pub type Result = core::result::Result; @@ -71,6 +72,7 @@ mod remote_moninj { match reply { Ok(drtioaux_async::Packet::MonitorReply { value }) => return value as i64, Ok(packet) => error!("received unexpected aux packet: {:?}", packet), + Err("link went down") => {}, Err(e) => error!("aux packet error ({})", e) } 0 @@ -97,6 +99,7 @@ mod remote_moninj { match reply { Ok(drtioaux_async::Packet::InjectionStatusReply { value }) => return value as i8, Ok(packet) => error!("received unexpected aux packet: {:?}", packet), + Err("link went down") => {}, Err(e) => error!("aux packet error ({})", e) } 0 @@ -164,19 +167,20 @@ async fn handle_connection(stream: &TcpStream, timer: GlobalTimer, let mut probe_watch_list: BTreeMap<(i32, i8), Option> = BTreeMap::new(); let mut inject_watch_list: BTreeMap<(i32, i8), Option> = BTreeMap::new(); let mut next_check = timer.get_time(); + let timeout = |next_check: Milliseconds| -> nb::Result<(), Void> { + if timer.get_time() < next_check { + Err(nb::Error::WouldBlock) + } else { + Ok(()) + } + }; loop { // TODO: we don't need fuse() here. // remove after https://github.com/rust-lang/futures-rs/issues/1989 lands let read_message_f = read_i8(&stream).fuse(); let next_check_c = next_check.clone(); - let timeout = || -> nb::Result<(), Void> { - if timer.get_time() < next_check_c { - Err(nb::Error::WouldBlock) - } else { - Ok(()) - } - }; - let timeout_f = block_async!(timeout()).fuse(); + + let timeout_f = block_async!(timeout(next_check_c)).fuse(); pin_mut!(read_message_f, timeout_f); select_biased! { message = read_message_f => { @@ -246,7 +250,7 @@ async fn handle_connection(stream: &TcpStream, timer: GlobalTimer, *previous = Some(current); } } - next_check = next_check + Milliseconds(200); + next_check = timer.get_time() + Milliseconds(200); } } } diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index d72a9028..17f86796 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -49,6 +49,9 @@ pub mod drtio { pub async fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &Packet, timer: GlobalTimer) -> Result { + if !link_rx_up(linkno).await { + return Err("link went down"); + } let _lock = aux_mutex.lock(); drtioaux_async::send(linkno, request).await.unwrap(); recv_aux_timeout(linkno, 200, timer).await