From e8541c4cf507495f2a1094ced6c07f0289dd7494 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 23 Sep 2021 13:22:27 +0200 Subject: [PATCH] runtime: moving rtio_mgt to async/await --- src/runtime/src/rtio_mgt.rs | 92 ++++++++++++++++++++----------------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index ffe697b4..6baff995 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -18,7 +18,7 @@ pub mod drtio { let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); - task::spawn( || { + task::spawn( async move { let routing_table = routing_table.borrow(); link_thread(&aux_mutex, &routing_table, &up_destinations); }); @@ -31,7 +31,7 @@ pub mod drtio { } } - fn recv_aux_timeout(linkno: u8, timeout: u32, timer: GlobalTimer) -> Result { + async fn recv_aux_timeout(linkno: u8, timeout: u32, timer: GlobalTimer) -> Result { let max_time = timer.get_time() + Milliseconds(timeout); loop { if !link_rx_up(linkno) { @@ -48,14 +48,24 @@ pub mod drtio { } } - pub fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet, + pub async fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet, timer: GlobalTimer) -> Result { let _lock = aux_mutex.lock(); drtioaux::send(linkno, request).unwrap(); - recv_aux_timeout(linkno, 200, timer) + recv_aux_timeout(linkno, 200, timer).await } - fn ping_remote(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> u32 { + async fn drain_buffer(linkno: u8, draining_time: Milliseconds) { + let max_time = timer.get_time() + draining_time; + loop { + if timer.get_time() > max_time { + return; + } //could this be cut short? + let _ = drtioaux::recv(linkno); + } + } + + async fn ping_remote(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> u32 { let mut count = 0; loop { if !link_rx_up(linkno) { @@ -65,24 +75,20 @@ pub mod drtio { if count > 100 { return 0; } - let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer); + let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer).await; match reply { Ok(drtioaux::Packet::EchoReply) => { // make sure receive buffer is drained - let max_time = timer.get_time() + Milliseconds(200); - loop { - if timer.get_time() > max_time { - return count; - } - let _ = drtioaux::recv(linkno); - } + let draining_time = Milliseconds(200); + drain_buffer(linkno, draining_time).await; + return count; } _ => {} } } } - fn sync_tsc(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> Result<(), &'static str> { + async fn sync_tsc(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> Result<(), &'static str> { let _lock = aux_mutex.lock(); unsafe { @@ -91,7 +97,7 @@ pub mod drtio { } // TSCAck is the only aux packet that is sent spontaneously // by the satellite, in response to a TSC set on the RT link. - let reply = recv_aux_timeout(linkno, 10000, timer)?; + let reply = recv_aux_timeout(linkno, 10000, timer).await?; if reply == drtioaux::Packet::TSCAck { return Ok(()); } else { @@ -99,13 +105,13 @@ pub mod drtio { } } - fn load_routing_table(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable, + async fn load_routing_table(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable, timer: GlobalTimer) -> Result<(), &'static str> { for i in 0..drtio_routing::DEST_COUNT { let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath { destination: i as u8, hops: routing_table.0[i] - }, timer)?; + }, timer).await?; if reply != drtioaux::Packet::RoutingAck { return Err("unexpected reply"); } @@ -113,17 +119,17 @@ pub mod drtio { Ok(()) } - fn set_rank(aux_mutex: &Mutex, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> { + async fn set_rank(aux_mutex: &Mutex, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> { let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank { rank: rank - }, timer)?; + }, timer).await?; if reply != drtioaux::Packet::RoutingAck { return Err("unexpected reply"); } Ok(()) } - fn init_buffer_space(destination: u8, linkno: u8) { + async fn init_buffer_space(destination: u8, linkno: u8) { let linkno = linkno as usize; unsafe { (csr::DRTIO[linkno].destination_write)(destination); @@ -136,7 +142,7 @@ pub mod drtio { } } - fn process_unsolicited_aux(aux_mutex: &Mutex, linkno: u8) { + async fn process_unsolicited_aux(aux_mutex: &Mutex, linkno: u8) { let _lock = aux_mutex.lock(); match drtioaux::recv(linkno) { Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), @@ -145,7 +151,7 @@ pub mod drtio { } } - fn process_local_errors(linkno: u8) { + async fn process_local_errors(linkno: u8) { let errors; let linkidx = linkno as usize; unsafe { @@ -166,7 +172,7 @@ pub mod drtio { } } - fn destination_set_up(routing_table: &drtio_routing::RoutingTable, + async fn destination_set_up(routing_table: &drtio_routing::RoutingTable, up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, destination: u8, up: bool) { let mut up_destinations = up_destinations.borrow_mut(); @@ -180,12 +186,12 @@ pub mod drtio { } } - fn destination_up(up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, destination: u8) -> bool { + async fn destination_up(up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, destination: u8) -> bool { let up_destinations = up_destinations.borrow(); up_destinations[destination as usize] } - fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, + async fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_links: &[bool], up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, timer: GlobalTimer) { @@ -195,16 +201,16 @@ pub mod drtio { if hop == 0 { /* local RTIO */ - if !destination_up(up_destinations, destination) { - destination_set_up(routing_table, up_destinations, destination, true); + if !destination_up(up_destinations, destination).await { + destination_set_up(routing_table, up_destinations, destination, true).await; } } else if hop as usize <= csr::DRTIO.len() { let linkno = hop - 1; - if destination_up(up_destinations, destination) { + if destination_up(up_destinations, destination).await { if up_links[linkno as usize] { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { destination: destination - }, timer); + }, timer).await; match reply { Ok(drtioaux::Packet::DestinationDownReply) => destination_set_up(routing_table, up_destinations, destination, false), @@ -219,18 +225,18 @@ pub mod drtio { Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) } } else { - destination_set_up(routing_table, up_destinations, destination, false); + destination_set_up(routing_table, up_destinations, destination, false).await; } } else { if up_links[linkno as usize] { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest { destination: destination - }, timer); + }, timer).await; match reply { Ok(drtioaux::Packet::DestinationDownReply) => (), Ok(drtioaux::Packet::DestinationOkReply) => { - destination_set_up(routing_table, up_destinations, destination, true); - init_buffer_space(destination as u8, linkno); + destination_set_up(routing_table, up_destinations, destination, true).await; + init_buffer_space(destination as u8, linkno).await; }, Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) @@ -241,7 +247,7 @@ pub mod drtio { } } - pub fn link_thread(io: Io, aux_mutex: &Mutex, + pub async fn link_thread(io: Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, timer: GlobalTimer) { @@ -252,8 +258,8 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno) { - process_unsolicited_aux(&io, aux_mutex, linkno, timer); - process_local_errors(linkno); + process_unsolicited_aux(aux_mutex, linkno, timer).await; + process_local_errors(linkno).await; } else { info!("[LINK#{}] link is down", linkno); up_links[linkno as usize] = false; @@ -262,17 +268,17 @@ pub mod drtio { /* link was previously down */ if link_rx_up(linkno) { info!("[LINK#{}] link RX became up, pinging", linkno); - let ping_count = ping_remote(aux_mutex, linkno, timer); + let ping_count = ping_remote(aux_mutex, linkno, timer).await; if ping_count > 0 { info!("[LINK#{}] remote replied after {} packets", linkno, ping_count); up_links[linkno as usize] = true; - if let Err(e) = sync_tsc(aux_mutex, linkno, timer) { + if let Err(e) = sync_tsc(aux_mutex, linkno, timer).await { error!("[LINK#{}] failed to sync TSC ({})", linkno, e); } - if let Err(e) = load_routing_table(aux_mutex, linkno, routing_table) { + if let Err(e) = load_routing_table(aux_mutex, linkno, routing_table).await { error!("[LINK#{}] failed to load routing table ({})", linkno, e); } - if let Err(e) = set_rank(aux_mutex, linkno, 1) { + if let Err(e) = set_rank(aux_mutex, linkno, 1).await { error!("[LINK#{}] failed to set rank ({})", linkno, e); } info!("[LINK#{}] link initialization completed", linkno); @@ -282,7 +288,7 @@ pub mod drtio { } } } - destination_survey(aux_mutex, routing_table, &up_links, up_destinations, timer); + destination_survey(aux_mutex, routing_table, &up_links, up_destinations, timer).await; timer.delay_ms(200); } } @@ -303,8 +309,8 @@ pub mod drtio { for linkno in 0..csr::DRTIO.len() { let linkno = linkno as u8; if link_rx_up(linkno) { - let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::ResetRequest, timer); + let reply = task::block_on(aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::ResetRequest, timer)); match reply { Ok(drtioaux::Packet::ResetAck) => (), Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno),