repeater: handle async messages

This commit is contained in:
mwojcik 2024-07-08 15:31:54 +08:00 committed by sb10q
parent 23857eef63
commit d51e5e60c3
2 changed files with 342 additions and 34 deletions

View File

@ -91,13 +91,29 @@ fn toggle_sed_spread(val: u8) {
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
macro_rules! forward { macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ (
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {{
let hop = $routing_table.0[$destination as usize][$rank as usize]; let hop = $routing_table.0[$destination as usize][$rank as usize];
if hop != 0 { if hop != 0 {
let repno = (hop - 1) as usize; let repno = (hop - 1) as usize;
if repno < $repeaters.len() { if repno < $repeaters.len() {
if $packet.expects_response() { if $packet.expects_response() {
return $repeaters[repno].aux_forward($packet, $timer); return $repeaters[repno].aux_forward(
$packet,
$router,
$routing_table,
$rank,
$self_destination,
$timer,
);
} else { } else {
return $repeaters[repno].aux_send($packet); return $repeaters[repno].aux_send($packet);
} }
@ -110,7 +126,16 @@ macro_rules! forward {
#[cfg(not(has_drtio_routing))] #[cfg(not(has_drtio_routing))]
macro_rules! forward { macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {}; (
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {};
} }
fn process_aux_packet( fn process_aux_packet(
@ -191,6 +216,10 @@ fn process_aux_packet(
&drtioaux::Packet::DestinationStatusRequest { &drtioaux::Packet::DestinationStatusRequest {
destination: destination, destination: destination,
}, },
router,
_routing_table,
*rank,
*self_destination,
timer, timer,
) { ) {
Ok(()) => (), Ok(()) => (),
@ -252,7 +281,16 @@ fn process_aux_packet(
channel, channel,
probe, probe,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -274,7 +312,16 @@ fn process_aux_packet(
overrd, overrd,
value, value,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel as _); csr::rtio_moninj::inj_chan_sel_write(channel as _);
@ -288,7 +335,16 @@ fn process_aux_packet(
channel, channel,
overrd, overrd,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -307,7 +363,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.start().is_ok(); let succeeded = i2c.start().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -315,7 +380,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.restart().is_ok(); let succeeded = i2c.restart().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -323,7 +397,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.stop().is_ok(); let succeeded = i2c.stop().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -332,7 +415,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data, data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.write(data) { match i2c.write(data) {
Ok(ack) => drtioaux::send( Ok(ack) => drtioaux::send(
0, 0,
@ -355,7 +447,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
ack, ack,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.read(ack) { match i2c.read(ack) {
Ok(data) => drtioaux::send( Ok(data) => drtioaux::send(
0, 0,
@ -379,7 +480,16 @@ fn process_aux_packet(
address, address,
mask, mask,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let ch = match mask { let ch = match mask {
//decode from mainline, PCA9548-centric API //decode from mainline, PCA9548-centric API
0x00 => None, 0x00 => None,
@ -405,7 +515,16 @@ fn process_aux_packet(
div: _div, div: _div,
cs: _cs, cs: _cs,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -415,7 +534,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data: _data, data: _data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::write(busno, data).is_ok(); //let succeeded = spi::write(busno, data).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -424,7 +552,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
// match spi::read(busno) { // match spi::read(busno) {
// Ok(data) => drtioaux::send(0, // Ok(data) => drtioaux::send(0,
@ -444,7 +581,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerHeaderRequest { drtioaux::Packet::AnalyzerHeaderRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let header = analyzer.get_header(); let header = analyzer.get_header();
drtioaux::send( drtioaux::send(
0, 0,
@ -458,7 +604,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerDataRequest { drtioaux::Packet::AnalyzerDataRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice); let meta = analyzer.get_data(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -479,7 +634,16 @@ fn process_aux_packet(
length, length,
trace, trace,
} => { } => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination; *self_destination = destination;
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
router.send( router.send(
@ -500,7 +664,16 @@ fn process_aux_packet(
id, id,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.ack_upload( dma_manager.ack_upload(
kernel_manager, kernel_manager,
source, source,
@ -518,7 +691,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
id, id,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = dma_manager.erase(source, id).is_ok(); let succeeded = dma_manager.erase(source, id).is_ok();
router.send( router.send(
drtioaux::Packet::DmaRemoveTraceReply { drtioaux::Packet::DmaRemoveTraceReply {
@ -534,7 +716,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded: _, succeeded: _,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
Ok(()) Ok(())
} }
drtioaux::Packet::DmaPlaybackRequest { drtioaux::Packet::DmaPlaybackRequest {
@ -543,7 +734,16 @@ fn process_aux_packet(
id, id,
timestamp, timestamp,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = if !kernel_manager.running() { let succeeded = if !kernel_manager.running() {
dma_manager.playback(source, id, timestamp).is_ok() dma_manager.playback(source, id, timestamp).is_ok()
} else { } else {
@ -563,7 +763,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if !succeeded { if !succeeded {
kernel_manager.ddma_nack(); kernel_manager.ddma_nack();
} }
@ -577,7 +786,16 @@ fn process_aux_packet(
channel, channel,
timestamp, timestamp,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp);
Ok(()) Ok(())
} }
@ -589,7 +807,16 @@ fn process_aux_packet(
length, length,
data, data,
} => { } => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination; *self_destination = destination;
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
@ -600,7 +827,16 @@ fn process_aux_packet(
id, id,
run, run,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut succeeded = kernel_manager.load(id).is_ok(); let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run // allow preloading a kernel with delayed run
if run { if run {
@ -625,7 +861,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// received if local subkernel started another, remote subkernel // received if local subkernel started another, remote subkernel
kernel_manager.subkernel_load_run_reply(succeeded); kernel_manager.subkernel_load_run_reply(succeeded);
Ok(()) Ok(())
@ -636,14 +881,32 @@ fn process_aux_packet(
with_exception, with_exception,
exception_src, exception_src,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src); kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
Ok(()) Ok(())
} }
drtioaux::Packet::SubkernelExceptionRequest { drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice); let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -663,7 +926,16 @@ fn process_aux_packet(
length, length,
data, data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.message_handle_incoming(status, id, length as usize, &data); kernel_manager.message_handle_incoming(status, id, length as usize, &data);
router.send( router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source }, drtioaux::Packet::SubkernelMessageAck { destination: source },
@ -675,7 +947,16 @@ fn process_aux_packet(
drtioaux::Packet::SubkernelMessageAck { drtioaux::Packet::SubkernelMessageAck {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if kernel_manager.message_ack_slice() { if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {

View File

@ -208,10 +208,37 @@ impl Repeater {
} }
} }
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { pub fn aux_forward(
&self,
request: &drtioaux::Packet,
router: &mut Router,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
self_destination: u8,
timer: &mut GlobalTimer,
) -> Result<(), drtioaux::Error> {
self.aux_send(request)?; self.aux_send(request)?;
loop {
let reply = self.recv_aux_timeout(200, timer)?; let reply = self.recv_aux_timeout(200, timer)?;
match reply {
// async/locally requested packets to be consumed or routed
// these may come while a packet would be forwarded
drtioaux::Packet::DmaPlaybackStatus { .. }
| drtioaux::Packet::SubkernelFinished { .. }
| drtioaux::Packet::SubkernelMessage { .. }
| drtioaux::Packet::SubkernelMessageAck { .. }
| drtioaux::Packet::SubkernelLoadRunReply { .. }
| drtioaux::Packet::SubkernelException { .. }
| drtioaux::Packet::DmaAddTraceReply { .. }
| drtioaux::Packet::DmaPlaybackReply { .. } => {
router.route(reply, routing_table, rank, self_destination);
}
_ => {
drtioaux::send(0, &reply).unwrap(); drtioaux::send(0, &reply).unwrap();
break;
}
}
}
Ok(()) Ok(())
} }