From 3d6350ca753c12a1a0fab76e410f948a4eafb2ff Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 13:29:10 +0800 Subject: [PATCH] rtio_dma: add ddma related module --- src/runtime/src/rtio_dma.rs | 286 +++++++++++++++++++++++++++++++++++- 1 file changed, 284 insertions(+), 2 deletions(-) diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 8555a7e..1bf125f 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -1,14 +1,298 @@ use alloc::{collections::BTreeMap, string::String, vec::Vec}; use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; +use libboard_zynq::timer::GlobalTimer; use crate::kernel::DmaRecorder; const ALIGNMENT: usize = 16 * 8; static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); +#[cfg(has_drtio)] +pub mod remote_dma { + use super::*; + use alloc::rc::Rc; + use libasync::task; + use libboard_zynq::time::Milliseconds; + use libboard_artiq::drtio_routing::RoutingTable; + use log::error; + use crate::rtio_mgt::drtio; + + #[derive(Debug, PartialEq, Clone)] + pub enum RemoteState { + NotLoaded, + Loaded, + PlaybackEnded { error: u8, channel: u32, timestamp: u64 } + } + #[derive(Debug, Clone)] + struct RemoteTrace { + trace: Vec, + pub state: RemoteState + } + + impl From> for RemoteTrace { + fn from(trace: Vec) -> Self { + RemoteTrace { + trace: trace, + state: RemoteState::NotLoaded + } + } + } + + impl RemoteTrace { + pub fn get_trace(&self) -> &Vec { + &self.trace + } + } + + // represents all traces for a given ID + struct TraceSet { + id: u32, + done_count: Mutex, + traces: Mutex> + } + + impl TraceSet { + pub fn new(id: u32, traces: BTreeMap>) -> TraceSet { + let mut trace_map: BTreeMap = BTreeMap::new(); + for (destination, trace) in traces { + trace_map.insert(destination, trace.into()); + } + TraceSet { + id: id, + done_count: Mutex::new(0), + traces: Mutex::new(trace_map) + } + } + + pub async fn await_done( + &self, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let timeout_ms = Milliseconds(timeout.unwrap_or(10_000)); + let limit = timer.get_time() + timeout_ms; + while (timer.get_time() < limit) & + (*(self.done_count.async_lock().await) < self.traces.async_lock().await.len()) { + task::r#yield().await; + } + if timer.get_time() >= limit { + error!("Remote DMA await done timed out"); + return Err("Timed out waiting for results."); + } + let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 }; + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (_dest, trace) in trace_iter { + match trace.state { + RemoteState::PlaybackEnded { + error: e, + channel: _c, + timestamp: _ts + } => if e != 0 { playback_state = trace.state.clone(); }, + _ => (), + } + trace.state = RemoteState::Loaded; + } + Ok(playback_state) + } + + pub async fn upload_traces( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (destination, trace) in trace_iter { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + *destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + *(self.done_count.async_lock().await) = 0; + } + + pub async fn erase( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let lock = self.traces.async_lock().await; + let trace_iter = lock.keys(); + for destination in trace_iter { + match drtio::ddma_send_erase( + aux_mutex, + routing_table, + timer, + self.id, + *destination + ).await { + Ok(_) => (), + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + } + + pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { + let mut traces_locked = self.traces.async_lock().await; + let mut trace = traces_locked.get_mut(&destination).unwrap(); + trace.state = RemoteState::PlaybackEnded { + error: error, + channel: channel, + timestamp: timestamp + }; + *(self.done_count.async_lock().await) += 1; + } + + pub async fn playback( + &self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + timestamp: u64 + ) { + let mut dest_list: Vec = Vec::new(); + { + let lock = self.traces.async_lock().await; + let trace_iter = lock.iter(); + for (dest, trace) in trace_iter { + if trace.state != RemoteState::Loaded { + error!("Destination {} not ready for DMA, state: {:?}", dest, trace.state); + continue; + } + dest_list.push(dest.clone()); + } + } + // mutex lock must be dropped before sending a playback request to avoid a deadlock, + // if PlaybackStatus is sent from another satellite and the state must be updated. + for destination in dest_list { + match drtio::ddma_send_playback( + aux_mutex, + routing_table, + timer, + self.id, + destination, + timestamp + ).await { + Ok(_) => (), + Err(e) => error!("Error during remote DMA playback: {}", e) + } + } + } + + pub async fn destination_changed( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, up: bool) { + // update state of the destination, resend traces if it's up + if let Some(trace) = self.traces.lock().get_mut(&destination) { + if up { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } else { + trace.state = RemoteState::NotLoaded; + } + } + } + } + + static mut TRACES: BTreeMap = BTreeMap::new(); + + pub fn add_traces(id: u32, traces: BTreeMap>) { + unsafe { TRACES.insert(id, TraceSet::new(id, traces)) }; + } + + pub async fn await_done( + id: u32, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.await_done(timeout, timer).await + } + + pub fn erase( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let mut trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + task::block_on(trace_set.erase(aux_mutex, routing_table, timer)); + unsafe { TRACES.remove(&id); } + } + + pub async fn upload_traces( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.upload_traces(aux_mutex, routing_table, timer).await; + } + + pub async fn playback( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback(aux_mutex, routing_table, timer, timestamp).await; + } + + pub async fn playback_done( + id: u32, + destination: u8, + error: u8, + channel: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback_done(destination, error, channel, timestamp).await; + } + + pub async fn destination_changed( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, + up: bool + ) { + let trace_iter = unsafe { TRACES.values_mut() }; + for trace_set in trace_iter { + trace_set.destination_changed(aux_mutex, routing_table, timer, destination, up).await; + } + } +} + pub fn put_record(mut recorder: DmaRecorder) { + // trailing zero to indicate end of buffer + recorder.buffer.push(0); recorder.buffer.reserve(ALIGNMENT - 1); let original_length = recorder.buffer.len(); let padding = ALIGNMENT - recorder.buffer.as_ptr() as usize % ALIGNMENT; @@ -16,8 +300,6 @@ pub fn put_record(mut recorder: DmaRecorder) { for _ in 0..padding { recorder.buffer.push(0); } - // trailing zero to indicate end of buffer - recorder.buffer.push(0); recorder.buffer.copy_within(0..original_length, padding); dcci_slice(&recorder.buffer);