Subkernels: support sub-subkernels, DRTIO routing #281

Merged
sb10q merged 3 commits from mwojcik/artiq-zynq:drtio_routing into master 2024-01-11 12:33:10 +08:00
12 changed files with 1600 additions and 400 deletions

View File

@ -5,7 +5,7 @@ use io::proto::{ProtoRead, ProtoWrite};
// used by satellite -> master analyzer, subkernel exceptions // used by satellite -> master analyzer, subkernel exceptions
pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2;
// used by DDMA, subkernel program data (need to provide extra ID and destination) // used by DDMA, subkernel program data (need to provide extra ID and destination)
pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*source*/1 - /*destination*/1 - /*ID*/4;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -89,6 +89,8 @@ pub enum Packet {
RoutingSetRank { RoutingSetRank {
rank: u8, rank: u8,
}, },
RoutingRetrievePackets,
RoutingNoPackets,
RoutingAck, RoutingAck,
MonitorRequest { MonitorRequest {
@ -197,6 +199,7 @@ pub enum Packet {
}, },
DmaAddTraceRequest { DmaAddTraceRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
status: PayloadStatus, status: PayloadStatus,
@ -204,24 +207,32 @@ pub enum Packet {
trace: [u8; MASTER_PAYLOAD_MAX_SIZE], trace: [u8; MASTER_PAYLOAD_MAX_SIZE],
}, },
DmaAddTraceReply { DmaAddTraceReply {
source: u8,
destination: u8,
id: u32,
succeeded: bool, succeeded: bool,
}, },
DmaRemoveTraceRequest { DmaRemoveTraceRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
}, },
DmaRemoveTraceReply { DmaRemoveTraceReply {
destination: u8,
succeeded: bool, succeeded: bool,
}, },
DmaPlaybackRequest { DmaPlaybackRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
timestamp: u64, timestamp: u64,
}, },
DmaPlaybackReply { DmaPlaybackReply {
destination: u8,
succeeded: bool, succeeded: bool,
}, },
DmaPlaybackStatus { DmaPlaybackStatus {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
error: u8, error: u8,
@ -240,22 +251,20 @@ pub enum Packet {
succeeded: bool, succeeded: bool,
}, },
SubkernelLoadRunRequest { SubkernelLoadRunRequest {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
run: bool, run: bool,
}, },
SubkernelLoadRunReply { SubkernelLoadRunReply {
succeeded: bool,
},
SubkernelStopRequest {
destination: u8, destination: u8,
},
SubkernelStopReply {
succeeded: bool, succeeded: bool,
}, },
SubkernelFinished { SubkernelFinished {
destination: u8,
id: u32, id: u32,
with_exception: bool, with_exception: bool,
exception_src: u8,
}, },
SubkernelExceptionRequest { SubkernelExceptionRequest {
destination: u8, destination: u8,
@ -266,6 +275,7 @@ pub enum Packet {
data: [u8; SAT_PAYLOAD_MAX_SIZE], data: [u8; SAT_PAYLOAD_MAX_SIZE],
}, },
SubkernelMessage { SubkernelMessage {
source: u8,
destination: u8, destination: u8,
id: u32, id: u32,
status: PayloadStatus, status: PayloadStatus,
@ -315,6 +325,8 @@ impl Packet {
rank: reader.read_u8()?, rank: reader.read_u8()?,
}, },
0x32 => Packet::RoutingAck, 0x32 => Packet::RoutingAck,
0x33 => Packet::RoutingRetrievePackets,
0x34 => Packet::RoutingNoPackets,
0x40 => Packet::MonitorRequest { 0x40 => Packet::MonitorRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
@ -429,39 +441,49 @@ impl Packet {
} }
0xb0 => { 0xb0 => {
let source = reader.read_u8()?;
let destination = reader.read_u8()?; let destination = reader.read_u8()?;
let id = reader.read_u32()?; let id = reader.read_u32()?;
let status = PayloadStatus::from(reader.read_u8()?); let status = reader.read_u8()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut trace[0..length as usize])?; reader.read_exact(&mut trace[0..length as usize])?;
Packet::DmaAddTraceRequest { Packet::DmaAddTraceRequest {
source: source,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: PayloadStatus::from(status),
length: length as u16, length: length as u16,
trace: trace, trace: trace,
} }
} }
0xb1 => Packet::DmaAddTraceReply { 0xb1 => Packet::DmaAddTraceReply {
source: reader.read_u8()?,
destination: reader.read_u8()?,
id: reader.read_u32()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb2 => Packet::DmaRemoveTraceRequest { 0xb2 => Packet::DmaRemoveTraceRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
}, },
0xb3 => Packet::DmaRemoveTraceReply { 0xb3 => Packet::DmaRemoveTraceReply {
destination: reader.read_u8()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb4 => Packet::DmaPlaybackRequest { 0xb4 => Packet::DmaPlaybackRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
timestamp: reader.read_u64()?, timestamp: reader.read_u64()?,
}, },
0xb5 => Packet::DmaPlaybackReply { 0xb5 => Packet::DmaPlaybackReply {
destination: reader.read_u8()?,
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xb6 => Packet::DmaPlaybackStatus { 0xb6 => Packet::DmaPlaybackStatus {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
error: reader.read_u8()?, error: reader.read_u8()?,
@ -488,22 +510,20 @@ impl Packet {
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xc4 => Packet::SubkernelLoadRunRequest { 0xc4 => Packet::SubkernelLoadRunRequest {
source: reader.read_u8()?,
destination: reader.read_u8()?, destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
run: reader.read_bool()?, run: reader.read_bool()?,
}, },
0xc5 => Packet::SubkernelLoadRunReply { 0xc5 => Packet::SubkernelLoadRunReply {
succeeded: reader.read_bool()?,
},
0xc6 => Packet::SubkernelStopRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
},
0xc7 => Packet::SubkernelStopReply {
succeeded: reader.read_bool()?, succeeded: reader.read_bool()?,
}, },
0xc8 => Packet::SubkernelFinished { 0xc8 => Packet::SubkernelFinished {
destination: reader.read_u8()?,
id: reader.read_u32()?, id: reader.read_u32()?,
with_exception: reader.read_bool()?, with_exception: reader.read_bool()?,
exception_src: reader.read_u8()?,
}, },
0xc9 => Packet::SubkernelExceptionRequest { 0xc9 => Packet::SubkernelExceptionRequest {
destination: reader.read_u8()?, destination: reader.read_u8()?,
@ -520,16 +540,18 @@ impl Packet {
} }
} }
0xcb => { 0xcb => {
let source = reader.read_u8()?;
let destination = reader.read_u8()?; let destination = reader.read_u8()?;
let id = reader.read_u32()?; let id = reader.read_u32()?;
let status = PayloadStatus::from(reader.read_u8()?); let status = reader.read_u8()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?; reader.read_exact(&mut data[0..length as usize])?;
Packet::SubkernelMessage { Packet::SubkernelMessage {
source: source,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: PayloadStatus::from(status),
length: length as u16, length: length as u16,
data: data, data: data,
} }
@ -580,6 +602,8 @@ impl Packet {
writer.write_u8(rank)?; writer.write_u8(rank)?;
} }
Packet::RoutingAck => writer.write_u8(0x32)?, Packet::RoutingAck => writer.write_u8(0x32)?,
Packet::RoutingRetrievePackets => writer.write_u8(0x33)?,
Packet::RoutingNoPackets => writer.write_u8(0x34)?,
Packet::MonitorRequest { Packet::MonitorRequest {
destination, destination,
@ -751,6 +775,7 @@ impl Packet {
} }
Packet::DmaAddTraceRequest { Packet::DmaAddTraceRequest {
source,
destination, destination,
id, id,
status, status,
@ -758,6 +783,7 @@ impl Packet {
length, length,
} => { } => {
writer.write_u8(0xb0)?; writer.write_u8(0xb0)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(status as u8)?; writer.write_u8(status as u8)?;
@ -766,34 +792,52 @@ impl Packet {
writer.write_u16(length)?; writer.write_u16(length)?;
writer.write_all(&trace[0..length as usize])?; writer.write_all(&trace[0..length as usize])?;
} }
Packet::DmaAddTraceReply { succeeded } => { Packet::DmaAddTraceReply {
source,
destination,
id,
succeeded,
} => {
writer.write_u8(0xb1)?; writer.write_u8(0xb1)?;
writer.write_u8(source)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaRemoveTraceRequest { destination, id } => { Packet::DmaRemoveTraceRequest {
source,
destination,
id,
} => {
writer.write_u8(0xb2)?; writer.write_u8(0xb2)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
} }
Packet::DmaRemoveTraceReply { succeeded } => { Packet::DmaRemoveTraceReply { destination, succeeded } => {
writer.write_u8(0xb3)?; writer.write_u8(0xb3)?;
writer.write_u8(destination)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaPlaybackRequest { Packet::DmaPlaybackRequest {
source,
destination, destination,
id, id,
timestamp, timestamp,
} => { } => {
writer.write_u8(0xb4)?; writer.write_u8(0xb4)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u64(timestamp)?; writer.write_u64(timestamp)?;
} }
Packet::DmaPlaybackReply { succeeded } => { Packet::DmaPlaybackReply { destination, succeeded } => {
writer.write_u8(0xb5)?; writer.write_u8(0xb5)?;
writer.write_u8(destination)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::DmaPlaybackStatus { Packet::DmaPlaybackStatus {
source,
destination, destination,
id, id,
error, error,
@ -801,6 +845,7 @@ impl Packet {
timestamp, timestamp,
} => { } => {
writer.write_u8(0xb6)?; writer.write_u8(0xb6)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(error)?; writer.write_u8(error)?;
@ -826,28 +871,34 @@ impl Packet {
writer.write_u8(0xc1)?; writer.write_u8(0xc1)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::SubkernelLoadRunRequest { destination, id, run } => { Packet::SubkernelLoadRunRequest {
source,
destination,
id,
run,
} => {
writer.write_u8(0xc4)?; writer.write_u8(0xc4)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_bool(run)?; writer.write_bool(run)?;
} }
Packet::SubkernelLoadRunReply { succeeded } => { Packet::SubkernelLoadRunReply { destination, succeeded } => {
writer.write_u8(0xc5)?; writer.write_u8(0xc5)?;
writer.write_bool(succeeded)?;
}
Packet::SubkernelStopRequest { destination } => {
writer.write_u8(0xc6)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
}
Packet::SubkernelStopReply { succeeded } => {
writer.write_u8(0xc7)?;
writer.write_bool(succeeded)?; writer.write_bool(succeeded)?;
} }
Packet::SubkernelFinished { id, with_exception } => { Packet::SubkernelFinished {
destination,
id,
with_exception,
exception_src,
} => {
writer.write_u8(0xc8)?; writer.write_u8(0xc8)?;
writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_bool(with_exception)?; writer.write_bool(with_exception)?;
writer.write_u8(exception_src)?;
} }
Packet::SubkernelExceptionRequest { destination } => { Packet::SubkernelExceptionRequest { destination } => {
writer.write_u8(0xc9)?; writer.write_u8(0xc9)?;
@ -860,6 +911,7 @@ impl Packet {
writer.write_all(&data[0..length as usize])?; writer.write_all(&data[0..length as usize])?;
} }
Packet::SubkernelMessage { Packet::SubkernelMessage {
source,
destination, destination,
id, id,
status, status,
@ -867,6 +919,7 @@ impl Packet {
length, length,
} => { } => {
writer.write_u8(0xcb)?; writer.write_u8(0xcb)?;
writer.write_u8(source)?;
writer.write_u8(destination)?; writer.write_u8(destination)?;
writer.write_u32(id)?; writer.write_u32(id)?;
writer.write_u8(status as u8)?; writer.write_u8(status as u8)?;
@ -880,4 +933,39 @@ impl Packet {
} }
Ok(()) Ok(())
} }
pub fn routable_destination(&self) -> Option<u8> {
// only for packets that could be re-routed, not only forwarded
match self {
Packet::DmaAddTraceRequest { destination, .. } => Some(*destination),
Packet::DmaAddTraceReply { destination, .. } => Some(*destination),
Packet::DmaRemoveTraceRequest { destination, .. } => Some(*destination),
Packet::DmaRemoveTraceReply { destination, .. } => Some(*destination),
Packet::DmaPlaybackRequest { destination, .. } => Some(*destination),
Packet::DmaPlaybackReply { destination, .. } => Some(*destination),
Packet::SubkernelLoadRunRequest { destination, .. } => Some(*destination),
Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination),
Packet::SubkernelMessage { destination, .. } => Some(*destination),
Packet::SubkernelMessageAck { destination } => Some(*destination),
Packet::DmaPlaybackStatus { destination, .. } => Some(*destination),
Packet::SubkernelFinished { destination, .. } => Some(*destination),
_ => None,
}
}
pub fn expects_response(&self) -> bool {
// returns true if the routable packet should elicit a response
// e.g. reply, ACK packets end a conversation,
// and firmware should not wait for response
match self {
Packet::DmaAddTraceReply { .. }
| Packet::DmaRemoveTraceReply { .. }
| Packet::DmaPlaybackReply { .. }
| Packet::SubkernelLoadRunReply { .. }
| Packet::SubkernelMessageAck { .. }
| Packet::DmaPlaybackStatus { .. }
| Packet::SubkernelFinished { .. } => false,
_ => true,
}
}
} }

View File

@ -77,6 +77,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelLoadRunRequest { SubkernelLoadRunRequest {
id: u32, id: u32,
destination: u8,
run: bool, run: bool,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]
@ -95,6 +96,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelMsgSend { SubkernelMsgSend {
id: u32, id: u32,
destination: Option<u8>,
data: Vec<u8>, data: Vec<u8>,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]

View File

@ -5,12 +5,16 @@ use cslice::CSlice;
use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0};
use crate::{artiq_raise, rpc::send_args}; use crate::{artiq_raise, rpc::send_args};
pub extern "C" fn load_run(id: u32, run: bool) { pub extern "C" fn load_run(id: u32, destination: u8, run: bool) {
unsafe { unsafe {
KERNEL_CHANNEL_1TO0 KERNEL_CHANNEL_1TO0
.as_mut() .as_mut()
.unwrap() .unwrap()
.send(Message::SubkernelLoadRunRequest { id: id, run: run }); .send(Message::SubkernelLoadRunRequest {
id: id,
destination: destination,
run: run,
});
} }
match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() {
Message::SubkernelLoadRunReply { succeeded: true } => (), Message::SubkernelLoadRunReply { succeeded: true } => (),
@ -51,7 +55,14 @@ pub extern "C" fn await_finish(id: u32, timeout: u64) {
} }
} }
pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice<u8>, data: *const *const ()) { pub extern "C" fn send_message(
id: u32,
is_return: bool,
destination: u8,
count: u8,
tag: &CSlice<u8>,
data: *const *const (),
) {
let mut buffer = Vec::<u8>::new(); let mut buffer = Vec::<u8>::new();
send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed"); send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed");
// overwrite service tag, include how many tags are in the message // overwrite service tag, include how many tags are in the message
@ -59,6 +70,7 @@ pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice<u8>, data: *cons
unsafe { unsafe {
KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend { KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend {
id: id, id: id,
destination: if is_return { None } else { Some(destination) },
data: buffer[3..].to_vec(), data: buffer[3..].to_vec(),
}); });
} }

View File

@ -401,7 +401,11 @@ async fn handle_run_kernel(
control.borrow_mut().tx.async_send(reply).await; control.borrow_mut().tx.async_send(reply).await;
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelLoadRunRequest { id, run } => { kernel::Message::SubkernelLoadRunRequest {
id,
destination: _,
run,
} => {
let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await { let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await {
Ok(()) => true, Ok(()) => true,
Err(e) => { Err(e) => {
@ -447,7 +451,11 @@ async fn handle_run_kernel(
.await; .await;
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend { id, data } => { kernel::Message::SubkernelMsgSend {
id,
destination: _,
data,
} => {
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await;
match res { match res {
Ok(_) => (), Ok(_) => (),

View File

@ -142,9 +142,9 @@ pub mod remote_dma {
} }
} }
pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { pub async fn playback_done(&mut self, source: u8, error: u8, channel: u32, timestamp: u64) {
let mut traces_locked = self.traces.async_lock().await; let mut traces_locked = self.traces.async_lock().await;
let mut trace = traces_locked.get_mut(&destination).unwrap(); let mut trace = traces_locked.get_mut(&source).unwrap();
trace.state = RemoteState::PlaybackEnded { trace.state = RemoteState::PlaybackEnded {
error: error, error: error,
channel: channel, channel: channel,

View File

@ -43,26 +43,49 @@ pub mod drtio {
unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 }
} }
async fn process_async_packets(aux_mutex: &Mutex<bool>, linkno: u8, packet: Packet) -> Option<Packet> { async fn link_has_async_ready(linkno: u8) -> bool {
// returns None if an async packet has been consumed let linkno = linkno as usize;
let async_ready;
unsafe {
async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1;
(csr::DRTIO[linkno].async_messages_ready_write)(1);
}
async_ready
}
async fn process_async_packets(
aux_mutex: &Mutex<bool>,
linkno: u8,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
) {
if link_has_async_ready(linkno).await {
loop {
let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingRetrievePackets, timer).await;
if let Ok(packet) = reply {
match packet { match packet {
Packet::DmaPlaybackStatus { Packet::DmaPlaybackStatus {
id, id,
destination, source,
destination: 0,
error, error,
channel, channel,
timestamp, timestamp,
} => { } => {
remote_dma::playback_done(id, destination, error, channel, timestamp).await; remote_dma::playback_done(id, source, error, channel, timestamp).await;
None
} }
Packet::SubkernelFinished { id, with_exception } => { Packet::SubkernelFinished {
subkernel::subkernel_finished(id, with_exception).await; id,
None destination: 0,
with_exception,
exception_src,
} => {
subkernel::subkernel_finished(id, with_exception, exception_src).await;
} }
Packet::SubkernelMessage { Packet::SubkernelMessage {
id, id,
destination: from, source,
destination: 0,
status, status,
length, length,
data, data,
@ -70,12 +93,52 @@ pub mod drtio {
subkernel::message_handle_incoming(id, status, length as usize, &data).await; subkernel::message_handle_incoming(id, status, length as usize, &data).await;
// acknowledge receiving part of the message // acknowledge receiving part of the message
let _lock = aux_mutex.async_lock().await; let _lock = aux_mutex.async_lock().await;
drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: source })
.await .await
.unwrap(); .unwrap();
None let mut countdown = timer.countdown();
// give the satellites some time to process
delay(&mut countdown, Milliseconds(10)).await;
}
// routable packets
Packet::DmaAddTraceRequest { destination, .. }
| Packet::DmaAddTraceReply { destination, .. }
| Packet::DmaRemoveTraceRequest { destination, .. }
| Packet::DmaRemoveTraceReply { destination, .. }
| Packet::DmaPlaybackRequest { destination, .. }
| Packet::DmaPlaybackReply { destination, .. }
| Packet::SubkernelLoadRunRequest { destination, .. }
| Packet::SubkernelLoadRunReply { destination, .. }
| Packet::SubkernelMessage { destination, .. }
| Packet::SubkernelMessageAck { destination, .. }
| Packet::DmaPlaybackStatus { destination, .. }
| Packet::SubkernelFinished { destination, .. } => {
let dest_link = routing_table.0[destination as usize][0] - 1;
if dest_link == linkno {
warn!(
"[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}",
linkno, packet
);
} else if destination == 0 {
warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet)
} else {
drtioaux_async::send(dest_link, &packet).await.unwrap();
}
}
Packet::RoutingNoPackets => break,
other => warn!("[LINK#{}] Received an unroutable packet: {:?}", linkno, other),
}
} else {
warn!(
"[LINK#{}] Error handling async packets ({})",
linkno,
reply.unwrap_err()
);
return;
}
} }
other => Some(other),
} }
} }
@ -210,11 +273,7 @@ pub mod drtio {
async fn process_unsolicited_aux(aux_mutex: &Rc<Mutex<bool>>, linkno: u8) { async fn process_unsolicited_aux(aux_mutex: &Rc<Mutex<bool>>, linkno: u8) {
let _lock = aux_mutex.async_lock().await; let _lock = aux_mutex.async_lock().await;
match drtioaux_async::recv(linkno).await { match drtioaux_async::recv(linkno).await {
Ok(Some(packet)) => { Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet),
if let Some(packet) = process_async_packets(aux_mutex, linkno, packet).await {
warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet);
}
}
Ok(None) => (), Ok(None) => (),
Err(_) => warn!("[LINK#{}] aux packet error", linkno), Err(_) => warn!("[LINK#{}] aux packet error", linkno),
} }
@ -283,7 +342,6 @@ pub mod drtio {
let linkno = hop - 1; let linkno = hop - 1;
if destination_up(up_destinations, destination).await { if destination_up(up_destinations, destination).await {
if up_links[linkno as usize] { if up_links[linkno as usize] {
loop {
let reply = aux_transact( let reply = aux_transact(
aux_mutex, aux_mutex,
linkno, linkno,
@ -296,13 +354,7 @@ pub mod drtio {
match reply { match reply {
Ok(Packet::DestinationDownReply) => { Ok(Packet::DestinationDownReply) => {
destination_set_up(routing_table, up_destinations, destination, false).await; destination_set_up(routing_table, up_destinations, destination, false).await;
remote_dma::destination_changed( remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false)
aux_mutex,
routing_table,
timer,
destination,
false,
)
.await; .await;
subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false) subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false)
.await; .await;
@ -335,16 +387,9 @@ pub mod drtio {
); );
unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY };
} }
Ok(packet) => match process_async_packets(aux_mutex, linkno, packet).await { Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Some(packet) => {
error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet)
}
None => continue,
},
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
} }
break;
}
} else { } else {
destination_set_up(routing_table, up_destinations, destination, false).await; destination_set_up(routing_table, up_destinations, destination, false).await;
remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await;
@ -393,6 +438,7 @@ pub mod drtio {
if up_links[linkno as usize] { if up_links[linkno as usize] {
/* link was previously up */ /* link was previously up */
if link_rx_up(linkno).await { if link_rx_up(linkno).await {
process_async_packets(aux_mutex, linkno, routing_table, timer).await;
process_unsolicited_aux(aux_mutex, linkno).await; process_unsolicited_aux(aux_mutex, linkno).await;
process_local_errors(linkno).await; process_local_errors(linkno).await;
} else { } else {
@ -504,14 +550,23 @@ pub mod drtio {
trace, trace,
|slice, status, len| Packet::DmaAddTraceRequest { |slice, status, len| Packet::DmaAddTraceRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
status: status, status: status,
length: len as u16, length: len as u16,
trace: *slice, trace: *slice,
}, },
|reply| match reply { |reply| match reply {
Packet::DmaAddTraceReply { succeeded: true } => Ok(()), Packet::DmaAddTraceReply {
Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"), destination: 0,
succeeded: true,
..
} => Ok(()),
Packet::DmaAddTraceReply {
destination: 0,
succeeded: false,
..
} => Err("error adding trace on satellite"),
_ => Err("adding DMA trace failed, unexpected aux packet"), _ => Err("adding DMA trace failed, unexpected aux packet"),
}, },
) )
@ -531,14 +586,21 @@ pub mod drtio {
linkno, linkno,
&Packet::DmaRemoveTraceRequest { &Packet::DmaRemoveTraceRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
}, },
timer, timer,
) )
.await; .await;
match reply { match reply {
Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), Ok(Packet::DmaRemoveTraceReply {
Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), destination: 0,
succeeded: true,
}) => Ok(()),
Ok(Packet::DmaRemoveTraceReply {
destination: 0,
succeeded: false,
}) => Err("satellite DMA erase error"),
Ok(_) => Err("adding trace failed, unexpected aux packet"), Ok(_) => Err("adding trace failed, unexpected aux packet"),
Err(_) => Err("erasing trace failed, aux error"), Err(_) => Err("erasing trace failed, aux error"),
} }
@ -558,6 +620,7 @@ pub mod drtio {
linkno, linkno,
&Packet::DmaPlaybackRequest { &Packet::DmaPlaybackRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
timestamp: timestamp, timestamp: timestamp,
}, },
@ -565,8 +628,14 @@ pub mod drtio {
) )
.await; .await;
match reply { match reply {
Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), Ok(Packet::DmaPlaybackReply {
Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), destination: 0,
succeeded: true,
}) => Ok(()),
Ok(Packet::DmaPlaybackReply {
destination: 0,
succeeded: false,
}) => Err("error on DMA playback request"),
Ok(_) => Err("received unexpected aux packet during DMA playback"), Ok(_) => Err("received unexpected aux packet during DMA playback"),
Err(_) => Err("aux error on DMA playback"), Err(_) => Err("aux error on DMA playback"),
} }
@ -689,6 +758,7 @@ pub mod drtio {
linkno, linkno,
&Packet::SubkernelLoadRunRequest { &Packet::SubkernelLoadRunRequest {
id: id, id: id,
source: 0,
destination: destination, destination: destination,
run: run, run: run,
}, },
@ -696,8 +766,14 @@ pub mod drtio {
) )
.await?; .await?;
match reply { match reply {
Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()), Packet::SubkernelLoadRunReply {
Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"), destination: 0,
succeeded: true,
} => return Ok(()),
Packet::SubkernelLoadRunReply {
destination: 0,
succeeded: false,
} => return Err("error on subkernel run request"),
_ => return Err("received unexpected aux packet during subkernel run"), _ => return Err("received unexpected aux packet during subkernel run"),
} }
} }
@ -747,6 +823,7 @@ pub mod drtio {
timer, timer,
message, message,
|slice, status, len| Packet::SubkernelMessage { |slice, status, len| Packet::SubkernelMessage {
source: 0,
destination: destination, destination: destination,
id: id, id: id,
status: status, status: status,

View File

@ -13,7 +13,7 @@ use crate::rtio_mgt::drtio;
pub enum FinishStatus { pub enum FinishStatus {
Ok, Ok,
CommLost, CommLost,
Exception, Exception(u8), // exception source
} }
#[derive(Debug, PartialEq, Clone, Copy)] #[derive(Debug, PartialEq, Clone, Copy)]
@ -121,14 +121,14 @@ pub async fn clear_subkernels() {
CURRENT_MESSAGES.async_lock().await.clear(); CURRENT_MESSAGES.async_lock().await.clear();
} }
pub async fn subkernel_finished(id: u32, with_exception: bool) { pub async fn subkernel_finished(id: u32, with_exception: bool, exception_src: u8) {
// called upon receiving DRTIO SubkernelRunDone // called upon receiving DRTIO SubkernelRunDone
// may be None if session ends and is cleared // may be None if session ends and is cleared
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
if subkernel.state == SubkernelState::Running { if subkernel.state == SubkernelState::Running {
subkernel.state = SubkernelState::Finished { subkernel.state = SubkernelState::Finished {
status: match with_exception { status: match with_exception {
true => FinishStatus::Exception, true => FinishStatus::Exception(exception_src),
false => FinishStatus::Ok, false => FinishStatus::Ok,
}, },
} }
@ -196,11 +196,8 @@ pub async fn await_finish(
Ok(SubkernelFinished { Ok(SubkernelFinished {
id: id, id: id,
status: status, status: status,
exception: if status == FinishStatus::Exception { exception: if let FinishStatus::Exception(dest) = status {
Some( Some(drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, dest).await?)
drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination)
.await?,
)
} else { } else {
None None
}, },
@ -292,7 +289,7 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
status: FinishStatus::CommLost, status: FinishStatus::CommLost,
} => return Err(Error::CommLost), } => return Err(Error::CommLost),
SubkernelState::Finished { SubkernelState::Finished {
status: FinishStatus::Exception, status: FinishStatus::Exception(_),
} => return Err(Error::SubkernelException), } => return Err(Error::SubkernelException),
_ => (), _ => (),
} }

View File

@ -1,7 +1,13 @@
use alloc::{collections::btree_map::BTreeMap, vec::Vec}; use alloc::{collections::btree_map::BTreeMap, string::String, vec::Vec};
use core::mem;
use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr}; use ksupport::kernel::DmaRecorder;
use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux_proto::{Packet, PayloadStatus, MASTER_PAYLOAD_MAX_SIZE},
pl::csr};
use libcortex_a9::cache::dcci_slice; use libcortex_a9::cache::dcci_slice;
use routing::{Router, Sliceable};
use subkernel::Manager as KernelManager;
const ALIGNMENT: usize = 64; const ALIGNMENT: usize = 64;
@ -12,16 +18,20 @@ enum ManagerState {
} }
pub struct RtioStatus { pub struct RtioStatus {
pub source: u8,
pub id: u32, pub id: u32,
pub error: u8, pub error: u8,
pub channel: u32, pub channel: u32,
pub timestamp: u64, pub timestamp: u64,
} }
#[derive(Debug)]
pub enum Error { pub enum Error {
IdNotFound, IdNotFound,
PlaybackInProgress, PlaybackInProgress,
EntryNotComplete, EntryNotComplete,
MasterDmaFound,
UploadFail,
} }
#[derive(Debug)] #[derive(Debug)]
@ -29,13 +39,228 @@ struct Entry {
trace: Vec<u8>, trace: Vec<u8>,
padding_len: usize, padding_len: usize,
complete: bool, complete: bool,
duration: i64, // relevant for local DMA
}
impl Entry {
pub fn from_vec(data: Vec<u8>, duration: i64) -> Entry {
let mut entry = Entry {
trace: data,
padding_len: 0,
complete: true,
duration: duration,
};
entry.realign();
entry
}
pub fn id(&self) -> u32 {
self.trace[self.padding_len..].as_ptr() as u32
}
pub fn realign(&mut self) {
self.trace.push(0);
let data_len = self.trace.len();
self.trace.reserve(ALIGNMENT - 1);
let padding = ALIGNMENT - self.trace.as_ptr() as usize % ALIGNMENT;
let padding = if padding == ALIGNMENT { 0 } else { padding };
for _ in 0..padding {
// Vec guarantees that this will not reallocate
self.trace.push(0)
}
for i in 1..data_len + 1 {
self.trace[data_len + padding - i] = self.trace[data_len - i]
}
self.complete = true;
self.padding_len = padding;
dcci_slice(&self.trace);
}
}
#[derive(Debug)]
enum RemoteTraceState {
Unsent,
Sending(usize),
Ready,
Running(usize),
}
#[derive(Debug)]
struct RemoteTraces {
remote_traces: BTreeMap<u8, Sliceable>,
state: RemoteTraceState,
}
impl RemoteTraces {
pub fn new(traces: BTreeMap<u8, Sliceable>) -> RemoteTraces {
RemoteTraces {
remote_traces: traces,
state: RemoteTraceState::Unsent,
}
}
// on subkernel request
pub fn upload_traces(
&mut self,
id: u32,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) -> usize {
let len = self.remote_traces.len();
if len > 0 {
self.state = RemoteTraceState::Sending(self.remote_traces.len());
for (dest, trace) in self.remote_traces.iter_mut() {
// queue up the first packet for all destinations, rest will be sent after first ACK
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let meta = trace.get_slice_master(&mut data_slice);
router.route(
Packet::DmaAddTraceRequest {
source: self_destination,
destination: *dest,
id: id,
status: meta.status,
length: meta.len,
trace: data_slice,
},
routing_table,
rank,
self_destination,
);
}
}
len
}
// on incoming Packet::DmaAddTraceReply
pub fn ack_upload(
&mut self,
kernel_manager: &mut KernelManager,
source: u8,
id: u32,
succeeded: bool,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) {
if let RemoteTraceState::Sending(count) = self.state {
if let Some(trace) = self.remote_traces.get_mut(&source) {
if trace.at_end() {
if count - 1 == 0 {
self.state = RemoteTraceState::Ready;
if let Some((id, timestamp)) = kernel_manager.ddma_remote_uploaded(succeeded) {
self.playback(id, timestamp, router, rank, self_destination, routing_table);
}
} else {
self.state = RemoteTraceState::Sending(count - 1);
}
} else {
// send next slice
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let meta = trace.get_slice_master(&mut data_slice);
router.route(
Packet::DmaAddTraceRequest {
source: self_destination,
destination: meta.destination,
id: id,
status: meta.status,
length: meta.len,
trace: data_slice,
},
routing_table,
rank,
self_destination,
);
}
}
}
}
// on subkernel request
pub fn playback(
&mut self,
id: u32,
timestamp: u64,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) {
// route all the playback requests
// remote traces (local trace runs on core1 unlike mainline firmware)
self.state = RemoteTraceState::Running(self.remote_traces.len());
for (dest, _) in self.remote_traces.iter() {
router.route(
Packet::DmaPlaybackRequest {
source: self_destination,
destination: *dest,
id: id,
timestamp: timestamp,
},
routing_table,
rank,
self_destination,
);
// response will be ignored (succeeded = false handled by the main thread)
}
}
// on incoming Packet::DmaPlaybackDone
pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, error: u8, channel: u32, timestamp: u64) {
if let RemoteTraceState::Running(count) = self.state {
if error != 0 || count - 1 == 0 {
// notify the kernel about a DDMA error or finish
kernel_manager.ddma_finished(error, channel, timestamp);
self.state = RemoteTraceState::Ready;
// further messages will be ignored (if there was an error)
} else {
// no error and not the last one awaited
self.state = RemoteTraceState::Running(count - 1);
}
}
}
pub fn erase(
&mut self,
id: u32,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) {
for (dest, _) in self.remote_traces.iter() {
router.route(
Packet::DmaRemoveTraceRequest {
source: self_destination,
destination: *dest,
id: id,
},
routing_table,
rank,
self_destination,
);
// response will be ignored as this object will stop existing too
}
}
pub fn has_remote_traces(&self) -> bool {
self.remote_traces.len() > 0
}
} }
#[derive(Debug)] #[derive(Debug)]
pub struct Manager { pub struct Manager {
entries: BTreeMap<u32, Entry>, entries: BTreeMap<(u8, u32), Entry>,
state: ManagerState, state: ManagerState,
currentid: u32, current_id: u32,
current_source: u8,
remote_entries: BTreeMap<u32, RemoteTraces>,
name_map: BTreeMap<String, u32>,
} }
impl Manager { impl Manager {
@ -45,79 +270,238 @@ impl Manager {
unsafe { while csr::rtio_dma::enable_read() != 0 {} } unsafe { while csr::rtio_dma::enable_read() != 0 {} }
Manager { Manager {
entries: BTreeMap::new(), entries: BTreeMap::new(),
currentid: 0, current_id: 0,
current_source: 0,
state: ManagerState::Idle, state: ManagerState::Idle,
remote_entries: BTreeMap::new(),
name_map: BTreeMap::new(),
} }
} }
pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> { pub fn add(
let entry = match self.entries.get_mut(&id) { &mut self,
source: u8,
id: u32,
status: PayloadStatus,
trace: &[u8],
trace_len: usize,
) -> Result<(), Error> {
let entry = match self.entries.get_mut(&(source, id)) {
Some(entry) => { Some(entry) => {
if entry.complete || status.is_first() { if entry.complete || status.is_first() {
// replace entry // replace entry
self.entries.remove(&id); self.entries.remove(&(source, id));
self.entries.insert( self.entries.insert(
id, (source, id),
Entry { Entry {
trace: Vec::new(), trace: Vec::new(),
padding_len: 0, padding_len: 0,
complete: false, complete: false,
duration: 0,
}, },
); );
self.entries.get_mut(&id).unwrap() self.entries.get_mut(&(source, id)).unwrap()
} else { } else {
entry entry
} }
} }
None => { None => {
self.entries.insert( self.entries.insert(
id, (source, id),
Entry { Entry {
trace: Vec::new(), trace: Vec::new(),
padding_len: 0, padding_len: 0,
complete: false, complete: false,
duration: 0,
}, },
); );
self.entries.get_mut(&id).unwrap() self.entries.get_mut(&(source, id)).unwrap()
} }
}; };
entry.trace.extend(&trace[0..trace_len]); entry.trace.extend(&trace[0..trace_len]);
if status.is_last() { if status.is_last() {
entry.trace.push(0); entry.realign();
let data_len = entry.trace.len();
// Realign.
entry.trace.reserve(ALIGNMENT - 1);
let padding = ALIGNMENT - entry.trace.as_ptr() as usize % ALIGNMENT;
let padding = if padding == ALIGNMENT { 0 } else { padding };
for _ in 0..padding {
// Vec guarantees that this will not reallocate
entry.trace.push(0)
}
for i in 1..data_len + 1 {
entry.trace[data_len + padding - i] = entry.trace[data_len - i]
}
entry.complete = true;
entry.padding_len = padding;
dcci_slice(&entry.trace);
} }
Ok(()) Ok(())
} }
pub fn erase(&mut self, id: u32) -> Result<(), Error> { // api for DRTIO
match self.entries.remove(&id) { pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> {
match self.entries.remove(&(source, id)) {
Some(_) => Ok(()), Some(_) => Ok(()),
None => Err(Error::IdNotFound), None => Err(Error::IdNotFound),
} }
} }
pub fn playback(&mut self, id: u32, timestamp: u64) -> Result<(), Error> { // API for subkernel
pub fn erase_name(
&mut self,
name: &str,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) {
if let Some(id) = self.name_map.get(name) {
if let Some(traces) = self.remote_entries.get_mut(&id) {
traces.erase(*id, router, rank, self_destination, routing_table);
self.remote_entries.remove(&id);
}
self.entries.remove(&(self_destination, *id));
self.name_map.remove(name);
}
}
pub fn remote_finished(
&mut self,
kernel_manager: &mut KernelManager,
id: u32,
error: u8,
channel: u32,
timestamp: u64,
) {
if let Some(entry) = self.remote_entries.get_mut(&id) {
entry.remote_finished(kernel_manager, error, channel, timestamp);
}
}
pub fn ack_upload(
&mut self,
kernel_manager: &mut KernelManager,
source: u8,
id: u32,
succeeded: bool,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) {
if let Some(entry) = self.remote_entries.get_mut(&id) {
entry.ack_upload(
kernel_manager,
source,
id,
succeeded,
router,
rank,
self_destination,
routing_table,
);
}
}
// API for subkernel
pub fn upload_traces(
&mut self,
id: u32,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) -> Result<usize, Error> {
let remote_traces = self.remote_entries.get_mut(&id);
let mut len = 0;
if let Some(traces) = remote_traces {
len = traces.upload_traces(id, router, rank, self_destination, routing_table);
}
Ok(len)
}
// API for subkernel
pub fn playback_remote(
&mut self,
id: u32,
timestamp: u64,
router: &mut Router,
rank: u8,
self_destination: u8,
routing_table: &RoutingTable,
) -> Result<(), Error> {
if let Some(traces) = self.remote_entries.get_mut(&id) {
traces.playback(id, timestamp, router, rank, self_destination, routing_table);
Ok(())
} else {
Err(Error::IdNotFound)
}
}
// API for subkernel
pub fn cleanup(&mut self, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) {
// after subkernel ends, remove all self-generated traces
for (_, id) in self.name_map.iter_mut() {
if let Some(traces) = self.remote_entries.get_mut(&id) {
traces.erase(*id, router, rank, self_destination, routing_table);
self.remote_entries.remove(&id);
}
self.entries.remove(&(self_destination, *id));
}
self.name_map.clear();
}
// API for subkernel
pub fn retrieve(&self, self_destination: u8, name: &String) -> Option<(i32, i64, bool)> {
let id = self.name_map.get(name)?;
let duration = self.entries.get(&(self_destination, *id))?.duration;
let uses_ddma = self.has_remote_traces(*id);
Some((*id as i32, duration, uses_ddma))
}
pub fn has_remote_traces(&self, id: u32) -> bool {
match self.remote_entries.get(&id) {
Some(traces) => traces.has_remote_traces(),
_ => false,
}
}
pub fn put_record(&mut self, mut recorder: DmaRecorder, self_destination: u8) -> Result<u32, Error> {
let mut remote_traces: BTreeMap<u8, Sliceable> = BTreeMap::new();
let mut local_trace: Vec<u8> = Vec::new();
// analyze each entry and put in proper buckets, as the kernel core
// sends whole chunks, to limit comms/kernel CPU communication,
// and as only comms core has access to varios DMA buffers.
let mut ptr = 0;
recorder.buffer.push(0);
while recorder.buffer[ptr] != 0 {
// ptr + 3 = tgt >> 24 (destination)
let len = recorder.buffer[ptr] as usize;
let destination = recorder.buffer[ptr + 3];
if destination == 0 {
return Err(Error::MasterDmaFound);
} else if destination == self_destination {
local_trace.extend(&recorder.buffer[ptr..ptr + len]);
} else {
if let Some(remote_trace) = remote_traces.get_mut(&destination) {
remote_trace.extend(&recorder.buffer[ptr..ptr + len]);
} else {
remote_traces.insert(
destination,
Sliceable::new(destination, recorder.buffer[ptr..ptr + len].to_vec()),
);
}
}
// and jump to the next event
ptr += len;
}
let local_entry = Entry::from_vec(local_trace, recorder.duration);
let id = local_entry.id();
self.entries.insert((self_destination, id), local_entry);
self.remote_entries.insert(id, RemoteTraces::new(remote_traces));
let mut name = String::new();
mem::swap(&mut recorder.name, &mut name);
self.name_map.insert(name, id);
Ok(id)
}
pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> {
if self.state != ManagerState::Idle { if self.state != ManagerState::Idle {
return Err(Error::PlaybackInProgress); return Err(Error::PlaybackInProgress);
} }
let entry = match self.entries.get(&id) { let entry = match self.entries.get(&(source, id)) {
Some(entry) => entry, Some(entry) => entry,
None => { None => {
return Err(Error::IdNotFound); return Err(Error::IdNotFound);
@ -130,7 +514,8 @@ impl Manager {
assert!(ptr as u32 % 64 == 0); assert!(ptr as u32 % 64 == 0);
self.state = ManagerState::Playback; self.state = ManagerState::Playback;
self.currentid = id; self.current_id = id;
self.current_source = source;
unsafe { unsafe {
csr::rtio_dma::base_address_write(ptr as u32); csr::rtio_dma::base_address_write(ptr as u32);
@ -162,7 +547,8 @@ impl Manager {
csr::rtio_dma::error_write(1); csr::rtio_dma::error_write(1);
} }
return Some(RtioStatus { return Some(RtioStatus {
id: self.currentid, source: self.current_source,
id: self.current_id,
error: error, error: error,
channel: channel, channel: channel,
timestamp: timestamp, timestamp: timestamp,

View File

@ -39,11 +39,13 @@ use libboard_zynq::{i2c::I2c, print, println, time::Milliseconds, timer::GlobalT
use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR}; use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR};
use libregister::RegisterR; use libregister::RegisterR;
use libsupport_zynq::ram; use libsupport_zynq::ram;
use routing::Router;
use subkernel::Manager as KernelManager; use subkernel::Manager as KernelManager;
mod analyzer; mod analyzer;
mod dma; mod dma;
mod repeater; mod repeater;
mod routing;
mod subkernel; mod subkernel;
fn drtiosat_reset(reset: bool) { fn drtiosat_reset(reset: bool) {
@ -72,6 +74,12 @@ fn drtiosat_tsc_loaded() -> bool {
} }
} }
fn drtiosat_async_ready() {
unsafe {
csr::drtiosat::async_messages_ready_write(1);
}
}
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
macro_rules! forward { macro_rules! forward {
($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{ ($routing_table:expr, $destination:expr, $rank:expr, $repeaters:expr, $packet:expr, $timer:expr) => {{
@ -79,7 +87,11 @@ macro_rules! forward {
if hop != 0 { if hop != 0 {
let repno = (hop - 1) as usize; let repno = (hop - 1) as usize;
if repno < $repeaters.len() { if repno < $repeaters.len() {
if $packet.expects_response() {
return $repeaters[repno].aux_forward($packet, $timer); return $repeaters[repno].aux_forward($packet, $timer);
} else {
return $repeaters[repno].aux_send($packet);
}
} else { } else {
return Err(drtioaux::Error::RoutingError); return Err(drtioaux::Error::RoutingError);
} }
@ -95,13 +107,15 @@ macro_rules! forward {
fn process_aux_packet( fn process_aux_packet(
_repeaters: &mut [repeater::Repeater], _repeaters: &mut [repeater::Repeater],
_routing_table: &mut drtio_routing::RoutingTable, _routing_table: &mut drtio_routing::RoutingTable,
_rank: &mut u8, rank: &mut u8,
self_destination: &mut u8,
packet: drtioaux::Packet, packet: drtioaux::Packet,
timer: &mut GlobalTimer, timer: &mut GlobalTimer,
i2c: &mut I2c, i2c: &mut I2c,
dma_manager: &mut DmaManager, dma_manager: &mut DmaManager,
analyzer: &mut Analyzer, analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager, kernel_manager: &mut KernelManager,
router: &mut Router,
) -> Result<(), drtioaux::Error> { ) -> Result<(), drtioaux::Error> {
// In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels, // In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels,
// and u16 otherwise; hence the `as _` conversion. // and u16 otherwise; hence the `as _` conversion.
@ -122,54 +136,12 @@ fn process_aux_packet(
drtioaux::Packet::DestinationStatusRequest { destination } => { drtioaux::Packet::DestinationStatusRequest { destination } => {
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
let hop = _routing_table.0[destination as usize][*_rank as usize]; let hop = _routing_table.0[destination as usize][*rank as usize];
#[cfg(not(has_drtio_routing))] #[cfg(not(has_drtio_routing))]
let hop = 0; let hop = 0;
if hop == 0 { if hop == 0 {
if let Some(status) = dma_manager.check_state() { *self_destination = destination;
info!(
"playback done, error: {}, channel: {}, timestamp: {}",
status.error, status.channel, status.timestamp
);
drtioaux::send(
0,
&drtioaux::Packet::DmaPlaybackStatus {
destination: destination,
id: status.id,
error: status.error,
channel: status.channel,
timestamp: status.timestamp,
},
)?;
} else if let Some(subkernel_finished) = kernel_manager.get_last_finished() {
info!(
"subkernel {} finished, with exception: {}",
subkernel_finished.id, subkernel_finished.with_exception
);
drtioaux::send(
0,
&drtioaux::Packet::SubkernelFinished {
id: subkernel_finished.id,
with_exception: subkernel_finished.with_exception,
},
)?;
} else if kernel_manager.message_is_ready() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
match kernel_manager.message_get_slice(&mut data_slice) {
Some(meta) => drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessage {
destination: destination,
id: kernel_manager.get_current_id().unwrap(),
status: meta.status,
length: meta.len as u16,
data: data_slice,
},
)?,
None => warn!("subkernel message is ready but no message is present"),
}
} else {
let errors; let errors;
unsafe { unsafe {
errors = csr::drtiosat::rtio_error_read(); errors = csr::drtiosat::rtio_error_read();
@ -199,7 +171,6 @@ fn process_aux_packet(
drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?; drtioaux::send(0, &drtioaux::Packet::DestinationOkReply)?;
} }
} }
}
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
{ {
@ -242,11 +213,11 @@ fn process_aux_packet(
drtioaux::send(0, &drtioaux::Packet::RoutingAck) drtioaux::send(0, &drtioaux::Packet::RoutingAck)
} }
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
drtioaux::Packet::RoutingSetRank { rank } => { drtioaux::Packet::RoutingSetRank { rank: new_rank } => {
*_rank = rank; *rank = new_rank;
drtio_routing::interconnect_enable_all(_routing_table, rank); drtio_routing::interconnect_enable_all(_routing_table, new_rank);
let rep_rank = rank + 1; let rep_rank = new_rank + 1;
for rep in _repeaters.iter() { for rep in _repeaters.iter() {
if let Err(e) = rep.set_rank(rep_rank, timer) { if let Err(e) = rep.set_rank(rep_rank, timer) {
error!("failed to set rank ({:?})", e); error!("failed to set rank ({:?})", e);
@ -267,12 +238,20 @@ fn process_aux_packet(
#[cfg(not(has_drtio_routing))] #[cfg(not(has_drtio_routing))]
drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck), drtioaux::Packet::RoutingSetRank { rank: _ } => drtioaux::send(0, &drtioaux::Packet::RoutingAck),
drtioaux::Packet::RoutingRetrievePackets => {
let packet = router
.get_upstream_packet()
.or(Some(drtioaux::Packet::RoutingNoPackets))
.unwrap();
drtioaux::send(0, &packet)
}
drtioaux::Packet::MonitorRequest { drtioaux::Packet::MonitorRequest {
destination: _destination, destination: _destination,
channel, channel,
probe, probe,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -294,7 +273,7 @@ fn process_aux_packet(
overrd, overrd,
value, value,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel as _); csr::rtio_moninj::inj_chan_sel_write(channel as _);
@ -308,7 +287,7 @@ fn process_aux_packet(
channel, channel,
overrd, overrd,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let value; let value;
#[cfg(has_rtio_moninj)] #[cfg(has_rtio_moninj)]
unsafe { unsafe {
@ -327,7 +306,7 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.start().is_ok(); let succeeded = i2c.start().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -335,7 +314,7 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.restart().is_ok(); let succeeded = i2c.restart().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -343,7 +322,7 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = i2c.stop().is_ok(); let succeeded = i2c.stop().is_ok();
drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::I2cBasicReply { succeeded: succeeded })
} }
@ -352,7 +331,7 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data, data,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
match i2c.write(data) { match i2c.write(data) {
Ok(ack) => drtioaux::send( Ok(ack) => drtioaux::send(
0, 0,
@ -375,7 +354,7 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
ack, ack,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
match i2c.read(ack) { match i2c.read(ack) {
Ok(data) => drtioaux::send( Ok(data) => drtioaux::send(
0, 0,
@ -399,7 +378,7 @@ fn process_aux_packet(
address, address,
mask, mask,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let ch = match mask { let ch = match mask {
//decode from mainline, PCA9548-centric API //decode from mainline, PCA9548-centric API
0x00 => None, 0x00 => None,
@ -425,7 +404,7 @@ fn process_aux_packet(
div: _div, div: _div,
cs: _cs, cs: _cs,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok(); //let succeeded = spi::set_config(busno, flags, length, div, cs).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -435,7 +414,7 @@ fn process_aux_packet(
busno: _busno, busno: _busno,
data: _data, data: _data,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
//let succeeded = spi::write(busno, data).is_ok(); //let succeeded = spi::write(busno, data).is_ok();
drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false }) drtioaux::send(0, &drtioaux::Packet::SpiBasicReply { succeeded: false })
@ -444,7 +423,7 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
busno: _busno, busno: _busno,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// todo: reimplement when/if SPI is available // todo: reimplement when/if SPI is available
// match spi::read(busno) { // match spi::read(busno) {
// Ok(data) => drtioaux::send(0, // Ok(data) => drtioaux::send(0,
@ -464,7 +443,7 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerHeaderRequest { drtioaux::Packet::AnalyzerHeaderRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let header = analyzer.get_header(); let header = analyzer.get_header();
drtioaux::send( drtioaux::send(
0, 0,
@ -478,7 +457,7 @@ fn process_aux_packet(
drtioaux::Packet::AnalyzerDataRequest { drtioaux::Packet::AnalyzerDataRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice); let meta = analyzer.get_data(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -492,55 +471,135 @@ fn process_aux_packet(
} }
drtioaux::Packet::DmaAddTraceRequest { drtioaux::Packet::DmaAddTraceRequest {
destination: _destination, source,
destination,
id, id,
status, status,
length, length,
trace, trace,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok(); *self_destination = destination;
drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
router.send(
drtioaux::Packet::DmaAddTraceReply {
source: *self_destination,
destination: source,
id: id,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::DmaAddTraceReply {
source,
destination: _destination,
id,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
dma_manager.ack_upload(
kernel_manager,
source,
id,
succeeded,
router,
*rank,
*self_destination,
_routing_table,
);
Ok(())
} }
drtioaux::Packet::DmaRemoveTraceRequest { drtioaux::Packet::DmaRemoveTraceRequest {
source,
destination: _destination, destination: _destination,
id, id,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = dma_manager.erase(id).is_ok(); let succeeded = dma_manager.erase(source, id).is_ok();
drtioaux::send(0, &drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded }) router.send(
drtioaux::Packet::DmaRemoveTraceReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::DmaRemoveTraceReply {
destination: _destination,
succeeded: _,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
Ok(())
} }
drtioaux::Packet::DmaPlaybackRequest { drtioaux::Packet::DmaPlaybackRequest {
source,
destination: _destination, destination: _destination,
id, id,
timestamp, timestamp,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let succeeded = if !kernel_manager.running() { let succeeded = if !kernel_manager.running() {
dma_manager.playback(id, timestamp).is_ok() dma_manager.playback(source, id, timestamp).is_ok()
} else { } else {
false false
}; };
drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded }) router.send(
drtioaux::Packet::DmaPlaybackReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::DmaPlaybackReply {
destination: _destination,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
if !succeeded {
kernel_manager.ddma_nack();
}
Ok(())
}
drtioaux::Packet::DmaPlaybackStatus {
source: _,
destination: _destination,
id,
error,
channel,
timestamp,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp);
Ok(())
} }
drtioaux::Packet::SubkernelAddDataRequest { drtioaux::Packet::SubkernelAddDataRequest {
destination: _destination, destination,
id, id,
status, status,
length, length,
data, data,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, destination, *rank, _repeaters, &packet, timer);
*self_destination = destination;
let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
} }
drtioaux::Packet::SubkernelLoadRunRequest { drtioaux::Packet::SubkernelLoadRunRequest {
source,
destination: _destination, destination: _destination,
id, id,
run, run,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut succeeded = kernel_manager.load(id).is_ok(); let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run // allow preloading a kernel with delayed run
if run { if run {
@ -548,15 +607,42 @@ fn process_aux_packet(
// cannot run kernel while DDMA is running // cannot run kernel while DDMA is running
succeeded = false; succeeded = false;
} else { } else {
succeeded |= kernel_manager.run(id).is_ok(); succeeded |= kernel_manager.run(source, id).is_ok();
} }
} }
drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded }) router.send(
drtioaux::Packet::SubkernelLoadRunReply {
destination: source,
succeeded: succeeded,
},
_routing_table,
*rank,
*self_destination,
)
}
drtioaux::Packet::SubkernelLoadRunReply {
destination: _destination,
succeeded,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
// received if local subkernel started another, remote subkernel
kernel_manager.subkernel_load_run_reply(succeeded);
Ok(())
}
drtioaux::Packet::SubkernelFinished {
destination: _destination,
id,
with_exception,
exception_src,
} => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.remote_subkernel_finished(id, with_exception, exception_src);
Ok(())
} }
drtioaux::Packet::SubkernelExceptionRequest { drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice); let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send( drtioaux::send(
@ -569,38 +655,43 @@ fn process_aux_packet(
) )
} }
drtioaux::Packet::SubkernelMessage { drtioaux::Packet::SubkernelMessage {
destination, source,
destination: _destination,
id: _id, id: _id,
status, status,
length, length,
data, data,
} => { } => {
forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.message_handle_incoming(status, length as usize, &data); kernel_manager.message_handle_incoming(status, length as usize, &data);
drtioaux::send( router.send(
0, drtioaux::Packet::SubkernelMessageAck { destination: source },
&drtioaux::Packet::SubkernelMessageAck { _routing_table,
destination: destination, *rank,
}, *self_destination,
) )
} }
drtioaux::Packet::SubkernelMessageAck { drtioaux::Packet::SubkernelMessageAck {
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
if kernel_manager.message_ack_slice() { if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {
drtioaux::send( // route and not send immediately as ACKs are not a beginning of a transaction
0, router.route(
&drtioaux::Packet::SubkernelMessage { drtioaux::Packet::SubkernelMessage {
destination: *_rank, source: *self_destination,
destination: meta.destination,
id: kernel_manager.get_current_id().unwrap(), id: kernel_manager.get_current_id().unwrap(),
status: meta.status, status: meta.status,
length: meta.len as u16, length: meta.len as u16,
data: data_slice, data: data_slice,
}, },
)?; _routing_table,
*rank,
*self_destination,
);
} else { } else {
error!("Error receiving message slice"); error!("Error receiving message slice");
} }
@ -608,8 +699,8 @@ fn process_aux_packet(
Ok(()) Ok(())
} }
_ => { p => {
warn!("received unexpected aux packet"); warn!("received unexpected aux packet: {:?}", p);
Ok(()) Ok(())
} }
} }
@ -619,32 +710,35 @@ fn process_aux_packets(
repeaters: &mut [repeater::Repeater], repeaters: &mut [repeater::Repeater],
routing_table: &mut drtio_routing::RoutingTable, routing_table: &mut drtio_routing::RoutingTable,
rank: &mut u8, rank: &mut u8,
self_destination: &mut u8,
timer: &mut GlobalTimer, timer: &mut GlobalTimer,
i2c: &mut I2c, i2c: &mut I2c,
dma_manager: &mut DmaManager, dma_manager: &mut DmaManager,
analyzer: &mut Analyzer, analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager, kernel_manager: &mut KernelManager,
router: &mut Router,
) { ) {
let result = drtioaux::recv(0).and_then(|packet| { let result = drtioaux::recv(0).and_then(|packet| {
if let Some(packet) = packet { if let Some(packet) = packet.or_else(|| router.get_local_packet()) {
process_aux_packet( process_aux_packet(
repeaters, repeaters,
routing_table, routing_table,
rank, rank,
self_destination,
packet, packet,
timer, timer,
i2c, i2c,
dma_manager, dma_manager,
analyzer, analyzer,
kernel_manager, kernel_manager,
router,
) )
} else { } else {
Ok(()) Ok(())
} }
}); });
match result { if let Err(e) = result {
Ok(()) => (), warn!("aux packet error ({:?})", e);
Err(e) => warn!("aux packet error ({:?})", e),
} }
} }
@ -800,17 +894,20 @@ pub extern "C" fn main_core0() -> i32 {
} }
let mut routing_table = drtio_routing::RoutingTable::default_empty(); let mut routing_table = drtio_routing::RoutingTable::default_empty();
let mut rank = 1; let mut rank = 1;
let mut destination = 1;
let mut hardware_tick_ts = 0; let mut hardware_tick_ts = 0;
let mut control = ksupport::kernel::Control::start(); let mut control = ksupport::kernel::Control::start();
loop { loop {
let mut router = Router::new();
while !drtiosat_link_rx_up() { while !drtiosat_link_rx_up() {
drtiosat_process_errors(); drtiosat_process_errors();
#[allow(unused_mut)] #[allow(unused_mut)]
for mut rep in repeaters.iter_mut() { for mut rep in repeaters.iter_mut() {
rep.service(&routing_table, rank, &mut timer); rep.service(&routing_table, rank, destination, &mut router, &mut timer);
} }
#[cfg(feature = "target_kasli_soc")] #[cfg(feature = "target_kasli_soc")]
{ {
@ -849,15 +946,17 @@ pub extern "C" fn main_core0() -> i32 {
&mut repeaters, &mut repeaters,
&mut routing_table, &mut routing_table,
&mut rank, &mut rank,
&mut destination,
&mut timer, &mut timer,
&mut i2c, &mut i2c,
&mut dma_manager, &mut dma_manager,
&mut analyzer, &mut analyzer,
&mut kernel_manager, &mut kernel_manager,
&mut router,
); );
#[allow(unused_mut)] #[allow(unused_mut)]
for mut rep in repeaters.iter_mut() { for mut rep in repeaters.iter_mut() {
rep.service(&routing_table, rank, &mut timer); rep.service(&routing_table, rank, destination, &mut router, &mut timer);
} }
#[cfg(feature = "target_kasli_soc")] #[cfg(feature = "target_kasli_soc")]
{ {
@ -880,7 +979,45 @@ pub extern "C" fn main_core0() -> i32 {
error!("aux packet error: {:?}", e); error!("aux packet error: {:?}", e);
} }
} }
kernel_manager.process_kern_requests(rank, timer); if let Some(status) = dma_manager.check_state() {
info!(
"playback done, error: {}, channel: {}, timestamp: {}",
status.error, status.channel, status.timestamp
);
router.route(
drtioaux::Packet::DmaPlaybackStatus {
source: destination,
destination: status.source,
id: status.id,
error: status.error,
channel: status.channel,
timestamp: status.timestamp,
},
&routing_table,
rank,
destination,
);
}
kernel_manager.process_kern_requests(
&mut router,
&routing_table,
rank,
destination,
&mut dma_manager,
&timer,
);
#[cfg(has_drtio_routing)]
if let Some((repno, packet)) = router.get_downstream_packet() {
if let Err(e) = repeaters[repno].aux_send(&packet) {
warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e)
}
}
if router.any_upstream_waiting() {
drtiosat_async_ready();
}
} }
drtiosat_reset_phy(true); drtiosat_reset_phy(true);

View File

@ -6,6 +6,7 @@ use libboard_artiq::{drtio_routing, drtioaux};
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
use libboard_zynq::time::Milliseconds; use libboard_zynq::time::Milliseconds;
use libboard_zynq::timer::GlobalTimer; use libboard_zynq::timer::GlobalTimer;
use routing::Router;
#[cfg(has_drtio_routing)] #[cfg(has_drtio_routing)]
fn rep_link_rx_up(repno: u8) -> bool { fn rep_link_rx_up(repno: u8) -> bool {
@ -53,7 +54,14 @@ impl Repeater {
self.state == RepeaterState::Up self.state == RepeaterState::Up
} }
pub fn service(&mut self, routing_table: &drtio_routing::RoutingTable, rank: u8, timer: &mut GlobalTimer) { pub fn service(
&mut self,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
destination: u8,
router: &mut Router,
timer: &mut GlobalTimer,
) {
self.process_local_errors(); self.process_local_errors();
match self.state { match self.state {
@ -116,6 +124,11 @@ impl Repeater {
info!("[REP#{}] link is down", self.repno); info!("[REP#{}] link is down", self.repno);
self.state = RepeaterState::Down; self.state = RepeaterState::Down;
} }
if self.async_messages_ready() {
if let Err(e) = self.handle_async(routing_table, rank, destination, router, timer) {
warn!("[REP#{}] Error handling async messages ({:?})", self.repno, e);
}
}
} }
RepeaterState::Failed => { RepeaterState::Failed => {
if !rep_link_rx_up(self.repno) { if !rep_link_rx_up(self.repno) {
@ -173,6 +186,34 @@ impl Repeater {
} }
} }
fn async_messages_ready(&self) -> bool {
let async_rdy;
unsafe {
async_rdy = (csr::DRTIOREP[self.repno as usize].async_messages_ready_read)();
(csr::DRTIOREP[self.repno as usize].async_messages_ready_write)(0);
}
async_rdy == 1
}
fn handle_async(
&self,
routing_table: &drtio_routing::RoutingTable,
rank: u8,
self_destination: u8,
router: &mut Router,
timer: &mut GlobalTimer,
) -> Result<(), drtioaux::Error> {
loop {
drtioaux::send(self.auxno, &drtioaux::Packet::RoutingRetrievePackets).unwrap();
let reply = self.recv_aux_timeout(200, timer)?;
match reply {
drtioaux::Packet::RoutingNoPackets => break,
packet => router.route(packet, routing_table, rank, self_destination),
}
}
Ok(())
}
fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result<drtioaux::Packet, drtioaux::Error> { fn recv_aux_timeout(&self, timeout: u32, timer: &mut GlobalTimer) -> Result<drtioaux::Packet, drtioaux::Error> {
let max_time = timer.get_time() + Milliseconds(timeout.into()); let max_time = timer.get_time() + Milliseconds(timeout.into());
loop { loop {
@ -191,15 +232,19 @@ impl Repeater {
} }
pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { pub fn aux_forward(&self, request: &drtioaux::Packet, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up { self.aux_send(request)?;
return Err(drtioaux::Error::LinkDown);
}
drtioaux::send(self.auxno, request).unwrap();
let reply = self.recv_aux_timeout(200, timer)?; let reply = self.recv_aux_timeout(200, timer)?;
drtioaux::send(0, &reply).unwrap(); drtioaux::send(0, &reply).unwrap();
Ok(()) Ok(())
} }
pub fn aux_send(&self, request: &drtioaux::Packet) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up {
return Err(drtioaux::Error::LinkDown);
}
drtioaux::send(self.auxno, request)
}
pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { pub fn sync_tsc(&self, timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
if self.state != RepeaterState::Up { if self.state != RepeaterState::Up {
return Ok(()); return Ok(());
@ -302,7 +347,15 @@ impl Repeater {
Repeater::default() Repeater::default()
} }
pub fn service(&self, _routing_table: &drtio_routing::RoutingTable, _rank: u8, _timer: &mut GlobalTimer) {} pub fn service(
&self,
_routing_table: &drtio_routing::RoutingTable,
_rank: u8,
_destination: u8,
_router: &mut Router,
_timer: &mut GlobalTimer,
) {
}
pub fn sync_tsc(&self, _timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { pub fn sync_tsc(&self, _timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
Ok(()) Ok(())

190
src/satman/src/routing.rs Normal file
View File

@ -0,0 +1,190 @@
use alloc::{collections::vec_deque::VecDeque, vec::Vec};
use core::cmp::min;
#[cfg(has_drtio_routing)]
use libboard_artiq::pl::csr;
use libboard_artiq::{drtio_routing, drtioaux,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}};
pub struct SliceMeta {
pub destination: u8,
pub len: u16,
pub status: PayloadStatus,
}
/* represents data that has to be sent to Master */
#[derive(Debug)]
pub struct Sliceable {
it: usize,
data: Vec<u8>,
destination: u8,
}
macro_rules! get_slice_fn {
($name:tt, $size:expr) => {
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
let first = self.it == 0;
let len = min($size, self.data.len() - self.it);
let last = self.it + len == self.data.len();
let status = PayloadStatus::from_status(first, last);
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
self.it += len;
SliceMeta {
destination: self.destination,
len: len as u16,
status: status,
}
}
};
}
impl Sliceable {
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
Sliceable {
it: 0,
data: data,
destination: destination,
}
}
pub fn at_end(&self) -> bool {
self.it == self.data.len()
}
pub fn extend(&mut self, data: &[u8]) {
self.data.extend(data);
}
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
}
// Packets from downstream (further satellites) are received and routed appropriately.
// they're passed as soon as possible downstream (within the subtree), or sent upstream,
// which is notified about pending packets.
// for rank 1 (connected to master) satellites, these packets are passed as an answer to DestinationStatusRequest;
// for higher ranks, after getting a notification, it will transact with downstream to get the pending packets.
// forward! macro is not deprecated, as routable packets are only these that can originate
// from both master and satellite, e.g. DDMA and Subkernel.
pub struct Router {
upstream_queue: VecDeque<drtioaux::Packet>,
local_queue: VecDeque<drtioaux::Packet>,
#[cfg(has_drtio_routing)]
downstream_queue: VecDeque<(usize, drtioaux::Packet)>,
upstream_notified: bool,
}
impl Router {
pub fn new() -> Router {
Router {
upstream_queue: VecDeque::new(),
local_queue: VecDeque::new(),
#[cfg(has_drtio_routing)]
downstream_queue: VecDeque::new(),
upstream_notified: false,
}
}
// Called by local sources (DDMA, kernel) and by repeaters on receiving async data;
// messages are always buffered for both upstream and downstream
pub fn route(
&mut self,
packet: drtioaux::Packet,
_routing_table: &drtio_routing::RoutingTable,
_rank: u8,
self_destination: u8,
) {
let destination = packet.routable_destination();
#[cfg(has_drtio_routing)]
{
if let Some(destination) = destination {
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
if destination == self_destination {
self.local_queue.push_back(packet);
} else if hop > 0 && hop < csr::DRTIOREP.len() {
let repno = (hop - 1) as usize;
self.downstream_queue.push_back((repno, packet));
} else {
self.upstream_queue.push_back(packet);
}
} else {
error!("Received an unroutable packet: {:?}", packet);
}
}
#[cfg(not(has_drtio_routing))]
{
if destination == Some(self_destination) {
self.local_queue.push_back(packet);
} else {
self.upstream_queue.push_back(packet);
}
}
}
// Sends a packet to a required destination, routing if necessary
pub fn send(
&mut self,
packet: drtioaux::Packet,
_routing_table: &drtio_routing::RoutingTable,
_rank: u8,
_destination: u8,
) -> Result<(), drtioaux::Error> {
#[cfg(has_drtio_routing)]
{
let destination = packet.routable_destination();
if let Some(destination) = destination {
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
if destination == 0 {
// response is needed immediately if master required it
drtioaux::send(0, &packet)?;
} else if !(hop > 0 && hop < csr::DRTIOREP.len()) {
// higher rank can wait
self.upstream_queue.push_back(packet);
} else {
let repno = (hop - 1) as usize;
// transaction will occur at closest possible opportunity
self.downstream_queue.push_back((repno, packet));
}
Ok(())
} else {
// packet not supported in routing, fallback - sent directly
drtioaux::send(0, &packet)
}
}
#[cfg(not(has_drtio_routing))]
{
drtioaux::send(0, &packet)
}
}
pub fn any_upstream_waiting(&mut self) -> bool {
let empty = self.upstream_queue.is_empty();
if !empty && !self.upstream_notified {
self.upstream_notified = true; // so upstream will not get spammed with notifications
true
} else {
false
}
}
pub fn get_upstream_packet(&mut self) -> Option<drtioaux::Packet> {
let packet = self.upstream_queue.pop_front();
if packet.is_none() {
self.upstream_notified = false;
}
packet
}
#[cfg(has_drtio_routing)]
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {
self.downstream_queue.pop_front()
}
pub fn get_local_packet(&mut self) -> Option<drtioaux::Packet> {
self.local_queue.pop_front()
}
}

View File

@ -2,17 +2,21 @@ use alloc::{collections::{BTreeMap, VecDeque},
format, format,
string::{String, ToString}, string::{String, ToString},
vec::Vec}; vec::Vec};
use core::{cmp::min, option::NoneError, slice, str}; use core::{option::NoneError, slice, str};
use core_io::{Error as IoError, Write}; use core_io::{Error as IoError, Write};
use cslice::AsCSlice; use cslice::AsCSlice;
use dma::{Error as DmaError, Manager as DmaManager};
use io::{Cursor, ProtoWrite}; use io::{Cursor, ProtoWrite};
use ksupport::{eh_artiq, kernel, rpc}; use ksupport::{eh_artiq, kernel, rpc};
use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
pl::csr}; pl::csr};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::sync_channel::Receiver; use libcortex_a9::sync_channel::Receiver;
use log::warn; use log::warn;
use routing::{Router, SliceMeta, Sliceable};
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
enum KernelState { enum KernelState {
@ -21,6 +25,24 @@ enum KernelState {
Running, Running,
MsgAwait(Milliseconds, Vec<u8>), MsgAwait(Milliseconds, Vec<u8>),
MsgSending, MsgSending,
SubkernelAwaitLoad,
SubkernelAwaitFinish {
max_time: Milliseconds,
id: u32,
},
DmaUploading,
DmaPendingPlayback {
id: u32,
timestamp: u64,
},
DmaPendingAwait {
id: u32,
timestamp: u64,
max_time: Milliseconds,
},
DmaAwait {
max_time: Milliseconds,
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -31,7 +53,9 @@ pub enum Error {
NoMessage, NoMessage,
AwaitingMessage, AwaitingMessage,
SubkernelIoError, SubkernelIoError,
DrtioError,
KernelException(Sliceable), KernelException(Sliceable),
DmaError(DmaError),
} }
impl From<NoneError> for Error { impl From<NoneError> for Error {
@ -46,21 +70,26 @@ impl From<IoError> for Error {
} }
} }
impl From<DmaError> for Error {
fn from(value: DmaError) -> Error {
Error::DmaError(value)
}
}
impl From<()> for Error { impl From<()> for Error {
fn from(_: ()) -> Error { fn from(_: ()) -> Error {
Error::NoMessage Error::NoMessage
} }
} }
macro_rules! unexpected { impl From<drtioaux::Error> for Error {
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); fn from(_value: drtioaux::Error) -> Error {
Error::DrtioError
}
} }
/* represents data that has to be sent to Master */ macro_rules! unexpected {
#[derive(Debug)] ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
pub struct Sliceable {
it: usize,
data: Vec<u8>,
} }
/* represents interkernel messages */ /* represents interkernel messages */
@ -72,7 +101,6 @@ struct Message {
#[derive(PartialEq)] #[derive(PartialEq)]
enum OutMessageState { enum OutMessageState {
NoMessage, NoMessage,
MessageReady,
MessageBeingSent, MessageBeingSent,
MessageSent, MessageSent,
MessageAcknowledged, MessageAcknowledged,
@ -92,6 +120,8 @@ struct Session {
kernel_state: KernelState, kernel_state: KernelState,
last_exception: Option<Sliceable>, last_exception: Option<Sliceable>,
messages: MessageManager, messages: MessageManager,
source: u8, // which destination requested running the kernel
subkernels_finished: Vec<u32>,
} }
impl Session { impl Session {
@ -101,13 +131,15 @@ impl Session {
kernel_state: KernelState::Absent, kernel_state: KernelState::Absent,
last_exception: None, last_exception: None,
messages: MessageManager::new(), messages: MessageManager::new(),
source: 0,
subkernels_finished: Vec::new(),
} }
} }
fn running(&self) -> bool { fn running(&self) -> bool {
match self.kernel_state { match self.kernel_state {
KernelState::Absent | KernelState::Loaded => false, KernelState::Absent | KernelState::Loaded => false,
KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true, _ => true,
} }
} }
} }
@ -129,39 +161,8 @@ pub struct Manager<'a> {
pub struct SubkernelFinished { pub struct SubkernelFinished {
pub id: u32, pub id: u32,
pub with_exception: bool, pub with_exception: bool,
} pub exception_source: u8,
pub source: u8,
pub struct SliceMeta {
pub len: u16,
pub status: PayloadStatus,
}
macro_rules! get_slice_fn {
($name:tt, $size:expr) => {
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
let first = self.it == 0;
let len = min($size, self.data.len() - self.it);
let last = self.it + len == self.data.len();
let status = PayloadStatus::from_status(first, last);
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
self.it += len;
SliceMeta {
len: len as u16,
status: status,
}
}
};
}
impl Sliceable {
pub fn new(data: Vec<u8>) -> Sliceable {
Sliceable { it: 0, data: data }
}
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
} }
impl MessageManager { impl MessageManager {
@ -194,17 +195,6 @@ impl MessageManager {
} }
} }
pub fn is_outgoing_ready(&mut self) -> bool {
// called by main loop, to see if there's anything to send, will send it afterwards
match self.out_state {
OutMessageState::MessageReady => {
self.out_state = OutMessageState::MessageBeingSent;
true
}
_ => false,
}
}
pub fn was_message_acknowledged(&mut self) -> bool { pub fn was_message_acknowledged(&mut self) -> bool {
match self.out_state { match self.out_state {
OutMessageState::MessageAcknowledged => { OutMessageState::MessageAcknowledged => {
@ -244,10 +234,34 @@ impl MessageManager {
} }
} }
pub fn accept_outgoing(&mut self, message: Vec<u8>) -> Result<(), Error> { pub fn accept_outgoing(
// service tag skipped in kernel &mut self,
self.out_message = Some(Sliceable::new(message)); id: u32,
self.out_state = OutMessageState::MessageReady; self_destination: u8,
destination: u8,
message: Vec<u8>,
routing_table: &RoutingTable,
rank: u8,
router: &mut Router,
) -> Result<(), Error> {
self.out_message = Some(Sliceable::new(destination, message));
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
self.out_state = OutMessageState::MessageBeingSent;
let meta = self.get_outgoing_slice(&mut data_slice).unwrap();
router.route(
drtioaux::Packet::SubkernelMessage {
source: self_destination,
destination: destination,
id: id,
status: meta.status,
length: meta.len as u16,
data: data_slice,
},
routing_table,
rank,
self_destination,
);
Ok(()) Ok(())
} }
@ -313,12 +327,12 @@ impl<'a> Manager<'_> {
} }
} }
pub fn run(&mut self, id: u32) -> Result<(), Error> { pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> {
info!("starting subkernel #{}", id);
if self.session.kernel_state != KernelState::Loaded || self.session.id != id { if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
self.load(id)?; self.load(id)?;
} }
self.session.kernel_state = KernelState::Running; self.session.kernel_state = KernelState::Running;
self.session.source = source;
unsafe { unsafe {
csr::cri_con::selected_write(2); csr::cri_con::selected_write(2);
} }
@ -354,10 +368,6 @@ impl<'a> Manager<'_> {
self.session.messages.ack_slice() self.session.messages.ack_slice()
} }
pub fn message_is_ready(&mut self) -> bool {
self.session.messages.is_outgoing_ready()
}
pub fn load(&mut self, id: u32) -> Result<(), Error> { pub fn load(&mut self, id: u32) -> Result<(), Error> {
if self.session.id == id && self.session.kernel_state == KernelState::Loaded { if self.session.id == id && self.session.kernel_state == KernelState::Loaded {
return Ok(()); return Ok(());
@ -386,16 +396,13 @@ impl<'a> Manager<'_> {
match self.session.last_exception.as_mut() { match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice), Some(exception) => exception.get_slice_sat(data_slice),
None => SliceMeta { None => SliceMeta {
destination: 0,
len: 0, len: 0,
status: PayloadStatus::FirstAndLast, status: PayloadStatus::FirstAndLast,
}, },
} }
} }
pub fn get_last_finished(&mut self) -> Option<SubkernelFinished> {
self.last_finished.take()
}
fn kernel_stop(&mut self) { fn kernel_stop(&mut self) {
self.session.kernel_state = KernelState::Absent; self.session.kernel_state = KernelState::Absent;
unsafe { unsafe {
@ -425,13 +432,92 @@ impl<'a> Manager<'_> {
&[], &[],
0, 0,
) { ) {
Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())), Ok(_) => self.session.last_exception = Some(Sliceable::new(0, writer.into_inner())),
Err(_) => error!("Error writing exception data"), Err(_) => error!("Error writing exception data"),
} }
self.kernel_stop(); self.kernel_stop();
} }
pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) { pub fn ddma_finished(&mut self, error: u8, channel: u32, timestamp: u64) {
if let KernelState::DmaAwait { .. } = self.session.kernel_state {
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
timeout: false,
error: error,
channel: channel,
timestamp: timestamp,
});
self.session.kernel_state = KernelState::Running;
}
}
pub fn ddma_nack(&mut self) {
// for simplicity treat it as a timeout...
if let KernelState::DmaAwait { .. } = self.session.kernel_state {
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
timeout: true,
error: 0,
channel: 0,
timestamp: 0,
});
self.session.kernel_state = KernelState::Running;
}
}
pub fn ddma_remote_uploaded(&mut self, succeeded: bool) -> Option<(u32, u64)> {
// returns a tuple of id, timestamp in case a playback needs to be started immediately
if !succeeded {
self.kernel_stop();
self.runtime_exception(Error::DmaError(DmaError::UploadFail));
}
let res = match self.session.kernel_state {
KernelState::DmaPendingPlayback { id, timestamp } => {
self.session.kernel_state = KernelState::Running;
Some((id, timestamp))
}
KernelState::DmaPendingAwait {
id,
timestamp,
max_time,
} => {
self.session.kernel_state = KernelState::DmaAwait { max_time: max_time };
Some((id, timestamp))
}
KernelState::DmaUploading => {
self.session.kernel_state = KernelState::Running;
None
}
_ => None,
};
res
}
pub fn process_kern_requests(
&mut self,
router: &mut Router,
routing_table: &RoutingTable,
rank: u8,
destination: u8,
dma_manager: &mut DmaManager,
timer: &GlobalTimer,
) {
if let Some(subkernel_finished) = self.last_finished.take() {
info!(
"subkernel {} finished, with exception: {}",
subkernel_finished.id, subkernel_finished.with_exception
);
router.route(
drtioaux::Packet::SubkernelFinished {
destination: subkernel_finished.source,
id: subkernel_finished.id,
with_exception: subkernel_finished.with_exception,
exception_src: subkernel_finished.exception_source,
},
&routing_table,
rank,
destination,
);
}
if !self.running() { if !self.running() {
return; return;
} }
@ -444,6 +530,8 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished { self.last_finished = Some(SubkernelFinished {
id: self.session.id, id: self.session.id,
with_exception: true, with_exception: true,
exception_source: destination,
source: self.session.source,
}); });
} }
Err(e) => { Err(e) => {
@ -452,15 +540,19 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished { self.last_finished = Some(SubkernelFinished {
id: self.session.id, id: self.session.id,
with_exception: true, with_exception: true,
exception_source: destination,
source: self.session.source,
}); });
} }
} }
match self.process_kern_message(rank, timer) { match self.process_kern_message(router, routing_table, rank, destination, dma_manager, timer) {
Ok(true) => { Ok(true) => {
self.last_finished = Some(SubkernelFinished { self.last_finished = Some(SubkernelFinished {
id: self.session.id, id: self.session.id,
with_exception: false, with_exception: false,
exception_source: 0,
source: self.session.source,
}); });
} }
Ok(false) | Err(Error::NoMessage) => (), Ok(false) | Err(Error::NoMessage) => (),
@ -469,6 +561,8 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished { self.last_finished = Some(SubkernelFinished {
id: self.session.id, id: self.session.id,
with_exception: true, with_exception: true,
exception_source: destination,
source: self.session.source,
}); });
} }
Err(e) => { Err(e) => {
@ -477,16 +571,52 @@ impl<'a> Manager<'_> {
self.last_finished = Some(SubkernelFinished { self.last_finished = Some(SubkernelFinished {
id: self.session.id, id: self.session.id,
with_exception: true, with_exception: true,
exception_source: destination,
source: self.session.source,
}); });
} }
} }
} }
fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result<bool, Error> { pub fn subkernel_load_run_reply(&mut self, succeeded: bool) {
if self.session.kernel_state == KernelState::SubkernelAwaitLoad {
self.control
.tx
.send(kernel::Message::SubkernelLoadRunReply { succeeded: succeeded });
self.session.kernel_state = KernelState::Running;
} else {
warn!("received unsolicited SubkernelLoadRunReply");
}
}
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
if with_exception {
self.kernel_stop();
self.last_finished = Some(SubkernelFinished {
source: self.session.source,
id: self.session.id,
with_exception: true,
exception_source: exception_source,
})
} else {
self.session.subkernels_finished.push(id);
}
}
fn process_kern_message(
&mut self,
router: &mut Router,
routing_table: &RoutingTable,
rank: u8,
self_destination: u8,
dma_manager: &mut DmaManager,
timer: &GlobalTimer,
) -> Result<bool, Error> {
let reply = self.control.rx.try_recv()?; let reply = self.control.rx.try_recv()?;
match reply { match reply {
kernel::Message::KernelFinished(_async_errors) => { kernel::Message::KernelFinished(_async_errors) => {
self.kernel_stop(); self.kernel_stop();
dma_manager.cleanup(router, rank, self_destination, routing_table);
return Ok(true); return Ok(true);
} }
kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => { kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => {
@ -503,7 +633,7 @@ impl<'a> Manager<'_> {
Err(_) => error!("Error writing exception data"), Err(_) => error!("Error writing exception data"),
} }
self.kernel_stop(); self.kernel_stop();
return Err(Error::KernelException(Sliceable::new(writer.into_inner()))); return Err(Error::KernelException(Sliceable::new(0, writer.into_inner())));
} }
kernel::Message::CachePutRequest(key, value) => { kernel::Message::CachePutRequest(key, value) => {
self.cache.insert(key, value); self.cache.insert(key, value);
@ -513,18 +643,104 @@ impl<'a> Manager<'_> {
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone(); let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
self.control.tx.send(kernel::Message::CacheGetReply(value)); self.control.tx.send(kernel::Message::CacheGetReply(value));
} }
kernel::Message::SubkernelMsgSend { id: _, data } => {
self.session.messages.accept_outgoing(data)?; kernel::Message::DmaPutRequest(recorder) => {
// ddma is always used on satellites
if let Ok(id) = dma_manager.put_record(recorder, self_destination) {
dma_manager.upload_traces(id, router, rank, self_destination, routing_table)?;
self.session.kernel_state = KernelState::DmaUploading;
} else {
unexpected!("DMAError: found an unsupported call to RTIO devices on master")
}
}
kernel::Message::DmaEraseRequest(name) => {
dma_manager.erase_name(&name, router, rank, self_destination, routing_table);
}
kernel::Message::DmaGetRequest(name) => {
let dma_meta = dma_manager.retrieve(self_destination, &name);
self.control.tx.send(kernel::Message::DmaGetReply(dma_meta));
}
kernel::Message::DmaStartRemoteRequest { id, timestamp } => {
if self.session.kernel_state != KernelState::DmaUploading {
dma_manager.playback_remote(
id as u32,
timestamp as u64,
router,
rank,
self_destination,
routing_table,
)?;
} else {
self.session.kernel_state = KernelState::DmaPendingPlayback {
id: id as u32,
timestamp: timestamp as u64,
};
}
}
kernel::Message::DmaAwaitRemoteRequest(_id) => {
let max_time = timer.get_time() + Milliseconds(10000);
self.session.kernel_state = match self.session.kernel_state {
// if we are still waiting for the traces to be uploaded, extend the state by timeout
KernelState::DmaPendingPlayback { id, timestamp } => KernelState::DmaPendingAwait {
id: id,
timestamp: timestamp,
max_time: max_time,
},
_ => KernelState::DmaAwait { max_time: max_time },
};
}
kernel::Message::SubkernelMsgSend {
id: _id,
destination: msg_dest,
data,
} => {
let msg_dest = msg_dest.or(Some(self.session.source)).unwrap();
self.session.messages.accept_outgoing(
self.session.id,
self_destination,
msg_dest,
data,
routing_table,
rank,
router,
)?;
self.session.kernel_state = KernelState::MsgSending; self.session.kernel_state = KernelState::MsgSending;
} }
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
let max_time = timer.get_time() + Milliseconds(timeout); let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::MsgAwait(max_time, tags); self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
} }
kernel::Message::SubkernelLoadRunRequest {
id,
destination: sk_destination,
run,
} => {
self.session.kernel_state = KernelState::SubkernelAwaitLoad;
router.route(
drtioaux::Packet::SubkernelLoadRunRequest {
source: self_destination,
destination: sk_destination,
id: id,
run: run,
},
routing_table,
rank,
self_destination,
);
}
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
max_time: max_time,
id: id,
};
}
kernel::Message::UpDestinationsRequest(destination) => { kernel::Message::UpDestinationsRequest(destination) => {
self.control self.control.tx.send(kernel::Message::UpDestinationsReply(
.tx destination == (self_destination as i32),
.send(kernel::Message::UpDestinationsReply(destination == (rank as i32))); ));
} }
_ => { _ => {
unexpected!("unexpected message from core1 while kernel was running: {:?}", reply); unexpected!("unexpected message from core1 while kernel was running: {:?}", reply);
@ -533,7 +749,7 @@ impl<'a> Manager<'_> {
Ok(false) Ok(false)
} }
fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> { fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
match &self.session.kernel_state { match &self.session.kernel_state {
KernelState::MsgAwait(timeout, tags) => { KernelState::MsgAwait(timeout, tags) => {
if timer.get_time() > *timeout { if timer.get_time() > *timeout {
@ -565,11 +781,45 @@ impl<'a> Manager<'_> {
Err(Error::AwaitingMessage) Err(Error::AwaitingMessage)
} }
} }
KernelState::SubkernelAwaitFinish { max_time, id } => {
if timer.get_time() > *max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::Timeout,
});
self.session.kernel_state = KernelState::Running;
} else {
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
}
}
Ok(())
}
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {
if timer.get_time() > *max_time {
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
timeout: true,
error: 0,
channel: 0,
timestamp: 0,
});
self.session.kernel_state = KernelState::Running;
}
Ok(())
}
_ => Ok(()), _ => Ok(()),
} }
} }
fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: GlobalTimer) -> Result<(), Error> { fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec<u8>, timer: &GlobalTimer) -> Result<(), Error> {
let mut reader = Cursor::new(&message.data); let mut reader = Cursor::new(&message.data);
let mut current_tags: &[u8] = &tags; let mut current_tags: &[u8] = &tags;
let mut i = message.count; let mut i = message.count;
@ -592,7 +842,7 @@ impl<'a> Manager<'_> {
let mut writer = Cursor::new(buf); let mut writer = Cursor::new(buf);
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) { match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
Ok(()) => { Ok(()) => {
exception = Some(Sliceable::new(writer.into_inner())); exception = Some(Sliceable::new(0, writer.into_inner()));
} }
Err(_) => { Err(_) => {
unexpected = Some("Error writing exception data".to_string()); unexpected = Some("Error writing exception data".to_string());
@ -686,7 +936,7 @@ where
fn recv_w_timeout( fn recv_w_timeout(
rx: &mut Receiver<'_, kernel::Message>, rx: &mut Receiver<'_, kernel::Message>,
timer: GlobalTimer, timer: &GlobalTimer,
timeout: u64, timeout: u64,
) -> Result<kernel::Message, Error> { ) -> Result<kernel::Message, Error> {
let max_time = timer.get_time() + Milliseconds(timeout); let max_time = timer.get_time() + Milliseconds(timeout);