satman: support sub-subkernels, routing

This commit is contained in:
mwojcik 2023-12-12 16:51:54 +08:00
parent 525c9c7be3
commit 0dd834acf8
7 changed files with 653 additions and 190 deletions

View File

@ -922,4 +922,39 @@ impl Packet {
}
Ok(())
}
pub fn routable_destination(&self) -> Option<u8> {
// only for packets that could be re-routed, not only forwarded
match self {
Packet::DmaAddTraceRequest { destination, .. } => Some(*destination),
Packet::DmaAddTraceReply { destination, .. } => Some(*destination),
Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination),
Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination),
Packet::DmaPlaybackRequest { destination, .. } => Some(*destination),
Packet::DmaPlaybackReply { destination, .. } => Some(*destination),
Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination),
Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination),
Packet::SubkernelMessage { destination, .. } => Some(*destination),
Packet::SubkernelMessageAck { destination } => Some(*destination),
Packet::DmaPlaybackStatus { destination, .. } => Some(*destination),
Packet::SubkernelFinished { destination, .. } => Some(*destination),
_ => None,
}
}
pub fn expects_response(&self) -> bool {
// returns true if the routable packet should elicit a response
// e.g. reply, ACK packets end a conversation,
// and firmware should not wait for response
match self {
Packet::DmaAddTraceReply { .. }
| Packet::DmaRemoveTraceReply { .. }
| Packet::DmaPlaybackReply { .. }
| Packet::SubkernelLoadRunReply { .. }
| Packet::SubkernelMessageAck { .. }
| Packet::DmaPlaybackStatus { .. }
| Packet::SubkernelFinished { .. } => false,
_ => true,
}
}
}

View File

