From 90071f7620acbce5845fe013cfd42b0c2b79ba03 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 15:47:54 +0800 Subject: [PATCH] Master: DDMA support Co-authored-by: mwojcik Co-committed-by: mwojcik --- flake.lock | 18 +- src/runtime/src/comms.rs | 62 ++++-- src/runtime/src/kernel/dma.rs | 70 ++++--- src/runtime/src/kernel/mod.rs | 16 +- src/runtime/src/main.rs | 1 + src/runtime/src/moninj.rs | 4 +- src/runtime/src/rtio_dma.rs | 373 ++++++++++++++++++++++++++++++++++ src/runtime/src/rtio_mgt.rs | 92 ++++++++- 8 files changed, 584 insertions(+), 52 deletions(-) create mode 100644 src/runtime/src/rtio_dma.rs diff --git a/flake.lock b/flake.lock index bb02bff5..31e06294 100644 --- a/flake.lock +++ b/flake.lock @@ -11,11 +11,11 @@ "src-pythonparser": "src-pythonparser" }, "locked": { - "lastModified": 1677033865, - "narHash": "sha256-9w9V+B6vMl4I2uX5k6lJc1FXhf4PeTCZXrq5Tdq2qHc=", - "ref": "refs/heads/master", - "rev": "d0437f5672b7dd5f898a73469d730bc46f058ecc", - "revCount": 8310, + "lastModified": 1679454985, + "narHash": "sha256-ObDDVR8hKh4lbT42qBdMxc+kIGh5/ORGM0YWGTyO4Sw=", + "ref": "master", + "rev": "e9a153b985d1c703338ad49bb0ac986ade29d21a", + "revCount": 8319, "type": "git", "url": "https://github.com/m-labs/artiq.git" }, @@ -84,11 +84,11 @@ "mozilla-overlay_2": { "flake": false, "locked": { - "lastModified": 1675354105, - "narHash": "sha256-ZAJGIZ7TjOCU7302lSUabNDz+rxM4If0l8/ZbE/7R5U=", + "lastModified": 1677493379, + "narHash": "sha256-A1gO8zlWLv3+tZ3cGVB1WYvvoN9pbFyv0xIJHcTsckw=", "owner": "mozilla", "repo": "nixpkgs-mozilla", - "rev": "85eb0ba7d8e5d6d4b79e5b0180aadbdd25d76404", + "rev": "78e723925daf5c9e8d0a1837ec27059e61649cb6", "type": "github" }, "original": { @@ -218,7 +218,7 @@ "locked": { "lastModified": 1669819016, "narHash": "sha256-WvNMUekL4Elc55RdqX8XP43QPnBrK8Rbd0bsoI61E5U=", - "ref": "refs/heads/master", + "ref": "master", "rev": "67dbb5932fa8ff5f143983476f741f945871d286", "revCount": 624, "type": "git", diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 1465d77c..bafe2532 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -27,7 +27,8 @@ use crate::pl; use crate::{analyzer, kernel, mgmt, moninj, proto_async::*, rpc, - rtio_mgt::{self, resolve_channel_name}}; + rtio_mgt::{self, resolve_channel_name}, + rtio_dma}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { @@ -79,7 +80,7 @@ enum Reply { } static CACHE_STORE: Mutex>> = Mutex::new(BTreeMap::new()); -static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); + async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> { stream @@ -157,6 +158,9 @@ async fn handle_run_kernel( stream: Option<&TcpStream>, control: &Rc>, _up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; loop { @@ -319,16 +323,16 @@ async fn handle_run_kernel( .await; } kernel::Message::DmaPutRequest(recorder) => { - DMA_RECORD_STORE - .lock() - .insert(recorder.name, (recorder.buffer, recorder.duration)); + let _id = rtio_dma::put_record(aux_mutex, routing_table, timer, recorder).await; + #[cfg(has_drtio)] + rtio_dma::remote_dma::upload_traces(aux_mutex, routing_table, timer, _id).await; } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. - DMA_RECORD_STORE.lock().remove(&name); + rtio_dma::erase(name, aux_mutex, routing_table, timer).await; } kernel::Message::DmaGetRequest(name) => { - let result = DMA_RECORD_STORE.lock().get(&name).map(|v| v.clone()); + let result = rtio_dma::retrieve(name); control .borrow_mut() .tx @@ -336,6 +340,33 @@ async fn handle_run_kernel( .await; } #[cfg(has_drtio)] + kernel::Message::DmaStartRemoteRequest { id, timestamp } => { + rtio_dma::remote_dma::playback(aux_mutex, routing_table, timer, id as u32, timestamp as u64).await; + } + #[cfg(has_drtio)] + kernel::Message::DmaAwaitRemoteRequest(id) => { + let result = rtio_dma::remote_dma::await_done(id as u32, Some(10_000), timer).await; + let reply = match result { + Ok(rtio_dma::remote_dma::RemoteState::PlaybackEnded { + error, + channel, + timestamp + }) => kernel::Message::DmaAwaitRemoteReply { + timeout: false, + error: error, + channel: channel, + timestamp: timestamp + }, + _ => kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0 + } + }; + control.borrow_mut().tx.async_send(reply).await; + } + #[cfg(has_drtio)] kernel::Message::UpDestinationsRequest(destination) => { let result = _up_destinations.borrow()[destination as usize]; control @@ -395,6 +426,9 @@ async fn handle_connection( stream: &mut TcpStream, control: Rc>, up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { stream.set_ack_delay(None); @@ -418,7 +452,7 @@ async fn handle_connection( load_kernel(&buffer, &control, Some(stream)).await?; } Request::RunKernel => { - handle_run_kernel(Some(stream), &control, &up_destinations).await?; + handle_run_kernel(Some(stream), &control, &up_destinations, aux_mutex, routing_table, timer).await?; } _ => { error!("unexpected request from host: {:?}", request); @@ -485,7 +519,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg); analyzer::start(); - moninj::start(timer, aux_mutex, drtio_routing_table); + moninj::start(timer, &aux_mutex, &drtio_routing_table); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start())); let idle_kernel = Rc::new(cfg.read("idle_kernel").ok()); @@ -493,7 +527,8 @@ pub fn main(timer: GlobalTimer, cfg: Config) { info!("Loading startup kernel..."); if let Ok(()) = task::block_on(load_kernel(&buffer, &control, None)) { info!("Starting startup kernel..."); - let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations)); + let routing_table = drtio_routing_table.borrow(); + let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer)); info!("Startup kernel finished!"); } else { error!("Error loading startup kernel!"); @@ -519,13 +554,16 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let connection = connection.clone(); let terminate = terminate.clone(); let up_destinations = up_destinations.clone(); + let aux_mutex = aux_mutex.clone(); + let routing_table = drtio_routing_table.clone(); // we make sure the value of terminate is 0 before we start let _ = terminate.try_wait(); task::spawn(async move { + let routing_table = routing_table.borrow(); select_biased! { _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations) + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) .await .map_err(|e| warn!("connection terminated: {}", e)); if let Some(buffer) = &*idle_kernel { @@ -533,7 +571,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let _ = load_kernel(&buffer, &control, None) .await.map_err(|_| warn!("error loading idle kernel")); info!("Running idle kernel"); - let _ = handle_run_kernel(None, &control, &up_destinations) + let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) .await.map_err(|_| warn!("error running idle kernel")); info!("Idle kernel terminated"); } diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index d4c473af..2ff865b5 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -1,14 +1,11 @@ -use alloc::{boxed::Box, string::String, vec::Vec}; +use alloc::{string::String, vec::Vec}; use core::mem; use cslice::CSlice; -use libcortex_a9::cache::dcci_slice; use super::{Message, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, KERNEL_IMAGE}; use crate::{artiq_raise, pl::csr, rtio}; -const ALIGNMENT: usize = 16 * 8; - #[repr(C)] pub struct DmaTrace { duration: i64, @@ -20,6 +17,7 @@ pub struct DmaRecorder { pub name: String, pub buffer: Vec, pub duration: i64, + pub enable_ddma: bool } static mut RECORDER: Option = None; @@ -53,11 +51,12 @@ pub extern "C" fn dma_record_start(name: CSlice) { name, buffer: Vec::new(), duration: 0, + enable_ddma: false }); } } -pub extern "C" fn dma_record_stop(duration: i64) { +pub extern "C" fn dma_record_stop(duration: i64, enable_ddma: bool) { unsafe { if RECORDER.is_none() { artiq_raise!("DMAError", "DMA is not recording") @@ -71,6 +70,7 @@ pub extern "C" fn dma_record_stop(duration: i64) { let mut recorder = RECORDER.take().unwrap(); recorder.duration = duration; + recorder.enable_ddma = enable_ddma; KERNEL_CHANNEL_1TO0 .as_mut() .unwrap() @@ -151,20 +151,7 @@ pub extern "C" fn dma_retrieve(name: CSlice) -> DmaTrace { } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { Message::DmaGetReply(None) => (), - Message::DmaGetReply(Some((mut v, duration))) => { - v.reserve(ALIGNMENT - 1); - let original_length = v.len(); - let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - v.push(0); - } - // trailing zero to indicate end of buffer - v.push(0); - v.copy_within(0..original_length, padding); - dcci_slice(&v); - let v = Box::new(v); - let address = Box::into_raw(v) as *mut Vec as i32; + Message::DmaGetReply(Some((address, duration))) => { return DmaTrace { address, duration }; } _ => panic!("Expected DmaGetReply after DmaGetRequest!"), @@ -175,22 +162,17 @@ pub extern "C" fn dma_retrieve(name: CSlice) -> DmaTrace { pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { unsafe { - let v = Box::from_raw(ptr as *mut Vec); - let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - let ptr = v.as_ptr().add(padding) as i32; - csr::rtio_dma::base_address_write(ptr as u32); csr::rtio_dma::time_offset_write(timestamp as u64); csr::cri_con::selected_write(1); csr::rtio_dma::enable_write(1); + #[cfg(has_drtio)] + KERNEL_CHANNEL_1TO0.as_mut().unwrap().send( + Message::DmaStartRemoteRequest{ id: ptr, timestamp: timestamp }); while csr::rtio_dma::enable_read() != 0 {} csr::cri_con::selected_write(0); - // leave the handle as we may try to do playback for another time. - mem::forget(v); - let error = csr::rtio_dma::error_read(); if error != 0 { let timestamp = csr::rtio_dma::error_timestamp_read(); @@ -215,5 +197,39 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { ); } } + #[cfg(has_drtio)] + { + KERNEL_CHANNEL_1TO0.as_mut().unwrap().send( + Message::DmaAwaitRemoteRequest(ptr)); + match KERNEL_CHANNEL_0TO1.as_mut().unwrap().recv() { + Message::DmaAwaitRemoteReply { timeout, error, channel, timestamp } => { + if timeout { + artiq_raise!( + "DMAError", + "Error running DMA on satellite device, timed out waiting for results" + ); + } + if error & 1 != 0 { + artiq_raise!( + "RTIOUnderflow", + "RTIO underflow at {1} mu, channel {rtio_channel_info:0}", + channel as i64, + timestamp as i64, + 0 + ); + } + if error & 2 != 0 { + artiq_raise!( + "RTIODestinationUnreachable", + "RTIO destination unreachable, output, at {1} mu, channel {rtio_channel_info:0}", + channel as i64, + timestamp as i64, + 0 + ); + } + } + _ => panic!("Expected DmaAwaitRemoteReply after DmaAwaitRemoteRequest!"), + } + } } } diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index 9f88297d..b72d3114 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -52,7 +52,21 @@ pub enum Message { DmaPutRequest(DmaRecorder), DmaEraseRequest(String), DmaGetRequest(String), - DmaGetReply(Option<(Vec, i64)>), + DmaGetReply(Option<(i32, i64)>), + #[cfg(has_drtio)] + DmaStartRemoteRequest { + id: i32, + timestamp: i64 + }, + #[cfg(has_drtio)] + DmaAwaitRemoteRequest(i32), + #[cfg(has_drtio)] + DmaAwaitRemoteReply { + timeout: bool, + error: u8, + channel: u32, + timestamp: u64 + }, #[cfg(has_drtio)] UpDestinationsRequest(i32), diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index 5eefd0b7..6bcab9ba 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -47,6 +47,7 @@ mod rtio; mod rtio; mod rtio_clocking; mod rtio_mgt; +mod rtio_dma; static mut SEEN_ASYNC_ERRORS: u8 = 0; diff --git a/src/runtime/src/moninj.rs b/src/runtime/src/moninj.rs index 966534ce..4dd2e426 100644 --- a/src/runtime/src/moninj.rs +++ b/src/runtime/src/moninj.rs @@ -298,7 +298,9 @@ async fn handle_connection( } } -pub fn start(timer: GlobalTimer, aux_mutex: Rc>, routing_table: Rc>) { +pub fn start(timer: GlobalTimer, aux_mutex: &Rc>, routing_table: &Rc>) { + let aux_mutex = aux_mutex.clone(); + let routing_table = routing_table.clone(); task::spawn(async move { loop { let aux_mutex = aux_mutex.clone(); diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs new file mode 100644 index 00000000..164acc8a --- /dev/null +++ b/src/runtime/src/rtio_dma.rs @@ -0,0 +1,373 @@ + +use alloc::{collections::BTreeMap, string::String, vec::Vec, rc::Rc}; +use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; +use libboard_zynq::timer::GlobalTimer; +use libboard_artiq::drtio_routing::RoutingTable; +#[cfg(has_drtio)] +use libasync::task; +#[cfg(has_drtio)] +use core::mem; +use crate::kernel::DmaRecorder; + +const ALIGNMENT: usize = 16 * 8; + +static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); + +#[cfg(has_drtio)] +pub mod remote_dma { + use super::*; + use libboard_zynq::time::Milliseconds; + use log::error; + use crate::rtio_mgt::drtio; + + #[derive(Debug, PartialEq, Clone)] + pub enum RemoteState { + NotLoaded, + Loaded, + PlaybackEnded { error: u8, channel: u32, timestamp: u64 } + } + #[derive(Debug, Clone)] + struct RemoteTrace { + trace: Vec, + pub state: RemoteState + } + + impl From> for RemoteTrace { + fn from(trace: Vec) -> Self { + RemoteTrace { + trace: trace, + state: RemoteState::NotLoaded + } + } + } + + impl RemoteTrace { + pub fn get_trace(&self) -> &Vec { + &self.trace + } + } + + // represents all traces for a given ID + struct TraceSet { + id: u32, + done_count: Mutex, + traces: Mutex> + } + + impl TraceSet { + pub fn new(id: u32, traces: BTreeMap>) -> TraceSet { + let mut trace_map: BTreeMap = BTreeMap::new(); + for (destination, trace) in traces { + trace_map.insert(destination, trace.into()); + } + TraceSet { + id: id, + done_count: Mutex::new(0), + traces: Mutex::new(trace_map) + } + } + + pub async fn await_done( + &self, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let timeout_ms = Milliseconds(timeout.unwrap_or(10_000)); + let limit = timer.get_time() + timeout_ms; + while (timer.get_time() < limit) & + (*(self.done_count.async_lock().await) < self.traces.async_lock().await.len()) { + task::r#yield().await; + } + if timer.get_time() >= limit { + error!("Remote DMA await done timed out"); + return Err("Timed out waiting for results."); + } + let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 }; + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (_dest, trace) in trace_iter { + match trace.state { + RemoteState::PlaybackEnded { + error: e, + channel: _c, + timestamp: _ts + } => if e != 0 { playback_state = trace.state.clone(); }, + _ => (), + } + trace.state = RemoteState::Loaded; + } + Ok(playback_state) + } + + pub async fn upload_traces( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (destination, trace) in trace_iter { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + *destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + *(self.done_count.async_lock().await) = 0; + } + + pub async fn erase( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let lock = self.traces.async_lock().await; + let trace_iter = lock.keys(); + for destination in trace_iter { + match drtio::ddma_send_erase( + aux_mutex, + routing_table, + timer, + self.id, + *destination + ).await { + Ok(_) => (), + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + } + + pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { + let mut traces_locked = self.traces.async_lock().await; + let mut trace = traces_locked.get_mut(&destination).unwrap(); + trace.state = RemoteState::PlaybackEnded { + error: error, + channel: channel, + timestamp: timestamp + }; + *(self.done_count.async_lock().await) += 1; + } + + pub async fn playback( + &self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + timestamp: u64 + ) { + let mut dest_list: Vec = Vec::new(); + { + let lock = self.traces.async_lock().await; + let trace_iter = lock.iter(); + for (dest, trace) in trace_iter { + if trace.state != RemoteState::Loaded { + error!("Destination {} not ready for DMA, state: {:?}", dest, trace.state); + continue; + } + dest_list.push(dest.clone()); + } + } + // mutex lock must be dropped before sending a playback request to avoid a deadlock, + // if PlaybackStatus is sent from another satellite and the state must be updated. + for destination in dest_list { + match drtio::ddma_send_playback( + aux_mutex, + routing_table, + timer, + self.id, + destination, + timestamp + ).await { + Ok(_) => (), + Err(e) => error!("Error during remote DMA playback: {}", e) + } + } + } + + pub async fn destination_changed( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, up: bool + ) { + // update state of the destination, resend traces if it's up + if let Some(trace) = self.traces.lock().get_mut(&destination) { + if up { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } else { + trace.state = RemoteState::NotLoaded; + } + } + } + } + + static mut TRACES: BTreeMap = BTreeMap::new(); + + pub fn add_traces(id: u32, traces: BTreeMap>) { + unsafe { TRACES.insert(id, TraceSet::new(id, traces)) }; + } + + pub async fn await_done( + id: u32, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.await_done(timeout, timer).await + } + + pub async fn erase( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.erase(aux_mutex, routing_table, timer).await; + unsafe { TRACES.remove(&id); } + } + + pub async fn upload_traces( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.upload_traces(aux_mutex, routing_table, timer).await; + } + + pub async fn playback( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback(aux_mutex, routing_table, timer, timestamp).await; + } + + pub async fn playback_done( + id: u32, + destination: u8, + error: u8, + channel: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback_done(destination, error, channel, timestamp).await; + } + + pub async fn destination_changed( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, + up: bool + ) { + let trace_iter = unsafe { TRACES.values_mut() }; + for trace_set in trace_iter { + trace_set.destination_changed(aux_mutex, routing_table, timer, destination, up).await; + } + } +} + + +pub async fn put_record(_aux_mutex: &Rc>, + _routing_table: &RoutingTable, + _timer: GlobalTimer, + mut recorder: DmaRecorder, +) -> u32 { + #[cfg(has_drtio)] + let mut remote_traces: BTreeMap> = BTreeMap::new(); + + #[cfg(has_drtio)] + if recorder.enable_ddma { + let mut local_trace: Vec = Vec::new(); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + recorder.buffer.push(0); + while recorder.buffer[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = recorder.buffer[ptr] as usize; + let destination = recorder.buffer[ptr+3]; + if destination == 0 { + local_trace.extend(&recorder.buffer[ptr..ptr+len]); + } + else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&recorder.buffer[ptr..ptr+len]); + } else { + remote_traces.insert(destination, recorder.buffer[ptr..ptr+len].to_vec()); + } + } + // and jump to the next event + ptr += len; + } + mem::swap(&mut recorder.buffer, &mut local_trace); + } + // trailing zero to indicate end of buffer + recorder.buffer.push(0); + recorder.buffer.reserve(ALIGNMENT - 1); + let original_length = recorder.buffer.len(); + let padding = ALIGNMENT - recorder.buffer.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + recorder.buffer.push(0); + } + recorder.buffer.copy_within(0..original_length, padding); + dcci_slice(&recorder.buffer); + + let ptr = recorder.buffer[padding..].as_ptr() as u32; + + let _old_record = DMA_RECORD_STORE + .lock() + .insert(recorder.name, (ptr, recorder.buffer, recorder.duration)); + + #[cfg(has_drtio)] + { + if let Some((old_id, _v, _d)) = _old_record { + remote_dma::erase(_aux_mutex, _routing_table, _timer, old_id).await; + } + remote_dma::add_traces(ptr, remote_traces); + } + + ptr +} + +pub async fn erase(name: String, _aux_mutex: &Rc>, + _routing_table: &RoutingTable, _timer: GlobalTimer +) { + let _entry = DMA_RECORD_STORE.lock().remove(&name); + #[cfg(has_drtio)] + if let Some((id, _v, _d)) = _entry { + remote_dma::erase(_aux_mutex, _routing_table, _timer, id).await; + } +} + +pub fn retrieve(name: String) -> Option<(i32, i64)> { + let (ptr, _v, duration) = DMA_RECORD_STORE.lock().get(&name)?.clone(); + Some((ptr as i32, duration)) +} \ No newline at end of file diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 9a75d201..09945546 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -13,13 +13,16 @@ static mut RTIO_DEVICE_MAP: BTreeMap = BTreeMap::new(); #[cfg(has_drtio)] pub mod drtio { use embedded_hal::blocking::delay::DelayMs; + use alloc::vec::Vec; use libasync::{delay, task}; use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet}; + use libboard_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE; use libboard_zynq::time::Milliseconds; use log::{error, info, warn}; use super::*; use crate::{ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; + use crate::rtio_dma::remote_dma; pub fn startup( aux_mutex: &Rc>, @@ -63,7 +66,16 @@ pub mod drtio { } let _lock = aux_mutex.async_lock().await; drtioaux_async::send(linkno, request).await.unwrap(); - recv_aux_timeout(linkno, 200, timer).await + loop { + let reply = recv_aux_timeout(linkno, 200, timer).await; + match reply { + Ok(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { + remote_dma::playback_done(id, destination, error, channel, timestamp).await; + }, + Ok(packet) => return Ok(packet), + Err(e) => return Err(e) + } + } } async fn drain_buffer(linkno: u8, draining_time: Milliseconds, timer: GlobalTimer) { @@ -172,6 +184,8 @@ pub mod drtio { async fn process_unsolicited_aux(aux_mutex: &Rc>, linkno: u8) { let _lock = aux_mutex.async_lock().await; match drtioaux_async::recv(linkno).await { + Ok(Some(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) + ) => remote_dma::playback_done(id, destination, error, channel, timestamp).await, Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno), @@ -252,7 +266,8 @@ pub mod drtio { .await; match reply { Ok(Packet::DestinationDownReply) => { - destination_set_up(routing_table, up_destinations, destination, false).await + destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; } Ok(Packet::DestinationOkReply) => (), Ok(Packet::DestinationSequenceErrorReply { channel }) => { @@ -287,6 +302,7 @@ pub mod drtio { } } else { destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; } } else { if up_links[linkno as usize] { @@ -304,6 +320,7 @@ pub mod drtio { Ok(Packet::DestinationOkReply) => { destination_set_up(routing_table, up_destinations, destination, true).await; init_buffer_space(destination as u8, linkno).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, true).await; } Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), @@ -389,6 +406,77 @@ pub mod drtio { } } } + + pub async fn ddma_upload_trace( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + trace: &Vec + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let mut i = 0; + while i < trace.len() { + let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; + let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize; + let last = i + len == trace.len(); + trace_slice[..len].clone_from_slice(&trace[i..i+len]); + i += len; + let reply = aux_transact(aux_mutex, linkno, + &Packet::DmaAddTraceRequest { + id: id, destination: destination, last: last, length: len as u16, trace: trace_slice}, + timer).await; + match reply { + Ok(Packet::DmaAddTraceReply { succeeded: true }) => (), + Ok(Packet::DmaAddTraceReply { succeeded: false }) => { + return Err("error adding trace on satellite"); }, + Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); }, + Err(_) => { return Err("adding DMA trace failed, aux error"); } + } + } + Ok(()) + } + + + pub async fn ddma_send_erase( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8 + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(aux_mutex, linkno, + &Packet::DmaRemoveTraceRequest { id: id, destination: destination }, + timer).await; + match reply { + Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), + Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + Ok(_) => Err("adding trace failed, unexpected aux packet"), + Err(_) => Err("erasing trace failed, aux error") + } + } + + pub async fn ddma_send_playback( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + timestamp: u64 + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(aux_mutex, linkno, &Packet::DmaPlaybackRequest{ + id: id, destination: destination, timestamp: timestamp }, timer).await; + match reply { + Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), + Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), + Ok(_) => Err("received unexpected aux packet during DMA playback"), + Err(_) => Err("aux error on DMA playback") + } + } + } fn read_device_map(cfg: &Config) -> BTreeMap {