moninj: restructure timeout

stop logging errors if satellite is unavailable
drtio: don't even send message if link is down
pull/190/head
mwojcik 2022-05-20 12:56:00 +08:00
parent 48c9b43171
commit 24df52268e
2 changed files with 16 additions and 9 deletions

View File

@ -21,6 +21,7 @@ pub enum Error {
NetworkError(smoltcp::Error), NetworkError(smoltcp::Error),
UnexpectedPattern, UnexpectedPattern,
UnrecognizedPacket, UnrecognizedPacket,
} }
pub type Result<T> = core::result::Result<T, Error>; pub type Result<T> = core::result::Result<T, Error>;
@ -71,6 +72,7 @@ mod remote_moninj {
match reply { match reply {
Ok(drtioaux_async::Packet::MonitorReply { value }) => return value as i64, Ok(drtioaux_async::Packet::MonitorReply { value }) => return value as i64,
Ok(packet) => error!("received unexpected aux packet: {:?}", packet), Ok(packet) => error!("received unexpected aux packet: {:?}", packet),
Err("link went down") => {},
Err(e) => error!("aux packet error ({})", e) Err(e) => error!("aux packet error ({})", e)
} }
0 0
@ -97,6 +99,7 @@ mod remote_moninj {
match reply { match reply {
Ok(drtioaux_async::Packet::InjectionStatusReply { value }) => return value as i8, Ok(drtioaux_async::Packet::InjectionStatusReply { value }) => return value as i8,
Ok(packet) => error!("received unexpected aux packet: {:?}", packet), Ok(packet) => error!("received unexpected aux packet: {:?}", packet),
Err("link went down") => {},
Err(e) => error!("aux packet error ({})", e) Err(e) => error!("aux packet error ({})", e)
} }
0 0
@ -164,19 +167,20 @@ async fn handle_connection(stream: &TcpStream, timer: GlobalTimer,
let mut probe_watch_list: BTreeMap<(i32, i8), Option<i64>> = BTreeMap::new(); let mut probe_watch_list: BTreeMap<(i32, i8), Option<i64>> = BTreeMap::new();
let mut inject_watch_list: BTreeMap<(i32, i8), Option<i8>> = BTreeMap::new(); let mut inject_watch_list: BTreeMap<(i32, i8), Option<i8>> = BTreeMap::new();
let mut next_check = timer.get_time(); let mut next_check = timer.get_time();
let timeout = |next_check: Milliseconds| -> nb::Result<(), Void> {
if timer.get_time() < next_check {
Err(nb::Error::WouldBlock)
} else {
Ok(())
}
};
loop { loop {
// TODO: we don't need fuse() here. // TODO: we don't need fuse() here.
// remove after https://github.com/rust-lang/futures-rs/issues/1989 lands // remove after https://github.com/rust-lang/futures-rs/issues/1989 lands
let read_message_f = read_i8(&stream).fuse(); let read_message_f = read_i8(&stream).fuse();
let next_check_c = next_check.clone(); let next_check_c = next_check.clone();
let timeout = || -> nb::Result<(), Void> {
if timer.get_time() < next_check_c { let timeout_f = block_async!(timeout(next_check_c)).fuse();
Err(nb::Error::WouldBlock)
} else {
Ok(())
}
};
let timeout_f = block_async!(timeout()).fuse();
pin_mut!(read_message_f, timeout_f); pin_mut!(read_message_f, timeout_f);
select_biased! { select_biased! {
message = read_message_f => { message = read_message_f => {
@ -246,7 +250,7 @@ async fn handle_connection(stream: &TcpStream, timer: GlobalTimer,
*previous = Some(current); *previous = Some(current);
} }
} }
next_check = next_check + Milliseconds(200); next_check = timer.get_time() + Milliseconds(200);
} }
} }
} }

View File

@ -49,6 +49,9 @@ pub mod drtio {
pub async fn aux_transact(aux_mutex: &Mutex<bool>, linkno: u8, request: &Packet, pub async fn aux_transact(aux_mutex: &Mutex<bool>, linkno: u8, request: &Packet,
timer: GlobalTimer) -> Result<Packet, &'static str> { timer: GlobalTimer) -> Result<Packet, &'static str> {
if !link_rx_up(linkno).await {
return Err("link went down");
}
let _lock = aux_mutex.lock(); let _lock = aux_mutex.lock();
drtioaux_async::send(linkno, request).await.unwrap(); drtioaux_async::send(linkno, request).await.unwrap();
recv_aux_timeout(linkno, 200, timer).await recv_aux_timeout(linkno, 200, timer).await