repeater: handle async messages #304

Merged
sb10q merged 1 commits from mwojcik/artiq-zynq:repeater_async_forward into master 2024-07-09 23:04:48 +08:00
2 changed files with 342 additions and 34 deletions

View File

@ -83,13 +83,29 @@ fn drtiosat_tsc_loaded() -> bool {
#[cfg(has_drtio_routing)]
macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{
(
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {{
let hop = $routing_table.0[$destination as usize][$rank as usize];
if hop != 0 {
let repno = (hop - 1) as usize;
if repno < $repeaters.len() {
if $packet.expects_response() {
return $repeaters[repno].aux_forward($packet, $timer);
return $repeaters[repno].aux_forward(
$packet,
$router,
$routing_table,
$rank,
$self_destination,
$timer,
);
} else {
return $repeaters[repno].aux_send($packet);
}
@ -102,7 +118,16 @@ macro_rules! forward {
#[cfg(not(has_drtio_routing))]
macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {};
(
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {};
}
fn process_aux_packet(
@ -183,6 +208,10 @@ fn process_aux_packet(
&drtioaux::Packet::DestinationStatusRequest {
destination: destination,
},
router,
_routing_table,
*rank,
*self_destination,
timer,
) {
Ok(()) => (),
@ -244,7 +273,16 @@ fn process_aux_packet(
channel,
probe,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value;
#[cfg(has_rtio_moninj)]
unsafe {
@ -266,7 +304,16 @@ fn process_aux_packet(
overrd,
value,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
#[cfg(has_rtio_moninj)]
unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel as _);
@ -280,7 +327,16 @@ fn process_aux_packet(
channel,
overrd,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value;
#[cfg(has_rtio_moninj)]
unsafe {
@ -299,7 +355,16 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.start().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -307,7 +372,16 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.restart().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -315,7 +389,16 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.stop().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -324,7 +407,16 @@ fn process_aux_packet(
busno: _busno,
data,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.write(data) {
Ok(ack) => drtioaux::send(
0,
@ -347,7 +439,16 @@ fn process_aux_packet(
busno: _busno,
ack,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.read(ack) {
Ok(data) => drtioaux::send(
0,
@ -371,7 +472,16 @@ fn process_aux_packet(
address,
mask,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let ch = match mask {
//decode from mainline, PCA9548-centric API
0x00 => None,
@ -397,7 +507,16 @@ fn process_aux_packet(
div: _div,
cs: _cs,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -407,7 +526,16 @@ fn process_aux_packet(
busno: _busno,
data: _data,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available
//let succeeded = spi::write(busno, data).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -416,7 +544,16 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available
// match spi::read(busno) {
// Ok(data) => drtioaux::send(0,
@ -436,7 +573,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerHeaderRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let header = analyzer.get_header();
drtioaux::send(
0,
@ -450,7 +596,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerDataRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice);
drtioaux::send(
@ -471,7 +626,16 @@ fn process_aux_packet(
length,
trace,
} => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination;
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
router.send(
@ -492,7 +656,16 @@ fn process_aux_packet(
id,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.ack_upload(
kernel_manager,
source,
@ -510,7 +683,16 @@ fn process_aux_packet(
destination: _destination,
id,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = dma_manager.erase(source, id).is_ok();
router.send(
drtioaux::Packet::DmaRemoveTraceReply {
@ -526,7 +708,16 @@ fn process_aux_packet(
destination: _destination,
succeeded: _,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
Ok(())
}
drtioaux::Packet::DmaPlaybackRequest {
@ -535,7 +726,16 @@ fn process_aux_packet(
id,
timestamp,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = if !kernel_manager.running() {
dma_manager.playback(source, id, timestamp).is_ok()
} else {
@ -555,7 +755,16 @@ fn process_aux_packet(
destination: _destination,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if !succeeded {
kernel_manager.ddma_nack();
}
@ -569,7 +778,16 @@ fn process_aux_packet(
channel,
timestamp,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp);
Ok(())
}
@ -581,7 +799,16 @@ fn process_aux_packet(
length,
data,
} => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination;
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
@ -592,7 +819,16 @@ fn process_aux_packet(
id,
run,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run
if run {
@ -617,7 +853,16 @@ fn process_aux_packet(
destination: _destination,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// received if local subkernel started another, remote subkernel
kernel_manager.subkernel_load_run_reply(succeeded);
Ok(())
@ -628,14 +873,32 @@ fn process_aux_packet(
with_exception,
exception_src,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
Ok(())
}
drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send(
@ -655,7 +918,16 @@ fn process_aux_packet(
length,
data,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.message_handle_incoming(status, id, length as usize, &data);
router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source },
@ -667,7 +939,16 @@ fn process_aux_packet(
drtioaux::Packet::SubkernelMessageAck {
destination: _destination,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {

View File

@ -208,10 +208,37 @@ impl Repeater {
}
}
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
pub fn aux_forward(
&self,
request: &drtioaux::Packet,
router: &mut Router,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
self_destination: u8,
timer: &mut GlobalTimer,
) -> Result<(), drtioaux::Error> {
self.aux_send(request)?;
let reply = self.recv_aux_timeout(200, timer)?;
drtioaux::send(0, &reply).unwrap();
loop {
let reply = self.recv_aux_timeout(200, timer)?;
match reply {
// async/locally requested packets to be consumed or routed
// these may come while a packet would be forwarded
drtioaux::Packet::DmaPlaybackStatus { .. }
| drtioaux::Packet::SubkernelFinished { .. }
| drtioaux::Packet::SubkernelMessage { .. }
| drtioaux::Packet::SubkernelMessageAck { .. }
| drtioaux::Packet::SubkernelLoadRunReply { .. }
| drtioaux::Packet::SubkernelException { .. }
| drtioaux::Packet::DmaAddTraceReply { .. }
| drtioaux::Packet::DmaPlaybackReply { .. } => {
router.route(reply, routing_table, rank, self_destination);
}
_ => {
drtioaux::send(0, &reply).unwrap();
break;
}
}
}
Ok(())
}