runtime: moving rtio_mgt to async/await
This commit is contained in:
parent
f1b22330d3
commit
e8541c4cf5
@ -18,7 +18,7 @@ pub mod drtio {
|
|||||||
let aux_mutex = aux_mutex.clone();
|
let aux_mutex = aux_mutex.clone();
|
||||||
let routing_table = routing_table.clone();
|
let routing_table = routing_table.clone();
|
||||||
let up_destinations = up_destinations.clone();
|
let up_destinations = up_destinations.clone();
|
||||||
task::spawn( || {
|
task::spawn( async move {
|
||||||
let routing_table = routing_table.borrow();
|
let routing_table = routing_table.borrow();
|
||||||
link_thread(&aux_mutex, &routing_table, &up_destinations);
|
link_thread(&aux_mutex, &routing_table, &up_destinations);
|
||||||
});
|
});
|
||||||
@ -31,7 +31,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn recv_aux_timeout(linkno: u8, timeout: u32, timer: GlobalTimer) -> Result<drtioaux::Packet, &'static str> {
|
async fn recv_aux_timeout(linkno: u8, timeout: u32, timer: GlobalTimer) -> Result<drtioaux::Packet, &'static str> {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
loop {
|
loop {
|
||||||
if !link_rx_up(linkno) {
|
if !link_rx_up(linkno) {
|
||||||
@ -48,14 +48,24 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet,
|
pub async fn aux_transact(aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet,
|
||||||
timer: GlobalTimer) -> Result<drtioaux::Packet, &'static str> {
|
timer: GlobalTimer) -> Result<drtioaux::Packet, &'static str> {
|
||||||
let _lock = aux_mutex.lock();
|
let _lock = aux_mutex.lock();
|
||||||
drtioaux::send(linkno, request).unwrap();
|
drtioaux::send(linkno, request).unwrap();
|
||||||
recv_aux_timeout(linkno, 200, timer)
|
recv_aux_timeout(linkno, 200, timer).await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn ping_remote(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> u32 {
|
async fn drain_buffer(linkno: u8, draining_time: Milliseconds) {
|
||||||
|
let max_time = timer.get_time() + draining_time;
|
||||||
|
loop {
|
||||||
|
if timer.get_time() > max_time {
|
||||||
|
return;
|
||||||
|
} //could this be cut short?
|
||||||
|
let _ = drtioaux::recv(linkno);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ping_remote(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> u32 {
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
loop {
|
loop {
|
||||||
if !link_rx_up(linkno) {
|
if !link_rx_up(linkno) {
|
||||||
@ -65,24 +75,20 @@ pub mod drtio {
|
|||||||
if count > 100 {
|
if count > 100 {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer);
|
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::EchoRequest, timer).await;
|
||||||
match reply {
|
match reply {
|
||||||
Ok(drtioaux::Packet::EchoReply) => {
|
Ok(drtioaux::Packet::EchoReply) => {
|
||||||
// make sure receive buffer is drained
|
// make sure receive buffer is drained
|
||||||
let max_time = timer.get_time() + Milliseconds(200);
|
let draining_time = Milliseconds(200);
|
||||||
loop {
|
drain_buffer(linkno, draining_time).await;
|
||||||
if timer.get_time() > max_time {
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
let _ = drtioaux::recv(linkno);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn sync_tsc(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> Result<(), &'static str> {
|
async fn sync_tsc(aux_mutex: &Mutex, linkno: u8, timer: GlobalTimer) -> Result<(), &'static str> {
|
||||||
let _lock = aux_mutex.lock();
|
let _lock = aux_mutex.lock();
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -91,7 +97,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
// TSCAck is the only aux packet that is sent spontaneously
|
// TSCAck is the only aux packet that is sent spontaneously
|
||||||
// by the satellite, in response to a TSC set on the RT link.
|
// by the satellite, in response to a TSC set on the RT link.
|
||||||
let reply = recv_aux_timeout(linkno, 10000, timer)?;
|
let reply = recv_aux_timeout(linkno, 10000, timer).await?;
|
||||||
if reply == drtioaux::Packet::TSCAck {
|
if reply == drtioaux::Packet::TSCAck {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
} else {
|
} else {
|
||||||
@ -99,13 +105,13 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn load_routing_table(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable,
|
async fn load_routing_table(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable,
|
||||||
timer: GlobalTimer) -> Result<(), &'static str> {
|
timer: GlobalTimer) -> Result<(), &'static str> {
|
||||||
for i in 0..drtio_routing::DEST_COUNT {
|
for i in 0..drtio_routing::DEST_COUNT {
|
||||||
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath {
|
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath {
|
||||||
destination: i as u8,
|
destination: i as u8,
|
||||||
hops: routing_table.0[i]
|
hops: routing_table.0[i]
|
||||||
}, timer)?;
|
}, timer).await?;
|
||||||
if reply != drtioaux::Packet::RoutingAck {
|
if reply != drtioaux::Packet::RoutingAck {
|
||||||
return Err("unexpected reply");
|
return Err("unexpected reply");
|
||||||
}
|
}
|
||||||
@ -113,17 +119,17 @@ pub mod drtio {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_rank(aux_mutex: &Mutex, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> {
|
async fn set_rank(aux_mutex: &Mutex, linkno: u8, rank: u8, timer: GlobalTimer) -> Result<(), &'static str> {
|
||||||
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank {
|
let reply = aux_transact(aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank {
|
||||||
rank: rank
|
rank: rank
|
||||||
}, timer)?;
|
}, timer).await?;
|
||||||
if reply != drtioaux::Packet::RoutingAck {
|
if reply != drtioaux::Packet::RoutingAck {
|
||||||
return Err("unexpected reply");
|
return Err("unexpected reply");
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_buffer_space(destination: u8, linkno: u8) {
|
async fn init_buffer_space(destination: u8, linkno: u8) {
|
||||||
let linkno = linkno as usize;
|
let linkno = linkno as usize;
|
||||||
unsafe {
|
unsafe {
|
||||||
(csr::DRTIO[linkno].destination_write)(destination);
|
(csr::DRTIO[linkno].destination_write)(destination);
|
||||||
@ -136,7 +142,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_unsolicited_aux(aux_mutex: &Mutex, linkno: u8) {
|
async fn process_unsolicited_aux(aux_mutex: &Mutex, linkno: u8) {
|
||||||
let _lock = aux_mutex.lock();
|
let _lock = aux_mutex.lock();
|
||||||
match drtioaux::recv(linkno) {
|
match drtioaux::recv(linkno) {
|
||||||
Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
|
Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
|
||||||
@ -145,7 +151,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_local_errors(linkno: u8) {
|
async fn process_local_errors(linkno: u8) {
|
||||||
let errors;
|
let errors;
|
||||||
let linkidx = linkno as usize;
|
let linkidx = linkno as usize;
|
||||||
unsafe {
|
unsafe {
|
||||||
@ -166,7 +172,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn destination_set_up(routing_table: &drtio_routing::RoutingTable,
|
async fn destination_set_up(routing_table: &drtio_routing::RoutingTable,
|
||||||
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
||||||
destination: u8, up: bool) {
|
destination: u8, up: bool) {
|
||||||
let mut up_destinations = up_destinations.borrow_mut();
|
let mut up_destinations = up_destinations.borrow_mut();
|
||||||
@ -180,12 +186,12 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn destination_up(up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, destination: u8) -> bool {
|
async fn destination_up(up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>, destination: u8) -> bool {
|
||||||
let up_destinations = up_destinations.borrow();
|
let up_destinations = up_destinations.borrow();
|
||||||
up_destinations[destination as usize]
|
up_destinations[destination as usize]
|
||||||
}
|
}
|
||||||
|
|
||||||
fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable,
|
async fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable,
|
||||||
up_links: &[bool],
|
up_links: &[bool],
|
||||||
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
||||||
timer: GlobalTimer) {
|
timer: GlobalTimer) {
|
||||||
@ -195,16 +201,16 @@ pub mod drtio {
|
|||||||
|
|
||||||
if hop == 0 {
|
if hop == 0 {
|
||||||
/* local RTIO */
|
/* local RTIO */
|
||||||
if !destination_up(up_destinations, destination) {
|
if !destination_up(up_destinations, destination).await {
|
||||||
destination_set_up(routing_table, up_destinations, destination, true);
|
destination_set_up(routing_table, up_destinations, destination, true).await;
|
||||||
}
|
}
|
||||||
} else if hop as usize <= csr::DRTIO.len() {
|
} else if hop as usize <= csr::DRTIO.len() {
|
||||||
let linkno = hop - 1;
|
let linkno = hop - 1;
|
||||||
if destination_up(up_destinations, destination) {
|
if destination_up(up_destinations, destination).await {
|
||||||
if up_links[linkno as usize] {
|
if up_links[linkno as usize] {
|
||||||
let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest {
|
let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest {
|
||||||
destination: destination
|
destination: destination
|
||||||
}, timer);
|
}, timer).await;
|
||||||
match reply {
|
match reply {
|
||||||
Ok(drtioaux::Packet::DestinationDownReply) =>
|
Ok(drtioaux::Packet::DestinationDownReply) =>
|
||||||
destination_set_up(routing_table, up_destinations, destination, false),
|
destination_set_up(routing_table, up_destinations, destination, false),
|
||||||
@ -219,18 +225,18 @@ pub mod drtio {
|
|||||||
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
|
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
destination_set_up(routing_table, up_destinations, destination, false);
|
destination_set_up(routing_table, up_destinations, destination, false).await;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if up_links[linkno as usize] {
|
if up_links[linkno as usize] {
|
||||||
let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest {
|
let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DestinationStatusRequest {
|
||||||
destination: destination
|
destination: destination
|
||||||
}, timer);
|
}, timer).await;
|
||||||
match reply {
|
match reply {
|
||||||
Ok(drtioaux::Packet::DestinationDownReply) => (),
|
Ok(drtioaux::Packet::DestinationDownReply) => (),
|
||||||
Ok(drtioaux::Packet::DestinationOkReply) => {
|
Ok(drtioaux::Packet::DestinationOkReply) => {
|
||||||
destination_set_up(routing_table, up_destinations, destination, true);
|
destination_set_up(routing_table, up_destinations, destination, true).await;
|
||||||
init_buffer_space(destination as u8, linkno);
|
init_buffer_space(destination as u8, linkno).await;
|
||||||
},
|
},
|
||||||
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
|
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
|
||||||
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
|
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e)
|
||||||
@ -241,7 +247,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn link_thread(io: Io, aux_mutex: &Mutex,
|
pub async fn link_thread(io: Io, aux_mutex: &Mutex,
|
||||||
routing_table: &drtio_routing::RoutingTable,
|
routing_table: &drtio_routing::RoutingTable,
|
||||||
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
up_destinations: &RefCell<[bool; drtio_routing::DEST_COUNT]>,
|
||||||
timer: GlobalTimer) {
|
timer: GlobalTimer) {
|
||||||
@ -252,8 +258,8 @@ pub mod drtio {
|
|||||||
if up_links[linkno as usize] {
|
if up_links[linkno as usize] {
|
||||||
/* link was previously up */
|
/* link was previously up */
|
||||||
if link_rx_up(linkno) {
|
if link_rx_up(linkno) {
|
||||||
process_unsolicited_aux(&io, aux_mutex, linkno, timer);
|
process_unsolicited_aux(aux_mutex, linkno, timer).await;
|
||||||
process_local_errors(linkno);
|
process_local_errors(linkno).await;
|
||||||
} else {
|
} else {
|
||||||
info!("[LINK#{}] link is down", linkno);
|
info!("[LINK#{}] link is down", linkno);
|
||||||
up_links[linkno as usize] = false;
|
up_links[linkno as usize] = false;
|
||||||
@ -262,17 +268,17 @@ pub mod drtio {
|
|||||||
/* link was previously down */
|
/* link was previously down */
|
||||||
if link_rx_up(linkno) {
|
if link_rx_up(linkno) {
|
||||||
info!("[LINK#{}] link RX became up, pinging", linkno);
|
info!("[LINK#{}] link RX became up, pinging", linkno);
|
||||||
let ping_count = ping_remote(aux_mutex, linkno, timer);
|
let ping_count = ping_remote(aux_mutex, linkno, timer).await;
|
||||||
if ping_count > 0 {
|
if ping_count > 0 {
|
||||||
info!("[LINK#{}] remote replied after {} packets", linkno, ping_count);
|
info!("[LINK#{}] remote replied after {} packets", linkno, ping_count);
|
||||||
up_links[linkno as usize] = true;
|
up_links[linkno as usize] = true;
|
||||||
if let Err(e) = sync_tsc(aux_mutex, linkno, timer) {
|
if let Err(e) = sync_tsc(aux_mutex, linkno, timer).await {
|
||||||
error!("[LINK#{}] failed to sync TSC ({})", linkno, e);
|
error!("[LINK#{}] failed to sync TSC ({})", linkno, e);
|
||||||
}
|
}
|
||||||
if let Err(e) = load_routing_table(aux_mutex, linkno, routing_table) {
|
if let Err(e) = load_routing_table(aux_mutex, linkno, routing_table).await {
|
||||||
error!("[LINK#{}] failed to load routing table ({})", linkno, e);
|
error!("[LINK#{}] failed to load routing table ({})", linkno, e);
|
||||||
}
|
}
|
||||||
if let Err(e) = set_rank(aux_mutex, linkno, 1) {
|
if let Err(e) = set_rank(aux_mutex, linkno, 1).await {
|
||||||
error!("[LINK#{}] failed to set rank ({})", linkno, e);
|
error!("[LINK#{}] failed to set rank ({})", linkno, e);
|
||||||
}
|
}
|
||||||
info!("[LINK#{}] link initialization completed", linkno);
|
info!("[LINK#{}] link initialization completed", linkno);
|
||||||
@ -282,7 +288,7 @@ pub mod drtio {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destination_survey(aux_mutex, routing_table, &up_links, up_destinations, timer);
|
destination_survey(aux_mutex, routing_table, &up_links, up_destinations, timer).await;
|
||||||
timer.delay_ms(200);
|
timer.delay_ms(200);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -303,8 +309,8 @@ pub mod drtio {
|
|||||||
for linkno in 0..csr::DRTIO.len() {
|
for linkno in 0..csr::DRTIO.len() {
|
||||||
let linkno = linkno as u8;
|
let linkno = linkno as u8;
|
||||||
if link_rx_up(linkno) {
|
if link_rx_up(linkno) {
|
||||||
let reply = aux_transact(io, aux_mutex, linkno,
|
let reply = task::block_on(aux_transact(io, aux_mutex, linkno,
|
||||||
&drtioaux::Packet::ResetRequest, timer);
|
&drtioaux::Packet::ResetRequest, timer));
|
||||||
match reply {
|
match reply {
|
||||||
Ok(drtioaux::Packet::ResetAck) => (),
|
Ok(drtioaux::Packet::ResetAck) => (),
|
||||||
Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno),
|
Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno),
|
||||||
|
Loading…
Reference in New Issue
Block a user