From d51e5e60c3ba337cdefff5e918eb35bd05a03e84 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 8 Jul 2024 15:31:54 +0800 Subject: [PATCH] repeater: handle async messages --- src/satman/src/main.rs | 343 +++++++++++++++++++++++++++++++++---- src/satman/src/repeater.rs | 33 +++- 2 files changed, 342 insertions(+), 34 deletions(-) diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index eb986edc..422b260b 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -91,13 +91,29 @@ fn toggle_sed_spread(val: u8) { #[cfg(has_drtio_routing)] macro_rules! forward { - ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ + ( + $router:expr, + $routing_table:expr, + $destination:expr, + $rank:expr, + $self_destination:expr, + $repeaters:expr, + $packet:expr, + $timer:expr + ) => {{ let hop = $routing_table.0[$destination as usize][$rank as usize]; if hop != 0 { let repno = (hop - 1) as usize; if repno < $repeaters.len() { if $packet.expects_response() { - return $repeaters[repno].aux_forward($packet, $timer); + return $repeaters[repno].aux_forward( + $packet, + $router, + $routing_table, + $rank, + $self_destination, + $timer, + ); } else { return $repeaters[repno].aux_send($packet); } @@ -110,7 +126,16 @@ macro_rules! forward { #[cfg(not(has_drtio_routing))] macro_rules! forward { - ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {}; + ( + $router:expr, + $routing_table:expr, + $destination:expr, + $rank:expr, + $self_destination:expr, + $repeaters:expr, + $packet:expr, + $timer:expr + ) => {}; } fn process_aux_packet( @@ -191,6 +216,10 @@ fn process_aux_packet( &drtioaux::Packet::DestinationStatusRequest { destination: destination, }, + router, + _routing_table, + *rank, + *self_destination, timer, ) { Ok(()) => (), @@ -252,7 +281,16 @@ fn process_aux_packet( channel, probe, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -274,7 +312,16 @@ fn process_aux_packet( overrd, value, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); #[cfg(has_rtio_moninj)] unsafe { csr::rtio_moninj::inj_chan_sel_write(channel as _); @@ -288,7 +335,16 @@ fn process_aux_packet( channel, overrd, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -307,7 +363,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.start().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -315,7 +380,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.restart().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -323,7 +397,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = i2c.stop().is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } @@ -332,7 +415,16 @@ fn process_aux_packet( busno: _busno, data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); match i2c.write(data) { Ok(ack) => drtioaux::send( 0, @@ -355,7 +447,16 @@ fn process_aux_packet( busno: _busno, ack, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); match i2c.read(ack) { Ok(data) => drtioaux::send( 0, @@ -379,7 +480,16 @@ fn process_aux_packet( address, mask, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let ch = match mask { //decode from mainline, PCA9548-centric API 0x00 => None, @@ -405,7 +515,16 @@ fn process_aux_packet( div: _div, cs: _cs, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -415,7 +534,16 @@ fn process_aux_packet( busno: _busno, data: _data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available //let succeeded = spi::write(busno, data).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) @@ -424,7 +552,16 @@ fn process_aux_packet( destination: _destination, busno: _busno, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // todo: reimplement when/if SPI is available // match spi::read(busno) { // Ok(data) => drtioaux::send(0, @@ -444,7 +581,16 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerHeaderRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let header = analyzer.get_header(); drtioaux::send( 0, @@ -458,7 +604,16 @@ fn process_aux_packet( drtioaux::Packet::AnalyzerDataRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = analyzer.get_data(&mut data_slice); drtioaux::send( @@ -479,7 +634,16 @@ fn process_aux_packet( length, trace, } => { - forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); *self_destination = destination; let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); router.send( @@ -500,7 +664,16 @@ fn process_aux_packet( id, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); dma_manager.ack_upload( kernel_manager, source, @@ -518,7 +691,16 @@ fn process_aux_packet( destination: _destination, id, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = dma_manager.erase(source, id).is_ok(); router.send( drtioaux::Packet::DmaRemoveTraceReply { @@ -534,7 +716,16 @@ fn process_aux_packet( destination: _destination, succeeded: _, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); Ok(()) } drtioaux::Packet::DmaPlaybackRequest { @@ -543,7 +734,16 @@ fn process_aux_packet( id, timestamp, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let succeeded = if !kernel_manager.running() { dma_manager.playback(source, id, timestamp).is_ok() } else { @@ -563,7 +763,16 @@ fn process_aux_packet( destination: _destination, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); if !succeeded { kernel_manager.ddma_nack(); } @@ -577,7 +786,16 @@ fn process_aux_packet( channel, timestamp, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); Ok(()) } @@ -589,7 +807,16 @@ fn process_aux_packet( length, data, } => { - forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); *self_destination = destination; let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) @@ -600,7 +827,16 @@ fn process_aux_packet( id, run, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut succeeded = kernel_manager.load(id).is_ok(); // allow preloading a kernel with delayed run if run { @@ -625,7 +861,16 @@ fn process_aux_packet( destination: _destination, succeeded, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); // received if local subkernel started another, remote subkernel kernel_manager.subkernel_load_run_reply(succeeded); Ok(()) @@ -636,14 +881,32 @@ fn process_aux_packet( with_exception, exception_src, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); kernel_manager.remote_subkernel_finished(id, with_exception, exception_src); Ok(()) } drtioaux::Packet::SubkernelExceptionRequest { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = kernel_manager.exception_get_slice(&mut data_slice); drtioaux::send( @@ -663,7 +926,16 @@ fn process_aux_packet( length, data, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); kernel_manager.message_handle_incoming(status, id, length as usize, &data); router.send( drtioaux::Packet::SubkernelMessageAck { destination: source }, @@ -675,7 +947,16 @@ fn process_aux_packet( drtioaux::Packet::SubkernelMessageAck { destination: _destination, } => { - forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); if kernel_manager.message_ack_slice() { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { diff --git a/src/satman/src/repeater.rs b/src/satman/src/repeater.rs index 9ebc9b7a..78340072 100644 --- a/src/satman/src/repeater.rs +++ b/src/satman/src/repeater.rs @@ -208,10 +208,37 @@ impl Repeater { } } - pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { + pub fn aux_forward( + &self, + request: &drtioaux::Packet, + router: &mut Router, + routing_table: &drtio_routing::RoutingTable, + rank: u8, + self_destination: u8, + timer: &mut GlobalTimer, + ) -> Result<(), drtioaux::Error> { self.aux_send(request)?; - let reply = self.recv_aux_timeout(200, timer)?; - drtioaux::send(0, &reply).unwrap(); + loop { + let reply = self.recv_aux_timeout(200, timer)?; + match reply { + // async/locally requested packets to be consumed or routed + // these may come while a packet would be forwarded + drtioaux::Packet::DmaPlaybackStatus { .. } + | drtioaux::Packet::SubkernelFinished { .. } + | drtioaux::Packet::SubkernelMessage { .. } + | drtioaux::Packet::SubkernelMessageAck { .. } + | drtioaux::Packet::SubkernelLoadRunReply { .. } + | drtioaux::Packet::SubkernelException { .. } + | drtioaux::Packet::DmaAddTraceReply { .. } + | drtioaux::Packet::DmaPlaybackReply { .. } => { + router.route(reply, routing_table, rank, self_destination); + } + _ => { + drtioaux::send(0, &reply).unwrap(); + break; + } + } + } Ok(()) }