rtio_dma: add ddma related module

pull/225/head
mwojcik 2023-03-24 13:29:10 +08:00
parent f1ef0219c3
commit 3d6350ca75
1 changed files with 284 additions and 2 deletions

View File

@ -1,14 +1,298 @@
use alloc::{collections::BTreeMap, string::String, vec::Vec};
use libcortex_a9::{mutex::Mutex, cache::dcci_slice};
use libboard_zynq::timer::GlobalTimer;
use crate::kernel::DmaRecorder;
const ALIGNMENT: usize = 16 * 8;
static DMA_RECORD_STORE: Mutex<BTreeMap<String, (u32, Vec<u8>, i64)>> = Mutex::new(BTreeMap::new());
#[cfg(has_drtio)]
pub mod remote_dma {
use super::*;
use alloc::rc::Rc;
use libasync::task;
use libboard_zynq::time::Milliseconds;
use libboard_artiq::drtio_routing::RoutingTable;
use log::error;
use crate::rtio_mgt::drtio;
#[derive(Debug, PartialEq, Clone)]
pub enum RemoteState {
NotLoaded,
Loaded,
PlaybackEnded { error: u8, channel: u32, timestamp: u64 }
}
#[derive(Debug, Clone)]
struct RemoteTrace {
trace: Vec<u8>,
pub state: RemoteState
}
impl From<Vec<u8>> for RemoteTrace {
fn from(trace: Vec<u8>) -> Self {
RemoteTrace {
trace: trace,
state: RemoteState::NotLoaded
}
}
}
impl RemoteTrace {
pub fn get_trace(&self) -> &Vec<u8> {
&self.trace
}
}
// represents all traces for a given ID
struct TraceSet {
id: u32,
done_count: Mutex<usize>,
traces: Mutex<BTreeMap<u8, RemoteTrace>>
}
impl TraceSet {
pub fn new(id: u32, traces: BTreeMap<u8, Vec<u8>>) -> TraceSet {
let mut trace_map: BTreeMap<u8, RemoteTrace> = BTreeMap::new();
for (destination, trace) in traces {
trace_map.insert(destination, trace.into());
}
TraceSet {
id: id,
done_count: Mutex::new(0),
traces: Mutex::new(trace_map)
}
}
pub async fn await_done(
&self,
timeout: Option<u64>,
timer: GlobalTimer
) -> Result<RemoteState, &'static str> {
let timeout_ms = Milliseconds(timeout.unwrap_or(10_000));
let limit = timer.get_time() + timeout_ms;
while (timer.get_time() < limit) &
(*(self.done_count.async_lock().await) < self.traces.async_lock().await.len()) {
task::r#yield().await;
}
if timer.get_time() >= limit {
error!("Remote DMA await done timed out");
return Err("Timed out waiting for results.");
}
let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 };
let mut lock = self.traces.async_lock().await;
let trace_iter = lock.iter_mut();
for (_dest, trace) in trace_iter {
match trace.state {
RemoteState::PlaybackEnded {
error: e,
channel: _c,
timestamp: _ts
} => if e != 0 { playback_state = trace.state.clone(); },
_ => (),
}
trace.state = RemoteState::Loaded;
}
Ok(playback_state)
}
pub async fn upload_traces(
&mut self,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer
) {
let mut lock = self.traces.async_lock().await;
let trace_iter = lock.iter_mut();
for (destination, trace) in trace_iter {
match drtio::ddma_upload_trace(
aux_mutex,
routing_table,
timer,
self.id,
*destination,
trace.get_trace()
).await {
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e)
}
}
*(self.done_count.async_lock().await) = 0;
}
pub async fn erase(
&mut self,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer
) {
let lock = self.traces.async_lock().await;
let trace_iter = lock.keys();
for destination in trace_iter {
match drtio::ddma_send_erase(
aux_mutex,
routing_table,
timer,
self.id,
*destination
).await {
Ok(_) => (),
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e)
}
}
}
pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) {
let mut traces_locked = self.traces.async_lock().await;
let mut trace = traces_locked.get_mut(&destination).unwrap();
trace.state = RemoteState::PlaybackEnded {
error: error,
channel: channel,
timestamp: timestamp
};
*(self.done_count.async_lock().await) += 1;
}
pub async fn playback(
&self,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
timestamp: u64
) {
let mut dest_list: Vec<u8> = Vec::new();
{
let lock = self.traces.async_lock().await;
let trace_iter = lock.iter();
for (dest, trace) in trace_iter {
if trace.state != RemoteState::Loaded {
error!("Destination {} not ready for DMA, state: {:?}", dest, trace.state);
continue;
}
dest_list.push(dest.clone());
}
}
// mutex lock must be dropped before sending a playback request to avoid a deadlock,
// if PlaybackStatus is sent from another satellite and the state must be updated.
for destination in dest_list {
match drtio::ddma_send_playback(
aux_mutex,
routing_table,
timer,
self.id,
destination,
timestamp
).await {
Ok(_) => (),
Err(e) => error!("Error during remote DMA playback: {}", e)
}
}
}
pub async fn destination_changed(
&mut self,
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
destination: u8, up: bool) {
// update state of the destination, resend traces if it's up
if let Some(trace) = self.traces.lock().get_mut(&destination) {
if up {
match drtio::ddma_upload_trace(
aux_mutex,
routing_table,
timer,
self.id,
destination,
trace.get_trace()
).await {
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e)
}
} else {
trace.state = RemoteState::NotLoaded;
}
}
}
}
static mut TRACES: BTreeMap<u32, TraceSet> = BTreeMap::new();
pub fn add_traces(id: u32, traces: BTreeMap<u8, Vec<u8>>) {
unsafe { TRACES.insert(id, TraceSet::new(id, traces)) };
}
pub async fn await_done(
id: u32,
timeout: Option<u64>,
timer: GlobalTimer
) -> Result<RemoteState, &'static str> {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.await_done(timeout, timer).await
}
pub fn erase(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32
) {
let mut trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
task::block_on(trace_set.erase(aux_mutex, routing_table, timer));
unsafe { TRACES.remove(&id); }
}
pub async fn upload_traces(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32
) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.upload_traces(aux_mutex, routing_table, timer).await;
}
pub async fn playback(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
timestamp: u64
) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.playback(aux_mutex, routing_table, timer, timestamp).await;
}
pub async fn playback_done(
id: u32,
destination: u8,
error: u8,
channel: u32,
timestamp: u64
) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.playback_done(destination, error, channel, timestamp).await;
}
pub async fn destination_changed(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
destination: u8,
up: bool
) {
let trace_iter = unsafe { TRACES.values_mut() };
for trace_set in trace_iter {
trace_set.destination_changed(aux_mutex, routing_table, timer, destination, up).await;
}
}
}
pub fn put_record(mut recorder: DmaRecorder) {
// trailing zero to indicate end of buffer
recorder.buffer.push(0);
recorder.buffer.reserve(ALIGNMENT - 1);
let original_length = recorder.buffer.len();
let padding = ALIGNMENT - recorder.buffer.as_ptr() as usize % ALIGNMENT;
@ -16,8 +300,6 @@ pub fn put_record(mut recorder: DmaRecorder) {
for _ in 0..padding {
recorder.buffer.push(0);
}
// trailing zero to indicate end of buffer
recorder.buffer.push(0);
recorder.buffer.copy_within(0..original_length, padding);
dcci_slice(&recorder.buffer);