subkernel: add support for (d)dma
This commit is contained in:
parent
0dd834acf8
commit
2c33208fb1
@ -207,7 +207,9 @@ pub enum Packet {
|
||||
trace: [u8; MASTER_PAYLOAD_MAX_SIZE],
|
||||
},
|
||||
DmaAddTraceReply {
|
||||
source: u8,
|
||||
destination: u8,
|
||||
id: u32,
|
||||
succeeded: bool,
|
||||
},
|
||||
DmaRemoveTraceRequest {
|
||||
@ -456,7 +458,9 @@ impl Packet {
|
||||
}
|
||||
}
|
||||
0xb1 => Packet::DmaAddTraceReply {
|
||||
source: reader.read_u8()?,
|
||||
destination: reader.read_u8()?,
|
||||
id: reader.read_u32()?,
|
||||
succeeded: reader.read_bool()?,
|
||||
},
|
||||
0xb2 => Packet::DmaRemoveTraceRequest {
|
||||
@ -788,9 +792,16 @@ impl Packet {
|
||||
writer.write_u16(length)?;
|
||||
writer.write_all(&trace[0..length as usize])?;
|
||||
}
|
||||
Packet::DmaAddTraceReply { destination, succeeded } => {
|
||||
Packet::DmaAddTraceReply {
|
||||
source,
|
||||
destination,
|
||||
id,
|
||||
succeeded,
|
||||
} => {
|
||||
writer.write_u8(0xb1)?;
|
||||
writer.write_u8(source)?;
|
||||
writer.write_u8(destination)?;
|
||||
writer.write_u32(id)?;
|
||||
writer.write_bool(succeeded)?;
|
||||
}
|
||||
Packet::DmaRemoveTraceRequest {
|
||||
|
@ -142,9 +142,9 @@ pub mod remote_dma {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) {
|
||||
pub async fn playback_done(&mut self, source: u8, error: u8, channel: u32, timestamp: u64) {
|
||||
let mut traces_locked = self.traces.async_lock().await;
|
||||
let mut trace = traces_locked.get_mut(&destination).unwrap();
|
||||
let mut trace = traces_locked.get_mut(&source).unwrap();
|
||||
trace.state = RemoteState::PlaybackEnded {
|
||||
error: error,
|
||||
channel: channel,
|
||||
|
@ -560,10 +560,12 @@ pub mod drtio {
|
||||
Packet::DmaAddTraceReply {
|
||||
destination: 0,
|
||||
succeeded: true,
|
||||
..
|
||||
} => Ok(()),
|
||||
Packet::DmaAddTraceReply {
|
||||
destination: 0,
|
||||
succeeded: false,
|
||||
..
|
||||
} => Err("error adding trace on satellite"),
|
||||
_ => Err("adding DMA trace failed, unexpected aux packet"),
|
||||
},
|
||||
|
@ -1,7 +1,13 @@
|
||||
use alloc::{collections::btree_map::BTreeMap, vec::Vec};
|
||||
use alloc::{collections::btree_map::BTreeMap, string::String, vec::Vec};
|
||||
use core::mem;
|
||||
|
||||
use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr};
|
||||
use ksupport::kernel::DmaRecorder;
|
||||
use libboard_artiq::{drtio_routing::RoutingTable,
|
||||
drtioaux_proto::{Packet, PayloadStatus, MASTER_PAYLOAD_MAX_SIZE},
|
||||
pl::csr};
|
||||
use libcortex_a9::cache::dcci_slice;
|
||||
use routing::{Router, Sliceable};
|
||||
use subkernel::Manager as KernelManager;
|
||||
|
||||
const ALIGNMENT: usize = 64;
|
||||
|
||||
@ -19,10 +25,13 @@ pub struct RtioStatus {
|
||||
pub timestamp: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Error {
|
||||
IdNotFound,
|
||||
PlaybackInProgress,
|
||||
EntryNotComplete,
|
||||
MasterDmaFound,
|
||||
UploadFail,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -30,6 +39,217 @@ struct Entry {
|
||||
trace: Vec<u8>,
|
||||
padding_len: usize,
|
||||
complete: bool,
|
||||
duration: i64, // relevant for local DMA
|
||||
}
|
||||
|
||||
impl Entry {
|
||||
pub fn from_vec(data: Vec<u8>, duration: i64) -> Entry {
|
||||
let mut entry = Entry {
|
||||
trace: data,
|
||||
padding_len: 0,
|
||||
complete: true,
|
||||
duration: duration,
|
||||
};
|
||||
entry.realign();
|
||||
entry
|
||||
}
|
||||
|
||||
pub fn id(&self) -> u32 {
|
||||
self.trace[self.padding_len..].as_ptr() as u32
|
||||
}
|
||||
|
||||
pub fn realign(&mut self) {
|
||||
self.trace.push(0);
|
||||
let data_len = self.trace.len();
|
||||
|
||||
self.trace.reserve(ALIGNMENT - 1);
|
||||
let padding = ALIGNMENT - self.trace.as_ptr() as usize % ALIGNMENT;
|
||||
let padding = if padding == ALIGNMENT { 0 } else { padding };
|
||||
for _ in 0..padding {
|
||||
// Vec guarantees that this will not reallocate
|
||||
self.trace.push(0)
|
||||
}
|
||||
for i in 1..data_len + 1 {
|
||||
self.trace[data_len + padding - i] = self.trace[data_len - i]
|
||||
}
|
||||
self.complete = true;
|
||||
self.padding_len = padding;
|
||||
|
||||
dcci_slice(&self.trace);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum RemoteTraceState {
|
||||
Unsent,
|
||||
Sending(usize),
|
||||
Ready,
|
||||
Running(usize),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RemoteTraces {
|
||||
remote_traces: BTreeMap<u8, Sliceable>,
|
||||
state: RemoteTraceState,
|
||||
}
|
||||
|
||||
impl RemoteTraces {
|
||||
pub fn new(traces: BTreeMap<u8, Sliceable>) -> RemoteTraces {
|
||||
RemoteTraces {
|
||||
remote_traces: traces,
|
||||
state: RemoteTraceState::Unsent,
|
||||
}
|
||||
}
|
||||
|
||||
// on subkernel request
|
||||
pub fn upload_traces(
|
||||
&mut self,
|
||||
id: u32,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) -> usize {
|
||||
let len = self.remote_traces.len();
|
||||
if len > 0 {
|
||||
self.state = RemoteTraceState::Sending(self.remote_traces.len());
|
||||
for (dest, trace) in self.remote_traces.iter_mut() {
|
||||
// queue up the first packet for all destinations, rest will be sent after first ACK
|
||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||
let meta = trace.get_slice_master(&mut data_slice);
|
||||
router.route(
|
||||
Packet::DmaAddTraceRequest {
|
||||
source: self_destination,
|
||||
destination: *dest,
|
||||
id: id,
|
||||
status: meta.status,
|
||||
length: meta.len,
|
||||
trace: data_slice,
|
||||
},
|
||||
routing_table,
|
||||
rank,
|
||||
self_destination,
|
||||
);
|
||||
}
|
||||
}
|
||||
len
|
||||
}
|
||||
|
||||
// on incoming Packet::DmaAddTraceReply
|
||||
pub fn ack_upload(
|
||||
&mut self,
|
||||
kernel_manager: &mut KernelManager,
|
||||
source: u8,
|
||||
id: u32,
|
||||
succeeded: bool,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) {
|
||||
if let RemoteTraceState::Sending(count) = self.state {
|
||||
if let Some(trace) = self.remote_traces.get_mut(&source) {
|
||||
if trace.at_end() {
|
||||
if count - 1 == 0 {
|
||||
self.state = RemoteTraceState::Ready;
|
||||
if let Some((id, timestamp)) = kernel_manager.ddma_remote_uploaded(succeeded) {
|
||||
self.playback(id, timestamp, router, rank, self_destination, routing_table);
|
||||
}
|
||||
} else {
|
||||
self.state = RemoteTraceState::Sending(count - 1);
|
||||
}
|
||||
} else {
|
||||
// send next slice
|
||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||
let meta = trace.get_slice_master(&mut data_slice);
|
||||
router.route(
|
||||
Packet::DmaAddTraceRequest {
|
||||
source: self_destination,
|
||||
destination: meta.destination,
|
||||
id: id,
|
||||
status: meta.status,
|
||||
length: meta.len,
|
||||
trace: data_slice,
|
||||
},
|
||||
routing_table,
|
||||
rank,
|
||||
self_destination,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// on subkernel request
|
||||
pub fn playback(
|
||||
&mut self,
|
||||
id: u32,
|
||||
timestamp: u64,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) {
|
||||
// route all the playback requests
|
||||
// remote traces (local trace runs on core1 unlike mainline firmware)
|
||||
self.state = RemoteTraceState::Running(self.remote_traces.len());
|
||||
for (dest, _) in self.remote_traces.iter() {
|
||||
router.route(
|
||||
Packet::DmaPlaybackRequest {
|
||||
source: self_destination,
|
||||
destination: *dest,
|
||||
id: id,
|
||||
timestamp: timestamp,
|
||||
},
|
||||
routing_table,
|
||||
rank,
|
||||
self_destination,
|
||||
);
|
||||
// response will be ignored (succeeded = false handled by the main thread)
|
||||
}
|
||||
}
|
||||
|
||||
// on incoming Packet::DmaPlaybackDone
|
||||
pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, error: u8, channel: u32, timestamp: u64) {
|
||||
if let RemoteTraceState::Running(count) = self.state {
|
||||
if error != 0 || count - 1 == 0 {
|
||||
// notify the kernel about a DDMA error or finish
|
||||
kernel_manager.ddma_finished(error, channel, timestamp);
|
||||
self.state = RemoteTraceState::Ready;
|
||||
// further messages will be ignored (if there was an error)
|
||||
} else {
|
||||
// no error and not the last one awaited
|
||||
self.state = RemoteTraceState::Running(count - 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn erase(
|
||||
&mut self,
|
||||
id: u32,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) {
|
||||
for (dest, _) in self.remote_traces.iter() {
|
||||
router.route(
|
||||
Packet::DmaRemoveTraceRequest {
|
||||
source: self_destination,
|
||||
destination: *dest,
|
||||
id: id,
|
||||
},
|
||||
routing_table,
|
||||
rank,
|
||||
self_destination,
|
||||
);
|
||||
// response will be ignored as this object will stop existing too
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_remote_traces(&self) -> bool {
|
||||
self.remote_traces.len() > 0
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -38,6 +258,9 @@ pub struct Manager {
|
||||
state: ManagerState,
|
||||
current_id: u32,
|
||||
current_source: u8,
|
||||
|
||||
remote_entries: BTreeMap<u32, RemoteTraces>,
|
||||
name_map: BTreeMap<String, u32>,
|
||||
}
|
||||
|
||||
impl Manager {
|
||||
@ -50,6 +273,8 @@ impl Manager {
|
||||
current_id: 0,
|
||||
current_source: 0,
|
||||
state: ManagerState::Idle,
|
||||
remote_entries: BTreeMap::new(),
|
||||
name_map: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -72,6 +297,7 @@ impl Manager {
|
||||
trace: Vec::new(),
|
||||
padding_len: 0,
|
||||
complete: false,
|
||||
duration: 0,
|
||||
},
|
||||
);
|
||||
self.entries.get_mut(&(source, id)).unwrap()
|
||||
@ -86,6 +312,7 @@ impl Manager {
|
||||
trace: Vec::new(),
|
||||
padding_len: 0,
|
||||
complete: false,
|
||||
duration: 0,
|
||||
},
|
||||
);
|
||||
self.entries.get_mut(&(source, id)).unwrap()
|
||||
@ -94,27 +321,12 @@ impl Manager {
|
||||
entry.trace.extend(&trace[0..trace_len]);
|
||||
|
||||
if status.is_last() {
|
||||
entry.trace.push(0);
|
||||
let data_len = entry.trace.len();
|
||||
|
||||
// Realign.
|
||||
entry.trace.reserve(ALIGNMENT - 1);
|
||||
let padding = ALIGNMENT - entry.trace.as_ptr() as usize % ALIGNMENT;
|
||||
let padding = if padding == ALIGNMENT { 0 } else { padding };
|
||||
for _ in 0..padding {
|
||||
// Vec guarantees that this will not reallocate
|
||||
entry.trace.push(0)
|
||||
}
|
||||
for i in 1..data_len + 1 {
|
||||
entry.trace[data_len + padding - i] = entry.trace[data_len - i]
|
||||
}
|
||||
entry.complete = true;
|
||||
entry.padding_len = padding;
|
||||
dcci_slice(&entry.trace);
|
||||
entry.realign();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// api for DRTIO
|
||||
pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> {
|
||||
match self.entries.remove(&(source, id)) {
|
||||
Some(_) => Ok(()),
|
||||
@ -122,6 +334,168 @@ impl Manager {
|
||||
}
|
||||
}
|
||||
|
||||
// API for subkernel
|
||||
pub fn erase_name(
|
||||
&mut self,
|
||||
name: &str,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) {
|
||||
if let Some(id) = self.name_map.get(name) {
|
||||
if let Some(traces) = self.remote_entries.get_mut(&id) {
|
||||
traces.erase(*id, router, rank, self_destination, routing_table);
|
||||
self.remote_entries.remove(&id);
|
||||
}
|
||||
self.entries.remove(&(self_destination, *id));
|
||||
self.name_map.remove(name);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn remote_finished(
|
||||
&mut self,
|
||||
kernel_manager: &mut KernelManager,
|
||||
id: u32,
|
||||
error: u8,
|
||||
channel: u32,
|
||||
timestamp: u64,
|
||||
) {
|
||||
if let Some(entry) = self.remote_entries.get_mut(&id) {
|
||||
entry.remote_finished(kernel_manager, error, channel, timestamp);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ack_upload(
|
||||
&mut self,
|
||||
kernel_manager: &mut KernelManager,
|
||||
source: u8,
|
||||
id: u32,
|
||||
succeeded: bool,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) {
|
||||
if let Some(entry) = self.remote_entries.get_mut(&id) {
|
||||
entry.ack_upload(
|
||||
kernel_manager,
|
||||
source,
|
||||
id,
|
||||
succeeded,
|
||||
router,
|
||||
rank,
|
||||
self_destination,
|
||||
routing_table,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// API for subkernel
|
||||
pub fn upload_traces(
|
||||
&mut self,
|
||||
id: u32,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) -> Result<usize, Error> {
|
||||
let remote_traces = self.remote_entries.get_mut(&id);
|
||||
let mut len = 0;
|
||||
if let Some(traces) = remote_traces {
|
||||
len = traces.upload_traces(id, router, rank, self_destination, routing_table);
|
||||
}
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
// API for subkernel
|
||||
pub fn playback_remote(
|
||||
&mut self,
|
||||
id: u32,
|
||||
timestamp: u64,
|
||||
router: &mut Router,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
routing_table: &RoutingTable,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(traces) = self.remote_entries.get_mut(&id) {
|
||||
traces.playback(id, timestamp, router, rank, self_destination, routing_table);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::IdNotFound)
|
||||
}
|
||||
}
|
||||
|
||||
// API for subkernel
|
||||
pub fn cleanup(&mut self, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) {
|
||||
// after subkernel ends, remove all self-generated traces
|
||||
for (_, id) in self.name_map.iter_mut() {
|
||||
if let Some(traces) = self.remote_entries.get_mut(&id) {
|
||||
traces.erase(*id, router, rank, self_destination, routing_table);
|
||||
self.remote_entries.remove(&id);
|
||||
}
|
||||
self.entries.remove(&(self_destination, *id));
|
||||
}
|
||||
self.name_map.clear();
|
||||
}
|
||||
|
||||
// API for subkernel
|
||||
pub fn retrieve(&self, self_destination: u8, name: &String) -> Option<(i32, i64, bool)> {
|
||||
let id = self.name_map.get(name)?;
|
||||
let duration = self.entries.get(&(self_destination, *id))?.duration;
|
||||
let uses_ddma = self.has_remote_traces(*id);
|
||||
Some((*id as i32, duration, uses_ddma))
|
||||
}
|
||||
|
||||
pub fn has_remote_traces(&self, id: u32) -> bool {
|
||||
match self.remote_entries.get(&id) {
|
||||
Some(traces) => traces.has_remote_traces(),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn put_record(&mut self, mut recorder: DmaRecorder, self_destination: u8) -> Result<u32, Error> {
|
||||
let mut remote_traces: BTreeMap<u8, Sliceable> = BTreeMap::new();
|
||||
|
||||
let mut local_trace: Vec<u8> = Vec::new();
|
||||
// analyze each entry and put in proper buckets, as the kernel core
|
||||
// sends whole chunks, to limit comms/kernel CPU communication,
|
||||
// and as only comms core has access to varios DMA buffers.
|
||||
let mut ptr = 0;
|
||||
recorder.buffer.push(0);
|
||||
while recorder.buffer[ptr] != 0 {
|
||||
// ptr + 3 = tgt >> 24 (destination)
|
||||
let len = recorder.buffer[ptr] as usize;
|
||||
let destination = recorder.buffer[ptr + 3];
|
||||
if destination == 0 {
|
||||
return Err(Error::MasterDmaFound);
|
||||
} else if destination == self_destination {
|
||||
local_trace.extend(&recorder.buffer[ptr..ptr + len]);
|
||||
} else {
|
||||
if let Some(remote_trace) = remote_traces.get_mut(&destination) {
|
||||
remote_trace.extend(&recorder.buffer[ptr..ptr + len]);
|
||||
} else {
|
||||
remote_traces.insert(
|
||||
destination,
|
||||
Sliceable::new(destination, recorder.buffer[ptr..ptr + len].to_vec()),
|
||||
);
|
||||
}
|
||||
}
|
||||
// and jump to the next event
|
||||
ptr += len;
|
||||
}
|
||||
let local_entry = Entry::from_vec(local_trace, recorder.duration);
|
||||
|
||||
let id = local_entry.id();
|
||||
self.entries.insert((self_destination, id), local_entry);
|
||||
self.remote_entries.insert(id, RemoteTraces::new(remote_traces));
|
||||
let mut name = String::new();
|
||||
mem::swap(&mut recorder.name, &mut name);
|
||||
self.name_map.insert(name, id);
|
||||
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> {
|
||||
if self.state != ManagerState::Idle {
|
||||
return Err(Error::PlaybackInProgress);
|
||||
|
@ -483,7 +483,9 @@ fn process_aux_packet(
|
||||
let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok();
|
||||
router.send(
|
||||
drtioaux::Packet::DmaAddTraceReply {
|
||||
source: *self_destination,
|
||||
destination: source,
|
||||
id: id,
|
||||
succeeded: succeeded,
|
||||
},
|
||||
_routing_table,
|
||||
@ -491,6 +493,25 @@ fn process_aux_packet(
|
||||
*self_destination,
|
||||
)
|
||||
}
|
||||
drtioaux::Packet::DmaAddTraceReply {
|
||||
source,
|
||||
destination: _destination,
|
||||
id,
|
||||
succeeded,
|
||||
} => {
|
||||
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||
dma_manager.ack_upload(
|
||||
kernel_manager,
|
||||
source,
|
||||
id,
|
||||
succeeded,
|
||||
router,
|
||||
*rank,
|
||||
*self_destination,
|
||||
_routing_table,
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
drtioaux::Packet::DmaRemoveTraceRequest {
|
||||
source,
|
||||
destination: _destination,
|
||||
@ -508,6 +529,13 @@ fn process_aux_packet(
|
||||
*self_destination,
|
||||
)
|
||||
}
|
||||
drtioaux::Packet::DmaRemoveTraceReply {
|
||||
destination: _destination,
|
||||
succeeded: _,
|
||||
} => {
|
||||
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||
Ok(())
|
||||
}
|
||||
drtioaux::Packet::DmaPlaybackRequest {
|
||||
source,
|
||||
destination: _destination,
|
||||
@ -530,6 +558,28 @@ fn process_aux_packet(
|
||||
*self_destination,
|
||||
)
|
||||
}
|
||||
drtioaux::Packet::DmaPlaybackReply {
|
||||
destination: _destination,
|
||||
succeeded,
|
||||
} => {
|
||||
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||
if !succeeded {
|
||||
kernel_manager.ddma_nack();
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
drtioaux::Packet::DmaPlaybackStatus {
|
||||
source: _,
|
||||
destination: _destination,
|
||||
id,
|
||||
error,
|
||||
channel,
|
||||
timestamp,
|
||||
} => {
|
||||
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
|
||||
dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
drtioaux::Packet::SubkernelAddDataRequest {
|
||||
destination,
|
||||
@ -649,8 +699,8 @@ fn process_aux_packet(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
_ => {
|
||||
warn!("received unexpected aux packet");
|
||||
p => {
|
||||
warn!("received unexpected aux packet: {:?}", p);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -949,8 +999,16 @@ pub extern "C" fn main_core0() -> i32 {
|
||||
);
|
||||
}
|
||||
|
||||
kernel_manager.process_kern_requests(&mut router, &routing_table, rank, destination, &timer);
|
||||
kernel_manager.process_kern_requests(
|
||||
&mut router,
|
||||
&routing_table,
|
||||
rank,
|
||||
destination,
|
||||
&mut dma_manager,
|
||||
&timer,
|
||||
);
|
||||
|
||||
#[cfg(has_drtio_routing)]
|
||||
if let Some((repno, packet)) = router.get_downstream_packet() {
|
||||
if let Err(e) = repeaters[repno].aux_send(&packet) {
|
||||
warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e)
|
||||
|
@ -347,7 +347,15 @@ impl Repeater {
|
||||
Repeater::default()
|
||||
}
|
||||
|
||||
pub fn service(&self, _routing_table: &drtio_routing::RoutingTable, _rank: u8, _timer: &mut GlobalTimer) {}
|
||||
pub fn service(
|
||||
&self,
|
||||
_routing_table: &drtio_routing::RoutingTable,
|
||||
_rank: u8,
|
||||
_destination: u8,
|
||||
_router: &mut Router,
|
||||
_timer: &mut GlobalTimer,
|
||||
) {
|
||||
}
|
||||
|
||||
pub fn sync_tsc(&self, _timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> {
|
||||
Ok(())
|
||||
|
@ -1,6 +1,65 @@
|
||||
use alloc::collections::vec_deque::VecDeque;
|
||||
use alloc::{collections::vec_deque::VecDeque, vec::Vec};
|
||||
use core::cmp::min;
|
||||
|
||||
use libboard_artiq::{drtio_routing, drtioaux, pl::csr};
|
||||
#[cfg(has_drtio_routing)]
|
||||
use libboard_artiq::pl::csr;
|
||||
use libboard_artiq::{drtio_routing, drtioaux,
|
||||
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}};
|
||||
|
||||
pub struct SliceMeta {
|
||||
pub destination: u8,
|
||||
pub len: u16,
|
||||
pub status: PayloadStatus,
|
||||
}
|
||||
|
||||
/* represents data that has to be sent to Master */
|
||||
#[derive(Debug)]
|
||||
pub struct Sliceable {
|
||||
it: usize,
|
||||
data: Vec<u8>,
|
||||
destination: u8,
|
||||
}
|
||||
|
||||
macro_rules! get_slice_fn {
|
||||
($name:tt, $size:expr) => {
|
||||
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
|
||||
let first = self.it == 0;
|
||||
let len = min($size, self.data.len() - self.it);
|
||||
let last = self.it + len == self.data.len();
|
||||
let status = PayloadStatus::from_status(first, last);
|
||||
|
||||
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
|
||||
self.it += len;
|
||||
|
||||
SliceMeta {
|
||||
destination: self.destination,
|
||||
len: len as u16,
|
||||
status: status,
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl Sliceable {
|
||||
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
|
||||
Sliceable {
|
||||
it: 0,
|
||||
data: data,
|
||||
destination: destination,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn at_end(&self) -> bool {
|
||||
self.it == self.data.len()
|
||||
}
|
||||
|
||||
pub fn extend(&mut self, data: &[u8]) {
|
||||
self.data.extend(data);
|
||||
}
|
||||
|
||||
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
|
||||
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
|
||||
}
|
||||
|
||||
// Packets from downstream (further satellites) are received and routed appropriately.
|
||||
// they're passed as soon as possible downstream (within the subtree), or sent upstream,
|
||||
@ -14,6 +73,7 @@ use libboard_artiq::{drtio_routing, drtioaux, pl::csr};
|
||||
pub struct Router {
|
||||
upstream_queue: VecDeque<drtioaux::Packet>,
|
||||
local_queue: VecDeque<drtioaux::Packet>,
|
||||
#[cfg(has_drtio_routing)]
|
||||
downstream_queue: VecDeque<(usize, drtioaux::Packet)>,
|
||||
upstream_notified: bool,
|
||||
}
|
||||
@ -23,26 +83,27 @@ impl Router {
|
||||
Router {
|
||||
upstream_queue: VecDeque::new(),
|
||||
local_queue: VecDeque::new(),
|
||||
#[cfg(has_drtio_routing)]
|
||||
downstream_queue: VecDeque::new(),
|
||||
upstream_notified: false,
|
||||
}
|
||||
}
|
||||
|
||||
// called by local sources (DDMA, kernel) and by repeaters on receiving async data
|
||||
// Called by local sources (DDMA, kernel) and by repeaters on receiving async data;
|
||||
// messages are always buffered for both upstream and downstream
|
||||
pub fn route(
|
||||
&mut self,
|
||||
packet: drtioaux::Packet,
|
||||
_routing_table: &drtio_routing::RoutingTable,
|
||||
_rank: u8,
|
||||
_self_destination: u8,
|
||||
self_destination: u8,
|
||||
) {
|
||||
let destination = packet.routable_destination();
|
||||
#[cfg(has_drtio_routing)]
|
||||
{
|
||||
let destination = packet.routable_destination();
|
||||
if let Some(destination) = destination {
|
||||
let hop = _routing_table.0[destination as usize][_rank as usize] as usize;
|
||||
if destination == _self_destination {
|
||||
if destination == self_destination {
|
||||
self.local_queue.push_back(packet);
|
||||
} else if hop > 0 && hop < csr::DRTIOREP.len() {
|
||||
let repno = (hop - 1) as usize;
|
||||
@ -56,11 +117,15 @@ impl Router {
|
||||
}
|
||||
#[cfg(not(has_drtio_routing))]
|
||||
{
|
||||
self.upstream_queue.push_back(packet);
|
||||
if destination == Some(self_destination) {
|
||||
self.local_queue.push_back(packet);
|
||||
} else {
|
||||
self.upstream_queue.push_back(packet);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sends a packet to a required destination, routing if it's necessary
|
||||
// Sends a packet to a required destination, routing if necessary
|
||||
pub fn send(
|
||||
&mut self,
|
||||
packet: drtioaux::Packet,
|
||||
@ -114,6 +179,7 @@ impl Router {
|
||||
packet
|
||||
}
|
||||
|
||||
#[cfg(has_drtio_routing)]
|
||||
pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> {
|
||||
self.downstream_queue.pop_front()
|
||||
}
|
||||
|
@ -2,10 +2,11 @@ use alloc::{collections::{BTreeMap, VecDeque},
|
||||
format,
|
||||
string::{String, ToString},
|
||||
vec::Vec};
|
||||
use core::{cmp::min, option::NoneError, slice, str};
|
||||
use core::{option::NoneError, slice, str};
|
||||
|
||||
use core_io::{Error as IoError, Write};
|
||||
use cslice::AsCSlice;
|
||||
use dma::{Error as DmaError, Manager as DmaManager};
|
||||
use io::{Cursor, ProtoWrite};
|
||||
use ksupport::{eh_artiq, kernel, rpc};
|
||||
use libboard_artiq::{drtio_routing::RoutingTable,
|
||||
@ -15,7 +16,7 @@ use libboard_artiq::{drtio_routing::RoutingTable,
|
||||
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
|
||||
use libcortex_a9::sync_channel::Receiver;
|
||||
use log::warn;
|
||||
use routing::Router;
|
||||
use routing::{Router, SliceMeta, Sliceable};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
enum KernelState {
|
||||
@ -25,7 +26,23 @@ enum KernelState {
|
||||
MsgAwait(Milliseconds, Vec<u8>),
|
||||
MsgSending,
|
||||
SubkernelAwaitLoad,
|
||||
SubkernelAwaitFinish { max_time: Milliseconds, id: u32 },
|
||||
SubkernelAwaitFinish {
|
||||
max_time: Milliseconds,
|
||||
id: u32,
|
||||
},
|
||||
DmaUploading,
|
||||
DmaPendingPlayback {
|
||||
id: u32,
|
||||
timestamp: u64,
|
||||
},
|
||||
DmaPendingAwait {
|
||||
id: u32,
|
||||
timestamp: u64,
|
||||
max_time: Milliseconds,
|
||||
},
|
||||
DmaAwait {
|
||||
max_time: Milliseconds,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -38,6 +55,7 @@ pub enum Error {
|
||||
SubkernelIoError,
|
||||
DrtioError,
|
||||
KernelException(Sliceable),
|
||||
DmaError(DmaError),
|
||||
}
|
||||
|
||||
impl From<NoneError> for Error {
|
||||
@ -52,6 +70,12 @@ impl From<IoError> for Error {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<DmaError> for Error {
|
||||
fn from(value: DmaError) -> Error {
|
||||
Error::DmaError(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<()> for Error {
|
||||
fn from(_: ()) -> Error {
|
||||
Error::NoMessage
|
||||
@ -68,14 +92,6 @@ macro_rules! unexpected {
|
||||
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
|
||||
}
|
||||
|
||||
/* represents data that has to be sent to Master */
|
||||
#[derive(Debug)]
|
||||
pub struct Sliceable {
|
||||
it: usize,
|
||||
data: Vec<u8>,
|
||||
destination: u8,
|
||||
}
|
||||
|
||||
/* represents interkernel messages */
|
||||
struct Message {
|
||||
count: u8,
|
||||
@ -123,11 +139,7 @@ impl Session {
|
||||
fn running(&self) -> bool {
|
||||
match self.kernel_state {
|
||||
KernelState::Absent | KernelState::Loaded => false,
|
||||
KernelState::Running
|
||||
| KernelState::MsgAwait { .. }
|
||||
| KernelState::MsgSending
|
||||
| KernelState::SubkernelAwaitLoad
|
||||
| KernelState::SubkernelAwaitFinish { .. } => true,
|
||||
_ => true,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -153,45 +165,6 @@ pub struct SubkernelFinished {
|
||||
pub source: u8,
|
||||
}
|
||||
|
||||
pub struct SliceMeta {
|
||||
pub destination: u8,
|
||||
pub len: u16,
|
||||
pub status: PayloadStatus,
|
||||
}
|
||||
|
||||
macro_rules! get_slice_fn {
|
||||
($name:tt, $size:expr) => {
|
||||
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
|
||||
let first = self.it == 0;
|
||||
let len = min($size, self.data.len() - self.it);
|
||||
let last = self.it + len == self.data.len();
|
||||
let status = PayloadStatus::from_status(first, last);
|
||||
|
||||
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
|
||||
self.it += len;
|
||||
|
||||
SliceMeta {
|
||||
destination: self.destination,
|
||||
len: len as u16,
|
||||
status: status,
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
impl Sliceable {
|
||||
pub fn new(destination: u8, data: Vec<u8>) -> Sliceable {
|
||||
Sliceable {
|
||||
it: 0,
|
||||
data: data,
|
||||
destination: destination,
|
||||
}
|
||||
}
|
||||
|
||||
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
|
||||
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
|
||||
}
|
||||
|
||||
impl MessageManager {
|
||||
pub fn new() -> MessageManager {
|
||||
MessageManager {
|
||||
@ -355,7 +328,6 @@ impl<'a> Manager<'_> {
|
||||
}
|
||||
|
||||
pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> {
|
||||
info!("starting subkernel #{}", id);
|
||||
if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
|
||||
self.load(id)?;
|
||||
}
|
||||
@ -466,12 +438,66 @@ impl<'a> Manager<'_> {
|
||||
self.kernel_stop();
|
||||
}
|
||||
|
||||
pub fn ddma_finished(&mut self, error: u8, channel: u32, timestamp: u64) {
|
||||
if let KernelState::DmaAwait { .. } = self.session.kernel_state {
|
||||
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
|
||||
timeout: false,
|
||||
error: error,
|
||||
channel: channel,
|
||||
timestamp: timestamp,
|
||||
});
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ddma_nack(&mut self) {
|
||||
// for simplicity treat it as a timeout...
|
||||
if let KernelState::DmaAwait { .. } = self.session.kernel_state {
|
||||
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
|
||||
timeout: true,
|
||||
error: 0,
|
||||
channel: 0,
|
||||
timestamp: 0,
|
||||
});
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn ddma_remote_uploaded(&mut self, succeeded: bool) -> Option<(u32, u64)> {
|
||||
// returns a tuple of id, timestamp in case a playback needs to be started immediately
|
||||
if !succeeded {
|
||||
self.kernel_stop();
|
||||
self.runtime_exception(Error::DmaError(DmaError::UploadFail));
|
||||
}
|
||||
let res = match self.session.kernel_state {
|
||||
KernelState::DmaPendingPlayback { id, timestamp } => {
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
Some((id, timestamp))
|
||||
}
|
||||
KernelState::DmaPendingAwait {
|
||||
id,
|
||||
timestamp,
|
||||
max_time,
|
||||
} => {
|
||||
self.session.kernel_state = KernelState::DmaAwait { max_time: max_time };
|
||||
Some((id, timestamp))
|
||||
}
|
||||
KernelState::DmaUploading => {
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
res
|
||||
}
|
||||
|
||||
pub fn process_kern_requests(
|
||||
&mut self,
|
||||
router: &mut Router,
|
||||
routing_table: &RoutingTable,
|
||||
rank: u8,
|
||||
destination: u8,
|
||||
dma_manager: &mut DmaManager,
|
||||
timer: &GlobalTimer,
|
||||
) {
|
||||
if let Some(subkernel_finished) = self.last_finished.take() {
|
||||
@ -520,7 +546,7 @@ impl<'a> Manager<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
match self.process_kern_message(router, routing_table, rank, destination, timer) {
|
||||
match self.process_kern_message(router, routing_table, rank, destination, dma_manager, timer) {
|
||||
Ok(true) => {
|
||||
self.last_finished = Some(SubkernelFinished {
|
||||
id: self.session.id,
|
||||
@ -583,12 +609,14 @@ impl<'a> Manager<'_> {
|
||||
routing_table: &RoutingTable,
|
||||
rank: u8,
|
||||
self_destination: u8,
|
||||
dma_manager: &mut DmaManager,
|
||||
timer: &GlobalTimer,
|
||||
) -> Result<bool, Error> {
|
||||
let reply = self.control.rx.try_recv()?;
|
||||
match reply {
|
||||
kernel::Message::KernelFinished(_async_errors) => {
|
||||
self.kernel_stop();
|
||||
dma_manager.cleanup(router, rank, self_destination, routing_table);
|
||||
return Ok(true);
|
||||
}
|
||||
kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => {
|
||||
@ -615,6 +643,53 @@ impl<'a> Manager<'_> {
|
||||
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
|
||||
self.control.tx.send(kernel::Message::CacheGetReply(value));
|
||||
}
|
||||
|
||||
kernel::Message::DmaPutRequest(recorder) => {
|
||||
// ddma is always used on satellites
|
||||
if let Ok(id) = dma_manager.put_record(recorder, self_destination) {
|
||||
dma_manager.upload_traces(id, router, rank, self_destination, routing_table)?;
|
||||
self.session.kernel_state = KernelState::DmaUploading;
|
||||
} else {
|
||||
unexpected!("DMAError: found an unsupported call to RTIO devices on master")
|
||||
}
|
||||
}
|
||||
kernel::Message::DmaEraseRequest(name) => {
|
||||
dma_manager.erase_name(&name, router, rank, self_destination, routing_table);
|
||||
}
|
||||
kernel::Message::DmaGetRequest(name) => {
|
||||
let dma_meta = dma_manager.retrieve(self_destination, &name);
|
||||
self.control.tx.send(kernel::Message::DmaGetReply(dma_meta));
|
||||
}
|
||||
kernel::Message::DmaStartRemoteRequest { id, timestamp } => {
|
||||
if self.session.kernel_state != KernelState::DmaUploading {
|
||||
dma_manager.playback_remote(
|
||||
id as u32,
|
||||
timestamp as u64,
|
||||
router,
|
||||
rank,
|
||||
self_destination,
|
||||
routing_table,
|
||||
)?;
|
||||
} else {
|
||||
self.session.kernel_state = KernelState::DmaPendingPlayback {
|
||||
id: id as u32,
|
||||
timestamp: timestamp as u64,
|
||||
};
|
||||
}
|
||||
}
|
||||
kernel::Message::DmaAwaitRemoteRequest(_id) => {
|
||||
let max_time = timer.get_time() + Milliseconds(10000);
|
||||
self.session.kernel_state = match self.session.kernel_state {
|
||||
// if we are still waiting for the traces to be uploaded, extend the state by timeout
|
||||
KernelState::DmaPendingPlayback { id, timestamp } => KernelState::DmaPendingAwait {
|
||||
id: id,
|
||||
timestamp: timestamp,
|
||||
max_time: max_time,
|
||||
},
|
||||
_ => KernelState::DmaAwait { max_time: max_time },
|
||||
};
|
||||
}
|
||||
|
||||
kernel::Message::SubkernelMsgSend {
|
||||
id: _id,
|
||||
destination: msg_dest,
|
||||
@ -728,6 +803,18 @@ impl<'a> Manager<'_> {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {
|
||||
if timer.get_time() > *max_time {
|
||||
self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {
|
||||
timeout: true,
|
||||
error: 0,
|
||||
channel: 0,
|
||||
timestamp: 0,
|
||||
});
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
_ => Ok(()),
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user