From 9bc66e5c1451ecd297d1e88e4d9701aeedc22932 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Wed, 22 Nov 2023 14:09:48 +0800 Subject: [PATCH] support routing packets between satellites and master --- artiq/firmware/runtime/rtio_mgt.rs | 55 ++++++--- artiq/firmware/satman/dma.rs | 17 ++- artiq/firmware/satman/main.rs | 174 ++++++++++++++++++----------- artiq/firmware/satman/repeater.rs | 21 +++- artiq/firmware/satman/routing.rs | 122 ++++++++++++++++++++ 5 files changed, 297 insertions(+), 92 deletions(-) create mode 100644 artiq/firmware/satman/routing.rs diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 1e17ab63d..e469e1ea9 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -96,26 +96,55 @@ pub mod drtio { } } - fn process_async_packets(io: &Io, ddma_mutex: &Mutex, subkernel_mutex: &Mutex, linkno: u8, - packet: drtioaux::Packet) -> Option { - // returns None if an async packet has been consumed + fn process_async_packets(io: &Io, ddma_mutex: &Mutex, subkernel_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, linkno: u8, packet: drtioaux::Packet + ) -> Option { + // returns None if a packet has been consumed or re-routed + macro_rules! route_packet { + ($dest:ident) => {{ + let dest_link = routing_table.0[$dest as usize][0] - 1; + if dest_link == linkno { + warn!("[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}", linkno, packet); + } else if $dest == 0 { + warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet) + } + else { + drtioaux::send(dest_link, &packet).unwrap(); + } + None + }} + } match packet { - drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp } => { - remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); + // packets to be consumed locally + drtioaux::Packet::DmaPlaybackStatus { id, destination: 0, error, channel, timestamp } => { + remote_dma::playback_done(io, ddma_mutex, id, 0, error, channel, timestamp); None }, drtioaux::Packet::SubkernelFinished { id, destination: 0, with_exception, exception_src } => { subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception, exception_src); None }, - drtioaux::Packet::SubkernelMessage { id, source: 0, destination: from, status, length, data } => { + drtioaux::Packet::SubkernelMessage { id, source: from, destination: 0, status, length, data } => { subkernel::message_handle_incoming(io, subkernel_mutex, id, status, length as usize, &data); // acknowledge receiving part of the message drtioaux::send(linkno, &drtioaux::Packet::SubkernelMessageAck { destination: from } ).unwrap(); None - } + }, + // routable packets + drtioaux::Packet::DmaAddTraceRequest { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaAddTraceReply { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaRemoveTraceReply { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaPlaybackRequest { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaPlaybackReply { destination, .. } => route_packet!(destination), + drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } => route_packet!(destination), + drtioaux::Packet::SubkernelLoadRunReply { destination, .. } => route_packet!(destination), + drtioaux::Packet::SubkernelMessage { destination, .. } => route_packet!(destination), + drtioaux::Packet::SubkernelMessageAck { destination, .. } => route_packet!(destination), + drtioaux::Packet::DmaPlaybackStatus { destination, .. } => route_packet!(destination), + drtioaux::Packet::SubkernelFinished { destination, .. } => route_packet!(destination), other => Some(other) } } @@ -223,14 +252,10 @@ pub mod drtio { } } - fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, subkernel_mutex: &Mutex, linkno: u8) { + fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8) { let _lock = aux_mutex.lock(io).unwrap(); match drtioaux::recv(linkno) { - Ok(Some(packet)) => { - if let Some(packet) = process_async_packets(io, ddma_mutex, subkernel_mutex, linkno, packet) { - warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); - } - } + Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno) } @@ -299,7 +324,7 @@ pub mod drtio { destination: destination }); if let Ok(reply) = reply { - let reply = process_async_packets(io, ddma_mutex, subkernel_mutex, linkno, reply); + let reply = process_async_packets(io, ddma_mutex, subkernel_mutex, routing_table, linkno, reply); match reply { Some(drtioaux::Packet::DestinationDownReply) => { destination_set_up(routing_table, up_destinations, destination, false); @@ -371,7 +396,7 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno) { - process_unsolicited_aux(&io, aux_mutex, ddma_mutex, subkernel_mutex, linkno); + process_unsolicited_aux(&io, aux_mutex, linkno); process_local_errors(linkno); } else { info!("[LINK#{}] link is down", linkno); diff --git a/artiq/firmware/satman/dma.rs b/artiq/firmware/satman/dma.rs index 6b9ea3f70..b22be573a 100644 --- a/artiq/firmware/satman/dma.rs +++ b/artiq/firmware/satman/dma.rs @@ -12,8 +12,9 @@ enum ManagerState { } pub struct RtioStatus { + pub source: u8, pub id: u32, - pub error: u8, + pub error: u8, pub channel: u32, pub timestamp: u64 } @@ -35,7 +36,8 @@ struct Entry { pub struct Manager { entries: BTreeMap<(u8, u32), Entry>, state: ManagerState, - currentid: u32 + current_id: u32, + current_source: u8 } impl Manager { @@ -47,7 +49,8 @@ impl Manager { } Manager { entries: BTreeMap::new(), - currentid: 0, + current_id: 0, + current_source: 0, state: ManagerState::Idle, } } @@ -125,7 +128,8 @@ impl Manager { assert!(ptr as u32 % 64 == 0); self.state = ManagerState::Playback; - self.currentid = id; + self.current_id = id; + self.current_source = source; unsafe { csr::rtio_dma::base_address_write(ptr as u64); @@ -156,8 +160,9 @@ impl Manager { if error != 0 { csr::rtio_dma::error_write(1); } - return Some(RtioStatus { - id: self.currentid, + return Some(RtioStatus { + source: self.current_source, + id: self.current_id, error: error, channel: channel, timestamp: timestamp }); diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index dbe4b08d4..2747f16c6 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -32,6 +32,7 @@ use analyzer::Analyzer; static mut ALLOC: alloc_list::ListAlloc = alloc_list::EMPTY; mod repeater; +mod routing; mod dma; mod analyzer; mod kernel; @@ -103,8 +104,9 @@ macro_rules! forward { } fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmgr: &mut KernelManager, - _repeaters: &mut [repeater::Repeater], _routing_table: &mut drtio_routing::RoutingTable, _rank: &mut u8, - self_destination: &mut u8, packet: drtioaux::Packet) -> Result<(), drtioaux::Error> { + _repeaters: &mut [repeater::Repeater], _routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, + router: &mut routing::Router, self_destination: &mut u8, packet: drtioaux::Packet +) -> Result<(), drtioaux::Error> { // In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels, // and u16 otherwise; hence the `as _` conversion. match packet { @@ -125,29 +127,18 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg drtioaux::Packet::DestinationStatusRequest { destination } => { #[cfg(has_drtio_routing)] - let hop = _routing_table.0[destination as usize][*_rank as usize]; + let hop = _routing_table.0[destination as usize][*rank as usize]; #[cfg(not(has_drtio_routing))] let hop = 0; if hop == 0 { // async messages - if let Some(status) = dmamgr.get_status() { - info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp); - drtioaux::send(0, &drtioaux::Packet::DmaPlaybackStatus { - destination: destination, id: status.id, error: status.error, channel: status.channel, timestamp: status.timestamp })?; - } else if let Some(subkernel_finished) = kernelmgr.get_last_finished() { - info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception); - drtioaux::send(0, &drtioaux::Packet::SubkernelFinished { - destination: subkernel_finished.source, id: subkernel_finished.id, - with_exception: subkernel_finished.with_exception, exception_src: *self_destination - })?; - } else if kernelmgr.message_is_ready() { - let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; - let meta = kernelmgr.message_get_slice(&mut data_slice).unwrap(); - drtioaux::send(0, &drtioaux::Packet::SubkernelMessage { - source: *self_destination, destination: 0, id: kernelmgr.get_current_id().unwrap(), - status: meta.status, length: meta.len as u16, data: data_slice - })?; + if *rank == 1 { + if let Some(packet) = router.get_upstream_packet(*rank) { + // pass any async or routed packets to master + // this does mean that DDMA/SK packets to master will "trickle down" to higher rank + drtioaux::send(0, &packet)?; + } } else { let errors; unsafe { @@ -220,18 +211,18 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg drtioaux::send(0, &drtioaux::Packet::RoutingAck) } #[cfg(has_drtio_routing)] - drtioaux::Packet::RoutingSetRank { rank } => { - *_rank = rank; - drtio_routing::interconnect_enable_all(_routing_table, rank); + drtioaux::Packet::RoutingSetRank { rank: new_rank } => { + *rank = new_rank; + drtio_routing::interconnect_enable_all(_routing_table, new_rank); - let rep_rank = rank + 1; + let rep_rank = new_rank + 1; for rep in _repeaters.iter() { if let Err(e) = rep.set_rank(rep_rank) { error!("failed to set rank ({})", e); } } - info!("rank: {}", rank); + info!("rank: {}", new_rank); info!("routing table: {}", _routing_table); drtioaux::send(0, &drtioaux::Packet::RoutingAck) @@ -246,8 +237,18 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg drtioaux::send(0, &drtioaux::Packet::RoutingAck) } + #[cfg(has_drtio_routing)] + drtioaux::Packet::RoutingAck => { + if *rank > 1 { + router.routing_ack_received(); + } else { + warn!("received unexpected RoutingAck"); + } + Ok(()) + } + drtioaux::Packet::MonitorRequest { destination: _destination, channel, probe } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -264,7 +265,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg drtioaux::send(0, &reply) }, drtioaux::Packet::InjectionRequest { destination: _destination, channel, overrd, value } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); #[cfg(has_rtio_moninj)] unsafe { csr::rtio_moninj::inj_chan_sel_write(channel as _); @@ -274,7 +275,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg Ok(()) }, drtioaux::Packet::InjectionStatusRequest { destination: _destination, channel, overrd } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let value; #[cfg(has_rtio_moninj)] unsafe { @@ -290,22 +291,22 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg }, drtioaux::Packet::I2cStartRequest { destination: _destination, busno } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = i2c::start(busno).is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } drtioaux::Packet::I2cRestartRequest { destination: _destination, busno } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = i2c::restart(busno).is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } drtioaux::Packet::I2cStopRequest { destination: _destination, busno } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = i2c::stop(busno).is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } drtioaux::Packet::I2cWriteRequest { destination: _destination, busno, data } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); match i2c::write(busno, data) { Ok(ack) => drtioaux::send(0, &drtioaux::Packet::I2cWriteReply { succeeded: true, ack: ack }), @@ -314,7 +315,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg } } drtioaux::Packet::I2cReadRequest { destination: _destination, busno, ack } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); match i2c::read(busno, ack) { Ok(data) => drtioaux::send(0, &drtioaux::Packet::I2cReadReply { succeeded: true, data: data }), @@ -323,25 +324,25 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg } } drtioaux::Packet::I2cSwitchSelectRequest { destination: _destination, busno, address, mask } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = i2c::switch_select(busno, address, mask).is_ok(); drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) } drtioaux::Packet::SpiSetConfigRequest { destination: _destination, busno, flags, length, div, cs } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: succeeded }) }, drtioaux::Packet::SpiWriteRequest { destination: _destination, busno, data } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = spi::write(busno, data).is_ok(); drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: succeeded }) } drtioaux::Packet::SpiReadRequest { destination: _destination, busno } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); match spi::read(busno) { Ok(data) => drtioaux::send(0, &drtioaux::Packet::SpiReadReply { succeeded: true, data: data }), @@ -351,7 +352,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg } drtioaux::Packet::AnalyzerHeaderRequest { destination: _destination } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let header = analyzer.get_header(); drtioaux::send(0, &drtioaux::Packet::AnalyzerHeader { total_byte_count: header.total_byte_count, @@ -361,7 +362,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg } drtioaux::Packet::AnalyzerDataRequest { destination: _destination } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = analyzer.get_data(&mut data_slice); drtioaux::send(0, &drtioaux::Packet::AnalyzerData { @@ -372,35 +373,38 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg } drtioaux::Packet::DmaAddTraceRequest { source, destination, id, status, length, trace } => { - forward!(_routing_table, destination, *_rank, _repeaters, &packet); + forward!(_routing_table, destination, *rank, _repeaters, &packet); *self_destination = destination; let succeeded = dmamgr.add(source, id, status, &trace, length as usize).is_ok(); - drtioaux::send(0, - &drtioaux::Packet::DmaAddTraceReply { destination: source, succeeded: succeeded }) + router.send(drtioaux::Packet::DmaAddTraceReply { + destination: source, succeeded: succeeded + }, _routing_table, *rank) } drtioaux::Packet::DmaRemoveTraceRequest { source, destination: _destination, id } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = dmamgr.erase(source, id).is_ok(); - drtioaux::send(0, - &drtioaux::Packet::DmaRemoveTraceReply { destination: source, succeeded: succeeded }) + router.send(drtioaux::Packet::DmaRemoveTraceReply { + destination: source, succeeded: succeeded + }, _routing_table, *rank) } drtioaux::Packet::DmaPlaybackRequest { source, destination: _destination, id, timestamp } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); // no DMA with a running kernel let succeeded = !kernelmgr.is_running() && dmamgr.playback(source, id, timestamp).is_ok(); - drtioaux::send(0, - &drtioaux::Packet::DmaPlaybackReply { destination: source, succeeded: succeeded }) + router.send(drtioaux::Packet::DmaPlaybackReply { + destination: source, succeeded: succeeded + }, _routing_table, *rank) } drtioaux::Packet::SubkernelAddDataRequest { destination, id, status, length, data } => { - forward!(_routing_table, destination, *_rank, _repeaters, &packet); + forward!(_routing_table, destination, *rank, _repeaters, &packet); *self_destination = destination; let succeeded = kernelmgr.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) } drtioaux::Packet::SubkernelLoadRunRequest { source, destination: _destination, id, run } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let mut succeeded = kernelmgr.load(id).is_ok(); // allow preloading a kernel with delayed run if run { @@ -411,11 +415,13 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg succeeded |= kernelmgr.run(source, id).is_ok(); } } - drtioaux::send(0, - &drtioaux::Packet::SubkernelLoadRunReply { destination: source, succeeded: succeeded }) + router.send(drtioaux::Packet::SubkernelLoadRunReply { + destination: source, succeeded: succeeded + }, + _routing_table, *rank) } drtioaux::Packet::SubkernelExceptionRequest { destination: _destination } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = kernelmgr.exception_get_slice(&mut data_slice); drtioaux::send(0, &drtioaux::Packet::SubkernelException { @@ -425,21 +431,22 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg }) } drtioaux::Packet::SubkernelMessage { source, destination: _destination, id: _id, status, length, data } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); kernelmgr.message_handle_incoming(status, length as usize, &data); - drtioaux::send(0, &drtioaux::Packet::SubkernelMessageAck { - destination: source - }) + router.send(drtioaux::Packet::SubkernelMessageAck { + destination: source + }, _routing_table, *rank) } drtioaux::Packet::SubkernelMessageAck { destination: _destination } => { - forward!(_routing_table, _destination, *_rank, _repeaters, &packet); + forward!(_routing_table, _destination, *rank, _repeaters, &packet); if kernelmgr.message_ack_slice() { let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) { - drtioaux::send(0, &drtioaux::Packet::SubkernelMessage { + router.send(drtioaux::Packet::SubkernelMessage { source: *self_destination, destination: 0, id: kernelmgr.get_current_id().unwrap(), status: meta.status, length: meta.len as u16, data: data_slice - })? + }, + _routing_table, *rank)?; } else { error!("Error receiving message slice"); } @@ -456,12 +463,12 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg fn process_aux_packets(dma_manager: &mut DmaManager, analyzer: &mut Analyzer, kernelmgr: &mut KernelManager, repeaters: &mut [repeater::Repeater], - routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, + routing_table: &mut drtio_routing::RoutingTable, rank: &mut u8, router: &mut routing::Router, destination: &mut u8) { let result = - drtioaux::recv(0).and_then(|packet| { + drtioaux::recv(0).or_else(|_| Ok(router.get_local_packet())).and_then(|packet| { if let Some(packet) = packet { - process_aux_packet(dma_manager, analyzer, kernelmgr, repeaters, routing_table, rank, destination, packet) + process_aux_packet(dma_manager, analyzer, kernelmgr, repeaters, routing_table, rank, router, destination, packet) } else { Ok(()) } @@ -682,10 +689,12 @@ pub extern fn main() -> i32 { ad9117::init().expect("AD9117 initialization failed"); loop { + let mut router = routing::Router::new(); + while !drtiosat_link_rx_up() { drtiosat_process_errors(); for rep in repeaters.iter_mut() { - rep.service(&routing_table, rank); + rep.service(&routing_table, rank, &mut router); } #[cfg(all(soc_platform = "kasli", hw_rev = "v2.0"))] { @@ -719,10 +728,10 @@ pub extern fn main() -> i32 { while drtiosat_link_rx_up() { drtiosat_process_errors(); process_aux_packets(&mut dma_manager, &mut analyzer, - &mut kernelmgr, &mut repeaters, - &mut routing_table, &mut rank, &mut destination); + &mut kernelmgr, &mut repeaters, &mut routing_table, + &mut rank, &mut router, &mut destination); for rep in repeaters.iter_mut() { - rep.service(&routing_table, rank); + rep.service(&routing_table, rank, &mut router); } #[cfg(all(soc_platform = "kasli", hw_rev = "v2.0"))] { @@ -743,7 +752,38 @@ pub extern fn main() -> i32 { error!("aux packet error: {}", e); } } + if let Some(status) = dma_manager.get_status() { + info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp); + let res = router.route(drtioaux::Packet::DmaPlaybackStatus { + destination: status.source, id: status.id, error: status.error, + channel: status.channel, timestamp: status.timestamp + }, &routing_table, rank); + if let Err(e) = res { + warn!("error sending DmaPlaybackStatus: {}", e); + } + } kernelmgr.process_kern_requests(destination); + if let Some(subkernel_finished) = kernelmgr.get_last_finished() { + info!("subkernel {} finished, with exception: {}", subkernel_finished.id, subkernel_finished.with_exception); + let res = router.route(drtioaux::Packet::SubkernelFinished { + destination: subkernel_finished.source, id: subkernel_finished.id, + with_exception: subkernel_finished.with_exception, exception_src: destination + }, &routing_table, rank); + if let Err(e) = res { + warn!("error sending SubkernelFinished: {}", e); + } + } + if kernelmgr.message_is_ready() { + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = kernelmgr.message_get_slice(&mut data_slice).unwrap(); + let res = router.route(drtioaux::Packet::SubkernelMessage { + source: destination, destination: 0, id: kernelmgr.get_current_id().unwrap(), + status: meta.status, length: meta.len as u16, data: data_slice + }, &routing_table, rank); + if let Err(e) = res { + warn!("error sending SubkernelMessage: {}", e); + } + } } drtiosat_reset_phy(true); diff --git a/artiq/firmware/satman/repeater.rs b/artiq/firmware/satman/repeater.rs index 9969d5099..96af6ba84 100644 --- a/artiq/firmware/satman/repeater.rs +++ b/artiq/firmware/satman/repeater.rs @@ -1,6 +1,7 @@ use board_artiq::{drtioaux, drtio_routing}; #[cfg(has_drtio_routing)] use board_misoc::{csr, clock}; +use routing::{Router, get_routable_packet_destination}; #[cfg(has_drtio_routing)] fn rep_link_rx_up(repno: u8) -> bool { @@ -48,7 +49,7 @@ impl Repeater { self.state == RepeaterState::Up } - pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8) { + pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, router: &mut Router) { self.process_local_errors(); match self.state { @@ -106,7 +107,7 @@ impl Repeater { } } RepeaterState::Up => { - self.process_unsolicited_aux(); + self.process_unsolicited_aux(routing_table, rank, router); if !rep_link_rx_up(self.repno) { info!("[REP#{}] link is down", self.repno); self.state = RepeaterState::Down; @@ -121,9 +122,21 @@ impl Repeater { } } - fn process_unsolicited_aux(&self) { + fn process_unsolicited_aux(&self, routing_table: &drtio_routing::RoutingTable, rank: u8, router: &mut Router) { match drtioaux::recv(self.auxno) { - Ok(Some(packet)) => warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet), + Ok(Some(packet)) => { + let destination = get_routable_packet_destination(&packet); + if destination.is_none() { + warn!("[REP#{}] unsolicited aux packet: {:?}", self.repno, packet); + } else { + // routable packet + let res = router.route(packet, routing_table, rank); + match res { + Ok(()) => drtioaux::send(self.auxno, &drtioaux::Packet::RoutingAck).unwrap(), + Err(e) => warn!("[REP#{}] Error routing packet: {:?}", self.repno, e), + } + } + } Ok(None) => (), Err(_) => warn!("[REP#{}] aux packet error", self.repno) } diff --git a/artiq/firmware/satman/routing.rs b/artiq/firmware/satman/routing.rs new file mode 100644 index 000000000..e27ec53c9 --- /dev/null +++ b/artiq/firmware/satman/routing.rs @@ -0,0 +1,122 @@ +use alloc::collections::vec_deque::VecDeque; +use board_artiq::{drtioaux, drtio_routing}; +use board_misoc::csr; + +// Packets from downstream (further satellites) are received and routed appropriately. +// they're passed immediately if it's possible (within the subtree), or sent upstream. +// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest; +// for higher ranks, straight upstream, but awaiting for an ACK to make sure the upstream is not overwhelmed. + +// forward! macro is not deprecated, as routable packets are only these that can originate +// from both master and satellite, e.g. DDMA and Subkernel. + +pub fn get_routable_packet_destination(packet: &drtioaux::Packet) -> Option { + let destination = match packet { + // received from downstream + drtioaux::Packet::DmaAddTraceRequest { destination, .. } => destination, + drtioaux::Packet::DmaAddTraceReply { destination, .. } => destination, + drtioaux::Packet::DmaRemoveTraceRequest { destination, .. } => destination, + drtioaux::Packet::DmaRemoveTraceReply { destination, .. } => destination, + drtioaux::Packet::DmaPlaybackRequest { destination, .. } => destination, + drtioaux::Packet::DmaPlaybackReply { destination, .. } => destination, + drtioaux::Packet::SubkernelLoadRunRequest { destination, .. } => destination, + drtioaux::Packet::SubkernelLoadRunReply { destination, .. } => destination, + // received from downstream or produced locally + drtioaux::Packet::SubkernelMessage { destination, .. } => destination, + drtioaux::Packet::SubkernelMessageAck { destination, .. } => destination, + // "async" - master gets them by deststatreq, satellites would get it through the router + drtioaux::Packet::DmaPlaybackStatus { destination, .. } => destination, + drtioaux::Packet::SubkernelFinished { destination, .. } => destination, + _ => return None + }; + Some(*destination) +} + +pub struct Router { + out_messages: VecDeque, + local_messages: VecDeque, + upstream_ready: bool +} + +impl Router { + pub fn new() -> Router { + Router { + out_messages: VecDeque::new(), + local_messages: VecDeque::new(), + upstream_ready: true + } + } + + + // called by local sources (DDMA, kernel) and by repeaters on receiving unsolicited data + // messages are always buffered for upstream, or passed downstream directly + pub fn route(&mut self, packet: drtioaux::Packet, + _routing_table: &drtio_routing::RoutingTable, _rank: u8 + ) -> Result<(), drtioaux::Error> { + #[cfg(has_drtio_routing)] + { + let destination = get_routable_packet_destination(&packet); + if let Some(destination) = destination { + let hop = _routing_table.0[destination as usize][_rank as usize]; + let auxno = if destination == 0 { 0 } else { hop }; + if hop != 0 { + if hop as usize <= csr::DRTIOREP.len() { + drtioaux::send(auxno, &packet)?; + } else { + self.out_messages.push_back(packet); + } + } else { + self.local_messages.push_back(packet); + } + } else { + return Err(drtioaux::Error::RoutingError); + } + } + #[cfg(not(has_drtio_routing))] + { + self.out_messages.push_back(packet); + } + Ok(()) + } + + // Sends a packet to a required destination, routing if it's necessary + pub fn send(&mut self, packet: drtioaux::Packet, + _routing_table: &drtio_routing::RoutingTable, _rank: u8) -> Result<(), drtioaux::Error> { + #[cfg(has_drtio_routing)] + { + let destination = get_routable_packet_destination(&packet); + if destination.is_none() || destination == Some(0) { + // send upstream directly (response to master) + drtioaux::send(0, &packet) + } else { + self.route(packet, _routing_table, _rank) + } + } + #[cfg(not(has_drtio_routing))] + { + drtioaux::send(0, &packet) + } + } + + pub fn get_upstream_packet(&mut self, rank: u8) -> Option { + // called on DestinationStatusRequest on rank 1, in loop in others + if self.upstream_ready { + let packet = self.out_messages.pop_front(); + if rank > 1 && packet.is_some() { + // packet will be sent out, awaiting ACK + self.upstream_ready = false; + } + packet + } else { + None + } + } + + pub fn routing_ack_received(&mut self) { + self.upstream_ready = true; + } + + pub fn get_local_packet(&mut self) -> Option { + self.local_messages.pop_front() + } +} \ No newline at end of file