@ -53,7 +53,12 @@ pub mod drtio {
async_ready
}
async fn process_async_packets(aux_mutex: &Mutex<bool>, linkno: u8, routing_table: &drtio_routing::RoutingTable, timer: GlobalTimer) {
async fn process_async_packets(
aux_mutex: &Mutex<bool>,
linkno: u8,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
) {
if link_has_async_ready(linkno).await {
loop {
let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingRetrievePackets, timer).await;

View File

@ -12,6 +12,7 @@ enum ManagerState {
}
pub struct RtioStatus {
pub source: u8,
pub id: u32,
pub error: u8,
pub channel: u32,
@ -33,9 +34,10 @@ struct Entry {
#[derive(Debug)]
pub struct Manager {
entries: BTreeMap<u32, Entry>,
entries: BTreeMap<(u8, u32), Entry>,
state: ManagerState,
currentid: u32,
current_id: u32,
current_source: u8,
}
impl Manager {
@ -45,40 +47,48 @@ impl Manager {
unsafe { while csr::rtio_dma::enable_read() != 0 {} }
Manager {
entries: BTreeMap::new(),
currentid: 0,
current_id: 0,
current_source: 0,
state: ManagerState::Idle,
}
}
pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> {
let entry = match self.entries.get_mut(&id) {
pub fn add(
&mut self,
source: u8,
id: u32,
status: PayloadStatus,
trace: &[u8],
trace_len: usize,
) -> Result<(), Error> {
let entry = match self.entries.get_mut(&(source, id)) {
Some(entry) => {
if entry.complete || status.is_first() {
// replace entry
self.entries.remove(&id);
self.entries.remove(&(source, id));
self.entries.insert(
id,
(source, id),
Entry {
trace: Vec::new(),
padding_len: 0,
complete: false,
},
);
self.entries.get_mut(&id).unwrap()
self.entries.get_mut(&(source, id)).unwrap()
} else {
entry
}
}
None => {
self.entries.insert(
id,
(source, id),
Entry {
trace: Vec::new(),
padding_len: 0,
complete: false,
},
);
self.entries.get_mut(&id).unwrap()
self.entries.get_mut(&(source, id)).unwrap()
}
};
entry.trace.extend(&trace[0..trace_len]);
@ -105,19 +115,19 @@ impl Manager {
Ok(())
}
pub fn erase(&mut self, id: u32) -> Result<(), Error> {
match self.entries.remove(&id) {
pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> {
match self.entries.remove(&(source, id)) {
Some(_) => Ok(()),
None => Err(Error::IdNotFound),
}
}
pub fn playback(&mut self, id: u32, timestamp: u64) -> Result<(), Error> {
pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> {
if self.state != ManagerState::Idle {
return Err(Error::PlaybackInProgress);
}
let entry = match self.entries.get(&id) {
let entry = match self.entries.get(&(source, id)) {
Some(entry) => entry,
None => {
return Err(Error::IdNotFound);
@ -130,7 +140,8 @@ impl Manager {
assert!(ptr as u32 % 64 == 0);
self.state = ManagerState::Playback;
self.currentid = id;
self.current_id = id;
self.current_source = source;
unsafe {
csr::rtio_dma::base_address_write(ptr as u32);
@ -162,7 +173,8 @@ impl Manager {
csr::rtio_dma::error_write(1);
}
return Some(RtioStatus {
id: self.currentid,
source: self.current_source,
id: self.current_id,
error: error,
channel: channel,
timestamp: timestamp,

View File

@ -39,11 +39,13 @@ use libboard_zynq::{i2c::I2c, print, println, time::Milliseconds, timer::GlobalT
use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR};
use libregister::RegisterR;
use libsupport_zynq::ram;
use routing::Router;
use subkernel::Manager as KernelManager;
mod analyzer;
mod dma;
mod repeater;
mod routing;
mod subkernel;
fn drtiosat_reset(reset: bool) {
@ -72,6 +74,12 @@ fn drtiosat_tsc_loaded() -> bool {
}
}
fn drtiosat_async_ready() {
unsafe {
csr::drtiosat::async_messages_ready_write(1);
}
}
#[cfg(has_drtio_routing)]
macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{
@ -79,7 +87,11 @@ macro_rules! forward {
if hop != 0 {
let repno = (hop - 1) as usize;
if repno < $repeaters.len() {
return $repeaters[repno].aux_forward($packet, $timer);
if $packet.expects_response() {
return $repeaters[repno].aux_forward($packet, $timer);
} else {
return $repeaters[repno].aux_send($packet);
}
} else {
return Err(drtioaux::Error::RoutingError);
}
@ -95,13 +107,15 @@ macro_rules! forward {
fn process_aux_packet(
_repeaters: &mut [repeater::Repeater],
_routing_table: &mut drtio_routing::RoutingTable,
_rank: &mut u8,
rank: &mut u8,
self_destination: &mut u8,
packet: drtioaux::Packet,
timer: &mut GlobalTimer,
i2c: &mut I2c,
dma_manager: &mut DmaManager,
analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager,
router: &mut Router,
) -> Result<(), drtioaux::Error> {
// In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels,
// and u16 otherwise; hence the `as _` conversion.
@ -122,82 +136,39 @@ fn process_aux_packet(
drtioaux::Packet::DestinationStatusRequest { destination } => {
#[cfg(has_drtio_routing)]
let hop = _routing_table.0[destination as usize][*_rank as usize];
let hop = _routing_table.0[destination as usize][*rank as usize];
#[cfg(not(has_drtio_routing))]
let hop = 0;
if hop == 0 {
if let Some(status) = dma_manager.check_state() {
info!(
"playback done, error: {}, channel: {}, timestamp: {}",
status.error, status.channel, status.timestamp
);
drtioaux::send(
0,
&drtioaux::Packet::DmaPlaybackStatus {
destination: destination,
id: status.id,
error: status.error,
channel: status.channel,
timestamp: status.timestamp,
},
)?;
} else if let Some(subkernel_finished) = kernel_manager.get_last_finished() {
info!(
"subkernel {} finished, with exception: {}",
subkernel_finished.id, subkernel_finished.with_exception
);
drtioaux::send(
0,
&drtioaux::Packet::SubkernelFinished {
id: subkernel_finished.id,
with_exception: subkernel_finished.with_exception,
},
)?;
} else if kernel_manager.message_is_ready() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
match kernel_manager.message_get_slice(&mut data_slice) {
Some(meta) => drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessage {
destination: destination,
id: kernel_manager.get_current_id().unwrap(),
status: meta.status,
length: meta.len as u16,
data: data_slice,
},
)?,
None => warn!("subkernel message is ready but no message is present"),
}
} else {
let errors;
*self_destination = destination;
let errors;
unsafe {
errors = csr::drtiosat::rtio_error_read();
}
if errors & 1 != 0 {
let channel;
unsafe {
errors = csr::drtiosat::rtio_error_read();
channel = csr::drtiosat::sequence_error_channel_read();
csr::drtiosat::rtio_error_write(1);
}
if errors & 1 != 0 {
let channel;
unsafe {
channel = csr::drtiosat::sequence_error_channel_read();
csr::drtiosat::rtio_error_write(1);
}
drtioaux::send(0, &drtioaux::Packet::DestinationSequenceErrorReply { channel })?;
} else if errors & 2 != 0 {
let channel;
unsafe {
channel = csr::drtiosat::collision_channel_read();
csr::drtiosat::rtio_error_write(2);
}
drtioaux::send(0, &drtioaux::Packet::DestinationCollisionReply { channel })?;
} else if errors & 4 != 0 {
let channel;
unsafe {
channel = csr::drtiosat::busy_channel_read();
csr::drtiosat::rtio_error_write(4);
}
drtioaux::send(0, &drtioaux::Packet::DestinationBusyReply { channel })?;
} else {
drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?;
drtioaux::send(0, &drtioaux::Packet::DestinationSequenceErrorReply { channel })?;
} else if errors & 2 != 0 {
let channel;
unsafe {
channel = csr::drtiosat::collision_channel_read();
csr::drtiosat::rtio_error_write(2);
}
drtioaux::send(0, &drtioaux::Packet::DestinationCollisionReply { channel })?;
} else if errors & 4 != 0 {
let channel;
unsafe {
channel = csr::drtiosat::busy_channel_read();
csr::drtiosat::rtio_error_write(4);
}
drtioaux::send(0, &drtioaux::Packet::DestinationBusyReply { channel })?;
} else {
drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?;
}
}
@ -242,11 +213,11 @@ fn process_aux_packet(
drtioaux::send(0, &drtioaux::Packet::RoutingAck)
}
#[cfg(has_drtio_routing)]
drtioaux::Packet::RoutingSetRank { rank } => {
*_rank = rank;
drtio_routing::interconnect_enable_all(_routing_table, rank);
drtioaux::Packet::RoutingSetRank { rank: new_rank } => {
*rank = new_rank;
drtio_routing::interconnect_enable_all(_routing_table, new_rank);
let rep_rank = rank + 1;
let rep_rank = new_rank + 1;
for rep in _repeaters.iter() {
if let Err(e) = rep.set_rank(rep_rank, timer) {
error!("failed to set rank ({:?})", e);
@ -267,12 +238,20 @@ fn process_aux_packet(
#[cfg(not(has_drtio_routing))]
drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck),
drtioaux::Packet::RoutingRetrievePackets => {
let packet = router
.get_upstream_packet()
.or(Some(drtioaux::Packet::RoutingNoPackets))
.unwrap();
drtioaux::send(0, &packet)
}
drtioaux::Packet::MonitorRequest {
destination: _destination,
channel,
probe,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let value;
#[cfg(has_rtio_moninj)]
unsafe {
@ -294,7 +273,7 @@ fn process_aux_packet(
overrd,
value,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
#[cfg(has_rtio_moninj)]
unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel as _);
@ -308,7 +287,7 @@ fn process_aux_packet(
channel,
overrd,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let value;
#[cfg(has_rtio_moninj)]
unsafe {
@ -327,7 +306,7 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.start().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -335,7 +314,7 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.restart().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -343,7 +322,7 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.stop().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
}
@ -352,7 +331,7 @@ fn process_aux_packet(
busno: _busno,
data,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
match i2c.write(data) {
Ok(ack) => drtioaux::send(
0,
@ -375,7 +354,7 @@ fn process_aux_packet(
busno: _busno,
ack,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
match i2c.read(ack) {
Ok(data) => drtioaux::send(
0,
@ -399,7 +378,7 @@ fn process_aux_packet(
address,
mask,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let ch = match mask {
//decode from mainline, PCA9548-centric API
0x00 => None,
@ -425,7 +404,7 @@ fn process_aux_packet(
div: _div,
cs: _cs,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -435,7 +414,7 @@ fn process_aux_packet(
busno: _busno,
data: _data,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available
//let succeeded = spi::write(busno, data).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -444,7 +423,7 @@ fn process_aux_packet(
destination: _destination,
busno: _busno,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available
// match spi::read(busno) {
// Ok(data) => drtioaux::send(0,
@ -464,7 +443,7 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerHeaderRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let header = analyzer.get_header();
drtioaux::send(
0,
@ -478,7 +457,7 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerDataRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice);
drtioaux::send(
@ -492,55 +471,85 @@ fn process_aux_packet(
}
drtioaux::Packet::DmaAddTraceRequest {
destination: _destination,
source,
destination,
id,
status,
length,
trace,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded })
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
*self_destination = destination;
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
router.send(
drtioaux::Packet::DmaAddTraceReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::DmaRemoveTraceRequest {
source,
destination: _destination,
id,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let succeeded = dma_manager.erase(id).is_ok();
drtioaux::send(0, &drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded })
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = dma_manager.erase(source, id).is_ok();
router.send(
drtioaux::Packet::DmaRemoveTraceReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::DmaPlaybackRequest {
source,
destination: _destination,
id,
timestamp,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = if !kernel_manager.running() {
dma_manager.playback(id, timestamp).is_ok()
dma_manager.playback(source, id, timestamp).is_ok()
} else {
false
};
drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded })
router.send(
drtioaux::Packet::DmaPlaybackReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::SubkernelAddDataRequest {
destination: _destination,
destination,
id,
status,
length,
data,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
*self_destination = destination;
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
}
drtioaux::Packet::SubkernelLoadRunRequest {
source,
destination: _destination,
id,
run,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run
if run {
@ -548,15 +557,42 @@ fn process_aux_packet(
// cannot run kernel while DDMA is running
succeeded = false;
} else {
succeeded |= kernel_manager.run(id).is_ok();
succeeded |= kernel_manager.run(source, id).is_ok();
}
}
drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded })
router.send(
drtioaux::Packet::SubkernelLoadRunReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::SubkernelLoadRunReply {
destination: _destination,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// received if local subkernel started another, remote subkernel
kernel_manager.subkernel_load_run_reply(succeeded);
Ok(())
}
drtioaux::Packet::SubkernelFinished {
destination: _destination,
id,
with_exception,
exception_src,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
Ok(())
}
drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send(
@ -569,38 +605,43 @@ fn process_aux_packet(
)
}
drtioaux::Packet::SubkernelMessage {
destination,
source,
destination: _destination,
id: _id,
status,
length,
data,
} => {
forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.message_handle_incoming(status, length as usize, &data);
drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessageAck {
destination: destination,
},
router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source },
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::SubkernelMessageAck {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {
drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessage {
destination: *_rank,
// route and not send immediately as ACKs are not a beginning of a transaction
router.route(
drtioaux::Packet::SubkernelMessage {
source: *self_destination,
destination: meta.destination,
id: kernel_manager.get_current_id().unwrap(),
status: meta.status,
length: meta.len as u16,
data: data_slice,
},
)?;
_routing_table,
*rank,
*self_destination,
);
} else {
error!("Error receiving message slice");
}
@ -619,32 +660,35 @@ fn process_aux_packets(
repeaters: &mut [repeater::Repeater],
routing_table: &mut drtio_routing::RoutingTable,
rank: &mut u8,
self_destination: &mut u8,
timer: &mut GlobalTimer,
i2c: &mut I2c,
dma_manager: &mut DmaManager,
analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager,
router: &mut Router,
) {
let result = drtioaux::recv(0).and_then(|packet| {
if let Some(packet) = packet {
if let Some(packet) = packet.or_else(|| router.get_local_packet()) {
process_aux_packet(
repeaters,
routing_table,
rank,
self_destination,
packet,
timer,
i2c,
dma_manager,
analyzer,
kernel_manager,
router,
)
} else {
Ok(())
}
});
match result {
Ok(()) => (),
Err(e) => warn!("aux packet error ({:?})", e),
if let Err(e) = result {
warn!("aux packet error ({:?})", e);
}
}
@ -800,17 +844,20 @@ pub extern "C" fn main_core0() -> i32 {
}
let mut routing_table = drtio_routing::RoutingTable::default_empty();
let mut rank = 1;
let mut destination = 1;
let mut hardware_tick_ts = 0;
let mut control = ksupport::kernel::Control::start();
loop {
let mut router = Router::new();
while !drtiosat_link_rx_up() {
drtiosat_process_errors();
#[allow(unused_mut)]
for mut rep in repeaters.iter_mut() {
rep.service(&routing_table, rank, &mut timer);
rep.service(&routing_table, rank, destination, &mut router, &mut timer);
}
#[cfg(feature = "target_kasli_soc")]
{
@ -849,15 +896,17 @@ pub extern "C" fn main_core0() -> i32 {
&mut repeaters,
&mut routing_table,
&mut rank,
&mut destination,
&mut timer,
&mut i2c,
&mut dma_manager,
&mut analyzer,
&mut kernel_manager,
&mut router,
);
#[allow(unused_mut)]
for mut rep in repeaters.iter_mut() {
rep.service(&routing_table, rank, &mut timer);
rep.service(&routing_table, rank, destination, &mut router, &mut timer);
}
#[cfg(feature = "target_kasli_soc")]
{
@ -880,7 +929,37 @@ pub extern "C" fn main_core0() -> i32 {
error!("aux packet error: {:?}", e);
}
}
kernel_manager.process_kern_requests(rank, timer);
if let Some(status) = dma_manager.check_state() {
info!(
"playback done, error: {}, channel: {}, timestamp: {}",
status.error, status.channel, status.timestamp
);
router.route(
drtioaux::Packet::DmaPlaybackStatus {
source: destination,
destination: status.source,
id: status.id,
error: status.error,
channel: status.channel,
timestamp: status.timestamp,
},
&routing_table,
rank,
destination,
);
}
kernel_manager.process_kern_requests(&mut router, &routing_table, rank, destination, &timer);
if let Some((repno, packet)) = router.get_downstream_packet() {
if let Err(e) = repeaters[repno].aux_send(&packet) {
warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e)
}
}
if router.any_upstream_waiting() {
drtiosat_async_ready();
}
}
drtiosat_reset_phy(true);

View File

@ -6,6 +6,7 @@ use libboard_artiq::{drtio_routing, drtioaux};
#[cfg(has_drtio_routing)]
use libboard_zynq::time::Milliseconds;
use libboard_zynq::timer::GlobalTimer;
use routing::Router;
#[cfg(has_drtio_routing)]
fn rep_link_rx_up(repno: u8) -> bool {
@ -53,7 +54,14 @@ impl Repeater {
self.state == RepeaterState::Up
}
pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, timer: &mut GlobalTimer) {
pub fn service(
&mut self,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
destination: u8,
router: &mut Router,
timer: &mut GlobalTimer,
) {
self.process_local_errors();
match self.state {
@ -116,6 +124,11 @@ impl Repeater {
info!("[REP#{}] link is down", self.repno);
self.state = RepeaterState::Down;
}
if self.async_messages_ready() {
if let Err(e) = self.handle_async(routing_table, rank, destination, router, timer) {
warn!("[REP#{}] Error handling async messages ({:?})", self.repno, e);
}
}
}
RepeaterState::Failed => {
if !rep_link_rx_up(self.repno) {
@ -173,6 +186,34 @@ impl Repeater {
}
}
fn async_messages_ready(&self) -> bool {
let async_rdy;
unsafe {
async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)();
(csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0);
}
async_rdy == 1
}
fn handle_async(
&self,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
self_destination: u8,
router: &mut Router,
timer: &mut GlobalTimer,
) -> Result<(), drtioaux::Error> {
loop {
drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap();
let reply = self.recv_aux_timeout(200, timer)?;
match reply {
drtioaux::Packet::RoutingNoPackets => break,
packet => router.route(packet, routing_table, rank, self_destination),
}
}
Ok(())
}
fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result<drtioaux::Packet, drtioaux::Error> {
let max_time = timer.get_time() + Milliseconds(timeout.into());
loop {
@ -191,15 +232,19 @@ impl Repeater {
}
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up {
return Err(drtioaux::Error::LinkDown);
}
drtioaux::send(self.auxno, request).unwrap();
self.aux_send(request)?;
let reply = self.recv_aux_timeout(200, timer)?;
drtioaux::send(0, &reply).unwrap();
Ok(())
}
pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up {
return Err(drtioaux::Error::LinkDown);
}
drtioaux::send(self.auxno, request)
}
pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up {
return Ok(());

124
src/satman/src/routing.rs Normal file
View File

@ -0,0 +1,124 @@
use alloc::collections::vec_deque::VecDeque;
use libboard_artiq::{drtio_routing, drtioaux, pl::csr};
// Packets from downstream (further satellites) are received and routed appropriately.
// they're passed as soon as possible downstream (within the subtree), or sent upstream,
// which is notified about pending packets.
// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest;
// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets.
// forward! macro is not deprecated, as routable packets are only these that can originate
// from both master and satellite, e.g. DDMA and Subkernel.
pub struct Router {
upstream_queue: VecDeque<drtioaux::Packet>,
local_queue: VecDeque<drtioaux::Packet>,
downstream_queue: VecDeque<(usize, drtioaux::Packet)>,
upstream_notified: bool,
}
impl Router {
pub fn new() -> Router {
Router {
upstream_queue: VecDeque::new(),
local_queue: VecDeque::new(),
downstream_queue: VecDeque::new(),
upstream_notified: false,
}
}
// called by local sources (DDMA, kernel) and by repeaters on receiving async data
// messages are always buffered for both upstream and downstream
pub fn route(
&mut self,
packet: drtioaux::Packet,
_routing_table: &drtio_routing::RoutingTable,
_rank: u8,
_self_destination: u8,
) {
#[cfg(has_drtio_routing)]
{
let destination = packet.routable_destination();
if let Some(destination) = destination {
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
if destination == _self_destination {
self.local_queue.push_back(packet);
} else if hop > 0 && hop < csr::DRTIOREP.len() {
let repno = (hop - 1) as usize;
self.downstream_queue.push_back((repno, packet));
} else {
self.upstream_queue.push_back(packet);
}
} else {
error!("Received an unroutable packet: {:?}", packet);
}
}
#[cfg(not(has_drtio_routing))]
{
self.upstream_queue.push_back(packet);
}
}
// Sends a packet to a required destination, routing if it's necessary
pub fn send(
&mut self,
packet: drtioaux::Packet,
_routing_table: &drtio_routing::RoutingTable,
_rank: u8,
_destination: u8,
) -> Result<(), drtioaux::Error> {
#[cfg(has_drtio_routing)]
{
let destination = packet.routable_destination();
if let Some(destination) = destination {
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
if destination == 0 {
// response is needed immediately if master required it
drtioaux::send(0, &packet)?;
} else if !(hop > 0 && hop < csr::DRTIOREP.len()) {
// higher rank can wait
self.upstream_queue.push_back(packet);
} else {
let repno = (hop - 1) as usize;
// transaction will occur at closest possible opportunity
self.downstream_queue.push_back((repno, packet));
}
Ok(())
} else {
// packet not supported in routing, fallback - sent directly
drtioaux::send(0, &packet)
}
}
#[cfg(not(has_drtio_routing))]
{
drtioaux::send(0, &packet)
}
}
pub fn any_upstream_waiting(&mut self) -> bool {
let empty = self.upstream_queue.is_empty();
if !empty && !self.upstream_notified {
self.upstream_notified = true; // so upstream will not get spammed with notifications
true
} else {
false
}
}
pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> {
let packet = self.upstream_queue.pop_front();
if packet.is_none() {
self.upstream_notified = false;
}
packet
}
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {
self.downstream_queue.pop_front()
}
pub fn get_local_packet(&mut self) -> Option<drtioaux::Packet> {
self.local_queue.pop_front()
}
}

View File

@ -8,11 +8,14 @@ use core_io::{Error as IoError, Write};
use cslice::AsCSlice;
use io::{Cursor, ProtoWrite};
use ksupport::{eh_artiq, kernel, rpc};
use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
pl::csr};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::sync_channel::Receiver;
use log::warn;
use routing::Router;
#[derive(Debug, Clone, PartialEq)]
enum KernelState {
@ -21,6 +24,8 @@ enum KernelState {
Running,
MsgAwait(Milliseconds, Vec<u8>),
MsgSending,
SubkernelAwaitLoad,
SubkernelAwaitFinish { max_time: Milliseconds, id: u32 },
}
#[derive(Debug)]
@ -31,6 +36,7 @@ pub enum Error {
NoMessage,
AwaitingMessage,
SubkernelIoError,
DrtioError,
KernelException(Sliceable),
}
@ -52,6 +58,12 @@ impl From<()> for Error {
}
}
impl From<drtioaux::Error> for Error {
fn from(_value: drtioaux::Error) -> Error {
Error::DrtioError
}
}
macro_rules! unexpected {
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
}
@ -61,6 +73,7 @@ macro_rules! unexpected {
pub struct Sliceable {
it: usize,
data: Vec<u8>,
destination: u8,
}
/* represents interkernel messages */
@ -72,7 +85,6 @@ struct Message {
#[derive(PartialEq)]
enum OutMessageState {
NoMessage,
MessageReady,
MessageBeingSent,
MessageSent,
MessageAcknowledged,
@ -92,6 +104,8 @@ struct Session {
kernel_state: KernelState,
last_exception: Option<Sliceable>,
messages: MessageManager,
source: u8, // which destination requested running the kernel
subkernels_finished: Vec<u32>,
}
impl Session {
@ -101,13 +115,19 @@ impl Session {
kernel_state: KernelState::Absent,
last_exception: None,
messages: MessageManager::new(),
source: 0,
subkernels_finished: Vec::new(),
}
}
fn running(&self) -> bool {
match self.kernel_state {
KernelState::Absent | KernelState::Loaded => false,
KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true,
KernelState::Running
| KernelState::MsgAwait { .. }
| KernelState::MsgSending
| KernelState::SubkernelAwaitLoad
| KernelState::SubkernelAwaitFinish { .. } => true,
}
}
}
@ -129,9 +149,12 @@ pub struct Manager<'a> {
pub struct SubkernelFinished {
pub id: u32,
pub with_exception: bool,
pub exception_source: u8,
pub source: u8,
}
pub struct SliceMeta {
pub destination: u8,
pub len: u16,
pub status: PayloadStatus,
}
@ -148,6 +171,7 @@ macro_rules! get_slice_fn {
self.it += len;
SliceMeta {
destination: self.destination,
len: len as u16,
status: status,
}
@ -156,8 +180,12 @@ macro_rules! get_slice_fn {
}
impl Sliceable {
pub fn new(data: Vec<u8>) -> Sliceable {
Sliceable { it: 0, data: data }
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
Sliceable {
it: 0,
data: data,
destination: destination,
}
}
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
@ -194,17 +222,6 @@ impl MessageManager {
}
}
pub fn is_outgoing_ready(&mut self) -> bool {
// called by main loop, to see if there's anything to send, will send it afterwards
match self.out_state {
OutMessageState::MessageReady => {
self.out_state = OutMessageState::MessageBeingSent;
true
}
_ => false,
}
}
pub fn was_message_acknowledged(&mut self) -> bool {
match self.out_state {
OutMessageState::MessageAcknowledged => {
@ -244,10 +261,34 @@ impl MessageManager {
}
}
pub fn accept_outgoing(&mut self, message: Vec<u8>) -> Result<(), Error> {
// service tag skipped in kernel
self.out_message = Some(Sliceable::new(message));
self.out_state = OutMessageState::MessageReady;
pub fn accept_outgoing(
&mut self,
id: u32,
self_destination: u8,
destination: u8,
message: Vec<u8>,
routing_table: &RoutingTable,
rank: u8,
router: &mut Router,
) -> Result<(), Error> {
self.out_message = Some(Sliceable::new(destination, message));
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
self.out_state = OutMessageState::MessageBeingSent;
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
router.route(
drtioaux::Packet::SubkernelMessage {
source: self_destination,
destination: destination,
id: id,
status: meta.status,
length: meta.len as u16,
data: data_slice,
},
routing_table,
rank,
self_destination,
);
Ok(())
}
@ -313,12 +354,13 @@ impl<'a> Manager<'_> {
}
}
pub fn run(&mut self, id: u32) -> Result<(), Error> {
pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> {
info!("starting subkernel #{}", id);
if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
self.load(id)?;
}
self.session.kernel_state = KernelState::Running;
self.session.source = source;
unsafe {
csr::cri_con::selected_write(2);
}
@ -354,10 +396,6 @@ impl<'a> Manager<'_> {
self.session.messages.ack_slice()
}
pub fn message_is_ready(&mut self) -> bool {
self.session.messages.is_outgoing_ready()
}
pub fn load(&mut self, id: u32) -> Result<(), Error> {
if self.session.id == id && self.session.kernel_state == KernelState::Loaded {
return Ok(());
@ -386,16 +424,13 @@ impl<'a> Manager<'_> {
match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice),
None => SliceMeta {
destination: 0,
len: 0,
status: PayloadStatus::FirstAndLast,
},
}
}
pub fn get_last_finished(&mut self) -> Option<SubkernelFinished> {
self.last_finished.take()
}
fn kernel_stop(&mut self) {
self.session.kernel_state = KernelState::Absent;
unsafe {
@ -425,13 +460,38 @@ impl<'a> Manager<'_> {
&[],
0,
) {
Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())),
Ok(_) => self.session.last_exception = Some(Sliceable::new(0, writer.into_inner())),
Err(_) => error!("Error writing exception data"),
}
self.kernel_stop();
}
pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) {
pub fn process_kern_requests(
&mut self,
router: &mut Router,
routing_table: &RoutingTable,
rank: u8,
destination: u8,
timer: &GlobalTimer,
) {
if let Some(subkernel_finished) = self.last_finished.take() {
info!(
"subkernel {} finished, with exception: {}",
subkernel_finished.id, subkernel_finished.with_exception
);
router.route(
drtioaux::Packet::SubkernelFinished {
destination: subkernel_finished.source,
id: subkernel_finished.id,
with_exception: subkernel_finished.with_exception,
exception_src: subkernel_finished.exception_source,
},
&routing_table,
rank,
destination,
);
}
if !self.running() {
return;
}
@ -444,6 +504,8 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
exception_source: destination,
source: self.session.source,
});
}
Err(e) => {
@ -452,15 +514,19 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
exception_source: destination,
source: self.session.source,
});
}
}
match self.process_kern_message(rank, timer) {
match self.process_kern_message(router, routing_table, rank, destination, timer) {
Ok(true) => {
self.last_finished = Some(SubkernelFinished {
id: self.session.id,
with_exception: false,
exception_source: 0,
source: self.session.source,
});
}
Ok(false) | Err(Error::NoMessage) => (),
@ -469,6 +535,8 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
exception_source: destination,
source: self.session.source,
});
}
Err(e) => {
@ -477,12 +545,46 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
exception_source: destination,
source: self.session.source,
});
}
}
}
fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result<bool, Error> {
pub fn subkernel_load_run_reply(&mut self, succeeded: bool) {
if self.session.kernel_state == KernelState::SubkernelAwaitLoad {
self.control
.tx
.send(kernel::Message::SubkernelLoadRunReply { succeeded: succeeded });
self.session.kernel_state = KernelState::Running;
} else {
warn!("received unsolicited SubkernelLoadRunReply");
}
}
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
if with_exception {
self.kernel_stop();
self.last_finished = Some(SubkernelFinished {
source: self.session.source,
id: self.session.id,
with_exception: true,
exception_source: exception_source,
})
} else {
self.session.subkernels_finished.push(id);
}
}
fn process_kern_message(
&mut self,
router: &mut Router,
routing_table: &RoutingTable,
rank: u8,
self_destination: u8,
timer: &GlobalTimer,
) -> Result<bool, Error> {
let reply = self.control.rx.try_recv()?;
match reply {
kernel::Message::KernelFinished(_async_errors) => {
@ -503,7 +605,7 @@ impl<'a> Manager<'_> {
Err(_) => error!("Error writing exception data"),
}
self.kernel_stop();
return Err(Error::KernelException(Sliceable::new(writer.into_inner())));
return Err(Error::KernelException(Sliceable::new(0, writer.into_inner())));
}
kernel::Message::CachePutRequest(key, value) => {
self.cache.insert(key, value);
@ -513,18 +615,57 @@ impl<'a> Manager<'_> {
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
self.control.tx.send(kernel::Message::CacheGetReply(value));
}
kernel::Message::SubkernelMsgSend { id: _, data } => {
self.session.messages.accept_outgoing(data)?;
kernel::Message::SubkernelMsgSend {
id: _id,
destination: msg_dest,
data,
} => {
let msg_dest = msg_dest.or(Some(self.session.source)).unwrap();
self.session.messages.accept_outgoing(
self.session.id,
self_destination,
msg_dest,
data,
routing_table,
rank,
router,
)?;
self.session.kernel_state = KernelState::MsgSending;
}
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
}
kernel::Message::SubkernelLoadRunRequest {
id,
destination: sk_destination,
run,
} => {
self.session.kernel_state = KernelState::SubkernelAwaitLoad;
router.route(
drtioaux::Packet::SubkernelLoadRunRequest {
source: self_destination,
destination: sk_destination,
id: id,
run: run,
},
routing_table,
rank,
self_destination,
);
}
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
max_time: max_time,
id: id,
};
}
kernel::Message::UpDestinationsRequest(destination) => {
self.control
.tx
.send(kernel::Message::UpDestinationsReply(destination == (rank as i32)));
self.control.tx.send(kernel::Message::UpDestinationsReply(
destination == (self_destination as i32),
));
}
_ => {
unexpected!("unexpected message from core1 while kernel was running: {:?}", reply);
@ -533,7 +674,7 @@ impl<'a> Manager<'_> {
Ok(false)
}
fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> {
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
match &self.session.kernel_state {
KernelState::MsgAwait(timeout, tags) => {
if timer.get_time() > *timeout {
@ -565,11 +706,33 @@ impl<'a> Manager<'_> {
Err(Error::AwaitingMessage)
}
}
KernelState::SubkernelAwaitFinish { max_time, id } => {
if timer.get_time() > *max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::Timeout,
});
self.session.kernel_state = KernelState::Running;
} else {
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
}
}
Ok(())
}
_ => Ok(()),
}
}
fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: GlobalTimer) -> Result<(), Error> {
fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: &GlobalTimer) -> Result<(), Error> {
let mut reader = Cursor::new(&message.data);
let mut current_tags: &[u8] = &tags;
let mut i = message.count;
@ -592,7 +755,7 @@ impl<'a> Manager<'_> {
let mut writer = Cursor::new(buf);
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
Ok(()) => {
exception = Some(Sliceable::new(writer.into_inner()));
exception = Some(Sliceable::new(0, writer.into_inner()));
}
Err(_) => {
unexpected = Some("Error writing exception data".to_string());
@ -686,7 +849,7 @@ where
fn recv_w_timeout(
rx: &mut Receiver<'_, kernel::Message>,
timer: GlobalTimer,
timer: &GlobalTimer,
timeout: u64,
) -> Result<kernel::Message, Error> {
let max_time = timer.get_time() + Milliseconds(timeout);