repeater: handle async messages #304

Merged
sb10q merged 1 commits from mwojcik/artiq-zynq:repeater_async_forward into master 2024-07-09 23:04:48 +08:00
2 changed files with 342 additions and 34 deletions
Showing only changes of commit 5b90d46967 - Show all commits

View File

@ -83,13 +83,29 @@ fn drtiosat_tsc_loaded() -> bool {
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
macro_rules! forward { macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ (
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {{
let hop = $routing_table.0[$destination as usize][$rank as usize]; let hop = $routing_table.0[$destination as usize][$rank as usize];
if hop != 0 { if hop != 0 {
let repno = (hop - 1) as usize; let repno = (hop - 1) as usize;
if repno < $repeaters.len() { if repno < $repeaters.len() {
if $packet.expects_response() { if $packet.expects_response() {
return $repeaters[repno].aux_forward($packet, $timer); return $repeaters[repno].aux_forward(
$packet,
$router,
$routing_table,
$rank,
$self_destination,
$timer,
);
} else { } else {
return $repeaters[repno].aux_send($packet); return $repeaters[repno].aux_send($packet);
} }
@ -102,7 +118,16 @@ macro_rules! forward {
#[cfg(not(has_drtio_routing))] #[cfg(not(has_drtio_routing))]
macro_rules! forward { macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {}; (
$router:expr,
$routing_table:expr,
$destination:expr,
$rank:expr,
$self_destination:expr,
$repeaters:expr,
$packet:expr,
$timer:expr
) => {};
} }
fn process_aux_packet( fn process_aux_packet(
@ -183,6 +208,10 @@ fn process_aux_packet(
&drtioaux::Packet::DestinationStatusRequest { &drtioaux::Packet::DestinationStatusRequest {
destination: destination, destination: destination,
}, },
router,
_routing_table,
*rank,
*self_destination,
timer, timer,
) { ) {
Ok(()) => (), Ok(()) => (),
@ -244,7 +273,16 @@ fn process_aux_packet(
channel, channel,
probe, probe,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -266,7 +304,16 @@ fn process_aux_packet(
overrd, overrd,
value, value,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel as _); csr::rtio_moninj::inj_chan_sel_write(channel as _);
@ -280,7 +327,16 @@ fn process_aux_packet(
channel, channel,
overrd, overrd,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -299,7 +355,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.start().is_ok(); let succeeded = i2c.start().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -307,7 +372,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.restart().is_ok(); let succeeded = i2c.restart().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -315,7 +389,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = i2c.stop().is_ok(); let succeeded = i2c.stop().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -324,7 +407,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data, data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.write(data) { match i2c.write(data) {
Ok(ack) => drtioaux::send( Ok(ack) => drtioaux::send(
0, 0,
@ -347,7 +439,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
ack, ack,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
match i2c.read(ack) { match i2c.read(ack) {
Ok(data) => drtioaux::send( Ok(data) => drtioaux::send(
0, 0,
@ -371,7 +472,16 @@ fn process_aux_packet(
address, address,
mask, mask,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let ch = match mask { let ch = match mask {
//decode from mainline, PCA9548-centric API //decode from mainline, PCA9548-centric API
0x00 => None, 0x00 => None,
@ -397,7 +507,16 @@ fn process_aux_packet(
div: _div, div: _div,
cs: _cs, cs: _cs,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -407,7 +526,16 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data: _data, data: _data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::write(busno, data).is_ok(); //let succeeded = spi::write(busno, data).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -416,7 +544,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
// match spi::read(busno) { // match spi::read(busno) {
// Ok(data) => drtioaux::send(0, // Ok(data) => drtioaux::send(0,
@ -436,7 +573,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerHeaderRequest { drtioaux::Packet::AnalyzerHeaderRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let header = analyzer.get_header(); let header = analyzer.get_header();
drtioaux::send( drtioaux::send(
0, 0,
@ -450,7 +596,16 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerDataRequest { drtioaux::Packet::AnalyzerDataRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice); let meta = analyzer.get_data(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -471,7 +626,16 @@ fn process_aux_packet(
length, length,
trace, trace,
} => { } => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination; *self_destination = destination;
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
router.send( router.send(
@ -492,7 +656,16 @@ fn process_aux_packet(
id, id,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.ack_upload( dma_manager.ack_upload(
kernel_manager, kernel_manager,
source, source,
@ -510,7 +683,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
id, id,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = dma_manager.erase(source, id).is_ok(); let succeeded = dma_manager.erase(source, id).is_ok();
router.send( router.send(
drtioaux::Packet::DmaRemoveTraceReply { drtioaux::Packet::DmaRemoveTraceReply {
@ -526,7 +708,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded: _, succeeded: _,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
Ok(()) Ok(())
} }
drtioaux::Packet::DmaPlaybackRequest { drtioaux::Packet::DmaPlaybackRequest {
@ -535,7 +726,16 @@ fn process_aux_packet(
id, id,
timestamp, timestamp,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let succeeded = if !kernel_manager.running() { let succeeded = if !kernel_manager.running() {
dma_manager.playback(source, id, timestamp).is_ok() dma_manager.playback(source, id, timestamp).is_ok()
} else { } else {
@ -555,7 +755,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if !succeeded { if !succeeded {
kernel_manager.ddma_nack(); kernel_manager.ddma_nack();
} }
@ -569,7 +778,16 @@ fn process_aux_packet(
channel, channel,
timestamp, timestamp,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp);
Ok(()) Ok(())
} }
@ -581,7 +799,16 @@ fn process_aux_packet(
length, length,
data, data,
} => { } => {
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
*self_destination = destination; *self_destination = destination;
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
@ -592,7 +819,16 @@ fn process_aux_packet(
id, id,
run, run,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut succeeded = kernel_manager.load(id).is_ok(); let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run // allow preloading a kernel with delayed run
if run { if run {
@ -617,7 +853,16 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
succeeded, succeeded,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
// received if local subkernel started another, remote subkernel // received if local subkernel started another, remote subkernel
kernel_manager.subkernel_load_run_reply(succeeded); kernel_manager.subkernel_load_run_reply(succeeded);
Ok(()) Ok(())
@ -628,14 +873,32 @@ fn process_aux_packet(
with_exception, with_exception,
exception_src, exception_src,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src); kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
Ok(()) Ok(())
} }
drtioaux::Packet::SubkernelExceptionRequest { drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice); let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -655,7 +918,16 @@ fn process_aux_packet(
length, length,
data, data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
kernel_manager.message_handle_incoming(status, id, length as usize, &data); kernel_manager.message_handle_incoming(status, id, length as usize, &data);
router.send( router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source }, drtioaux::Packet::SubkernelMessageAck { destination: source },
@ -667,7 +939,16 @@ fn process_aux_packet(
drtioaux::Packet::SubkernelMessageAck { drtioaux::Packet::SubkernelMessageAck {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(
router,
_routing_table,
_destination,
*rank,
*self_destination,
_repeaters,
&packet,
timer
);
if kernel_manager.message_ack_slice() { if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {

View File

@ -208,10 +208,37 @@ impl Repeater {
} }
} }
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { pub fn aux_forward(
&self,
request: &drtioaux::Packet,
router: &mut Router,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
self_destination: u8,
timer: &mut GlobalTimer,
) -> Result<(), drtioaux::Error> {
self.aux_send(request)?; self.aux_send(request)?;
let reply = self.recv_aux_timeout(200, timer)?; loop {
drtioaux::send(0, &reply).unwrap(); let reply = self.recv_aux_timeout(200, timer)?;
match reply {
// async/locally requested packets to be consumed or routed
// these may come while a packet would be forwarded
drtioaux::Packet::DmaPlaybackStatus { .. }
| drtioaux::Packet::SubkernelFinished { .. }
| drtioaux::Packet::SubkernelMessage { .. }
| drtioaux::Packet::SubkernelMessageAck { .. }
| drtioaux::Packet::SubkernelLoadRunReply { .. }
| drtioaux::Packet::SubkernelException { .. }
| drtioaux::Packet::DmaAddTraceReply { .. }
| drtioaux::Packet::DmaPlaybackReply { .. } => {
router.route(reply, routing_table, rank, self_destination);
}
_ => {
drtioaux::send(0, &reply).unwrap();
break;
}
}
}
Ok(()) Ok(())
} }