From 5b90d469674b2dad99e6621902835ebca7e54da2 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 8 Jul 2024 15:31:54 +0800 Subject: [PATCH] repeater: handle async messages --- src/satman/src/main.rs | 343 +++++++++++++++++++++++++++++++++---- src/satman/src/repeater.rs | 33 +++- 2 files changed, 342 insertions(+), 34 deletions(-) diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index bb41fda..c29eb9a 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -83,13 +83,29 @@ fn drtiosat_tsc_loaded() -> bool { #[cfg(has_drtio_routing)] macro_rules! forward { - ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ + ( + $router:expr, + $routing_table:expr, + $destination:expr, + $rank:expr, + $self_destination:expr, + $repeaters:expr, + $packet:expr, + $timer:expr + ) => {{ let hop = $routing_table.0[$destination as usize][$rank as usize]; if hop != 0 { let repno = (hop - 1) as usize; if repno < $repeaters.len() { if $packet.expects_response() { - return $repeaters[repno].aux_forward($packet, $timer); + return $repeaters[repno].aux_forward( + $packet, + $router, + $routing_table, + $rank, + $self_destination, + $timer, + ); } else { return $repeaters[repno].aux_send($packet); } @@ -102,7 +118,16 @@ macro_rules! forward { #[cfg(not(has_drtio_routing))] macro_rules! forward { - ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {}; + ( + $router:expr, + $routing_table:expr, + $destination:expr, + $rank:expr, + $self_destination:expr, + $repeaters:expr, + $packet:expr, + $timer:expr + ) => {}; } fn process_aux_packet( @@ -183,6 +208,10 @@ fn process_aux_packet( &drtioaux::Packet::DestinationStatusRequest { destination: destination, }, + router, + _routing_table, + *rank, + *self_destination, timer, ) { Ok(()) => (), @@ -244,7 +273,16 @@ fn process_aux_packet( channel, probe, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -266,7 +304,16 @@ fn process_aux_packet( overrd, value, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); #[cfg(has_rtio_moninj)] unsafe { csr::rtio_moninj::inj_chan_sel_write(channel as _); @@ -280,7 +327,16 @@ fn process_aux_packet( channel, overrd, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -299,7 +355,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.start().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -307,7 +372,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.restart().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -315,7 +389,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.stop().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -324,7 +407,16 @@ fn process_aux_packet( busno: _busno, data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); match i2c.write(data) { Ok(ack) => drtioaux::send( 0, @@ -347,7 +439,16 @@ fn process_aux_packet( busno: _busno, ack, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); match i2c.read(ack) { Ok(data) => drtioaux::send( 0, @@ -371,7 +472,16 @@ fn process_aux_packet( address, mask, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let ch = match mask { //decode from mainline, PCA9548-centric API 0x00 => None, @@ -397,7 +507,16 @@ fn process_aux_packet( div: _div, cs: _cs, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -407,7 +526,16 @@ fn process_aux_packet( busno: _busno, data: _data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available //let succeeded = spi::write(busno, data).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -416,7 +544,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available // match spi::read(busno) { // Ok(data) => drtioaux::send(0, @@ -436,7 +573,16 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerHeaderRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let header = analyzer.get_header(); drtioaux::send( 0, @@ -450,7 +596,16 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerDataRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = analyzer.get_data(&mut data_slice); drtioaux::send( @@ -471,7 +626,16 @@ fn process_aux_packet( length, trace, } => { - forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); *self_destination = destination; let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); router.send( @@ -492,7 +656,16 @@ fn process_aux_packet( id, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); dma_manager.ack_upload( kernel_manager, source, @@ -510,7 +683,16 @@ fn process_aux_packet( destination: _destination, id, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = dma_manager.erase(source, id).is_ok(); router.send( drtioaux::Packet::DmaRemoveTraceReply { @@ -526,7 +708,16 @@ fn process_aux_packet( destination: _destination, succeeded: _, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); Ok(()) } drtioaux::Packet::DmaPlaybackRequest { @@ -535,7 +726,16 @@ fn process_aux_packet( id, timestamp, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = if !kernel_manager.running() { dma_manager.playback(source, id, timestamp).is_ok() } else { @@ -555,7 +755,16 @@ fn process_aux_packet( destination: _destination, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); if !succeeded { kernel_manager.ddma_nack(); } @@ -569,7 +778,16 @@ fn process_aux_packet( channel, timestamp, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); Ok(()) } @@ -581,7 +799,16 @@ fn process_aux_packet( length, data, } => { - forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); *self_destination = destination; let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) @@ -592,7 +819,16 @@ fn process_aux_packet( id, run, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut succeeded = kernel_manager.load(id).is_ok(); // allow preloading a kernel with delayed run if run { @@ -617,7 +853,16 @@ fn process_aux_packet( destination: _destination, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // received if local subkernel started another, remote subkernel kernel_manager.subkernel_load_run_reply(succeeded); Ok(()) @@ -628,14 +873,32 @@ fn process_aux_packet( with_exception, exception_src, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); kernel_manager.remote_subkernel_finished(id, with_exception, exception_src); Ok(()) } drtioaux::Packet::SubkernelExceptionRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = kernel_manager.exception_get_slice(&mut data_slice); drtioaux::send( @@ -655,7 +918,16 @@ fn process_aux_packet( length, data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); kernel_manager.message_handle_incoming(status, id, length as usize, &data); router.send( drtioaux::Packet::SubkernelMessageAck { destination: source }, @@ -667,7 +939,16 @@ fn process_aux_packet( drtioaux::Packet::SubkernelMessageAck { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); if kernel_manager.message_ack_slice() { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { diff --git a/src/satman/src/repeater.rs b/src/satman/src/repeater.rs index 9ebc9b7..7834007 100644 --- a/src/satman/src/repeater.rs +++ b/src/satman/src/repeater.rs @@ -208,10 +208,37 @@ impl Repeater { } } - pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { + pub fn aux_forward( + &self, + request: &drtioaux::Packet, + router: &mut Router, + routing_table: &drtio_routing::RoutingTable, + rank: u8, + self_destination: u8, + timer: &mut GlobalTimer, + ) -> Result<(), drtioaux::Error> { self.aux_send(request)?; - let reply = self.recv_aux_timeout(200, timer)?; - drtioaux::send(0, &reply).unwrap(); + loop { + let reply = self.recv_aux_timeout(200, timer)?; + match reply { + // async/locally requested packets to be consumed or routed + // these may come while a packet would be forwarded + drtioaux::Packet::DmaPlaybackStatus { .. } + | drtioaux::Packet::SubkernelFinished { .. } + | drtioaux::Packet::SubkernelMessage { .. } + | drtioaux::Packet::SubkernelMessageAck { .. } + | drtioaux::Packet::SubkernelLoadRunReply { .. } + | drtioaux::Packet::SubkernelException { .. } + | drtioaux::Packet::DmaAddTraceReply { .. } + | drtioaux::Packet::DmaPlaybackReply { .. } => { + router.route(reply, routing_table, rank, self_destination); + } + _ => { + drtioaux::send(0, &reply).unwrap(); + break; + } + } + } Ok(()) } -- 2.44.1