From 963af1f04fee1f9372424a5b58ce31ceebfbadfd Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 23 Mar 2023 15:41:08 +0800 Subject: [PATCH 01/12] runtime DMA: bring architecture close to mainline --- src/runtime/src/comms.rs | 13 ++++--- src/runtime/src/kernel/dma.rs | 28 ++------------- src/runtime/src/kernel/mod.rs | 2 +- src/runtime/src/main.rs | 1 + src/runtime/src/rtio_dma.rs | 38 +++++++++++++++++++++ src/runtime/src/rtio_mgt.rs | 64 +++++++++++++++++++++++++++++++++++ 6 files changed, 112 insertions(+), 34 deletions(-) create mode 100644 src/runtime/src/rtio_dma.rs diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 1465d77..5b53f88 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -27,7 +27,8 @@ use crate::pl; use crate::{analyzer, kernel, mgmt, moninj, proto_async::*, rpc, - rtio_mgt::{self, resolve_channel_name}}; + rtio_mgt::{self, resolve_channel_name}, + rtio_dma}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { @@ -79,7 +80,7 @@ enum Reply { } static CACHE_STORE: Mutex>> = Mutex::new(BTreeMap::new()); -static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); + async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> { stream @@ -319,16 +320,14 @@ async fn handle_run_kernel( .await; } kernel::Message::DmaPutRequest(recorder) => { - DMA_RECORD_STORE - .lock() - .insert(recorder.name, (recorder.buffer, recorder.duration)); + rtio_dma::put_record(recorder); } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. - DMA_RECORD_STORE.lock().remove(&name); + rtio_dma::erase(name); } kernel::Message::DmaGetRequest(name) => { - let result = DMA_RECORD_STORE.lock().get(&name).map(|v| v.clone()); + let result = rtio_dma::retrieve(name); control .borrow_mut() .tx diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index d4c473a..ed3d492 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -1,14 +1,11 @@ -use alloc::{boxed::Box, string::String, vec::Vec}; +use alloc::{string::String, vec::Vec}; use core::mem; use cslice::CSlice; -use libcortex_a9::cache::dcci_slice; use super::{Message, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, KERNEL_IMAGE}; use crate::{artiq_raise, pl::csr, rtio}; -const ALIGNMENT: usize = 16 * 8; - #[repr(C)] pub struct DmaTrace { duration: i64, @@ -151,20 +148,7 @@ pub extern "C" fn dma_retrieve(name: CSlice) -> DmaTrace { } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { Message::DmaGetReply(None) => (), - Message::DmaGetReply(Some((mut v, duration))) => { - v.reserve(ALIGNMENT - 1); - let original_length = v.len(); - let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - v.push(0); - } - // trailing zero to indicate end of buffer - v.push(0); - v.copy_within(0..original_length, padding); - dcci_slice(&v); - let v = Box::new(v); - let address = Box::into_raw(v) as *mut Vec as i32; + Message::DmaGetReply(Some((address, duration))) => { return DmaTrace { address, duration }; } _ => panic!("Expected DmaGetReply after DmaGetRequest!"), @@ -175,11 +159,6 @@ pub extern "C" fn dma_retrieve(name: CSlice) -> DmaTrace { pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { unsafe { - let v = Box::from_raw(ptr as *mut Vec); - let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - let ptr = v.as_ptr().add(padding) as i32; - csr::rtio_dma::base_address_write(ptr as u32); csr::rtio_dma::time_offset_write(timestamp as u64); @@ -188,9 +167,6 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { while csr::rtio_dma::enable_read() != 0 {} csr::cri_con::selected_write(0); - // leave the handle as we may try to do playback for another time. - mem::forget(v); - let error = csr::rtio_dma::error_read(); if error != 0 { let timestamp = csr::rtio_dma::error_timestamp_read(); diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index 9f88297..4e00888 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -52,7 +52,7 @@ pub enum Message { DmaPutRequest(DmaRecorder), DmaEraseRequest(String), DmaGetRequest(String), - DmaGetReply(Option<(Vec, i64)>), + DmaGetReply(Option<(i32, i64)>), #[cfg(has_drtio)] UpDestinationsRequest(i32), diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index 5eefd0b..6bcab9b 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -47,6 +47,7 @@ mod rtio; mod rtio; mod rtio_clocking; mod rtio_mgt; +mod rtio_dma; static mut SEEN_ASYNC_ERRORS: u8 = 0; diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs new file mode 100644 index 0000000..8555a7e --- /dev/null +++ b/src/runtime/src/rtio_dma.rs @@ -0,0 +1,38 @@ + +use alloc::{collections::BTreeMap, string::String, vec::Vec}; +use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; +use crate::kernel::DmaRecorder; + +const ALIGNMENT: usize = 16 * 8; + +static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); + + +pub fn put_record(mut recorder: DmaRecorder) { + recorder.buffer.reserve(ALIGNMENT - 1); + let original_length = recorder.buffer.len(); + let padding = ALIGNMENT - recorder.buffer.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + recorder.buffer.push(0); + } + // trailing zero to indicate end of buffer + recorder.buffer.push(0); + recorder.buffer.copy_within(0..original_length, padding); + dcci_slice(&recorder.buffer); + + let ptr = recorder.buffer[padding..].as_ptr() as u32; + + DMA_RECORD_STORE + .lock() + .insert(recorder.name, (ptr, recorder.buffer, recorder.duration)); +} + +pub fn erase(name: String) { + DMA_RECORD_STORE.lock().remove(&name); +} + +pub fn retrieve(name: String) -> Option<(i32, i64)> { + let (ptr, _v, duration) = DMA_RECORD_STORE.lock().get(&name)?.clone(); + Some((ptr as i32, duration)) +} \ No newline at end of file diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 9a75d20..315ab73 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -389,6 +389,70 @@ pub mod drtio { } } } + + // pub fn ddma_upload_trace(aux_mutex: Rc>, + // routing_table: &drtio_routing::RoutingTable, + // id: u32, destination: u8, trace: &Vec) -> Result<(), &'static str> { + // let linkno = routing_table.0[destination as usize][0] - 1; + // let mut i = 0; + // while i < trace.len() { + // let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; + // let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize; + // let last = i + len == trace.len(); + // trace_slice[..len].clone_from_slice(&trace[i..i+len]); + // i += len; + // let reply = aux_transact(io, aux_mutex, linkno, + // &drtioaux::Packet::DmaAddTraceRequest { + // id: id, destination: destination, last: last, length: len as u16, trace: trace_slice}); + // match reply { + // Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => (), + // Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => { + // return Err("error adding trace on satellite"); }, + // Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); }, + // Err(_) => { return Err("adding DMA trace failed, aux error"); } + // } + // } + // Ok(()) + // } + + + // pub fn ddma_send_erase(io: &Io, aux_mutex: &Mutex, + // routing_table: &drtio_routing::RoutingTable, + // id: u32, destination: u8) -> Result<(), &'static str> { + // let linkno = routing_table.0[destination as usize][0] - 1; + // let reply = aux_transact(io, aux_mutex, linkno, + // &drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination }); + // match reply { + // Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), + // Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + // Ok(_) => Err("adding trace failed, unexpected aux packet"), + // Err(_) => Err("erasing trace failed, aux error") + // } + // } + + // pub fn ddma_send_playback(io: &Io, aux_mutex: &Mutex, + // routing_table: &drtio_routing::RoutingTable, + // ddma_mutex: &Mutex, id: u32, destination: u8, timestamp: u64) -> Result<(), &'static str> { + // let linkno = routing_table.0[destination as usize][0] - 1; + // let _lock = aux_mutex.lock(io).unwrap(); + // drtioaux::send(linkno, &drtioaux::Packet::DmaPlaybackRequest{ + // id: id, destination: destination, timestamp: timestamp }).unwrap(); + // loop { + // let reply = recv_aux_timeout(io, linkno, 200); + // match reply { + // Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) }, + // Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: false }) => { + // return Err("error on DMA playback request") }, + // // in case we received status from another destination + // // but we want to get DmaPlaybackReply anyway, thus the loop + // Ok(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { + // remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); + // }, + // Ok(_) => { return Err("received unexpected aux packet while DMA playback") }, + // Err(_) => { return Err("aux error on DMA playback") } + // } + // } + // } } fn read_device_map(cfg: &Config) -> BTreeMap { -- 2.42.0 From f1ef0219c3499cf4d8d641d0940ccbdd1bb2d8a5 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 23 Mar 2023 17:13:34 +0800 Subject: [PATCH 02/12] rtio_mgt: implement ddma related functions add kernel messages --- src/runtime/src/kernel/dma.rs | 5 +- src/runtime/src/kernel/mod.rs | 16 ++++ src/runtime/src/rtio_mgt.rs | 139 +++++++++++++++++++--------------- 3 files changed, 99 insertions(+), 61 deletions(-) diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index ed3d492..63b8318 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -17,6 +17,7 @@ pub struct DmaRecorder { pub name: String, pub buffer: Vec, pub duration: i64, + pub enable_ddma: bool } static mut RECORDER: Option = None; @@ -50,11 +51,12 @@ pub extern "C" fn dma_record_start(name: CSlice) { name, buffer: Vec::new(), duration: 0, + enable_ddma: false }); } } -pub extern "C" fn dma_record_stop(duration: i64) { +pub extern "C" fn dma_record_stop(duration: i64, enable_ddma: bool) { unsafe { if RECORDER.is_none() { artiq_raise!("DMAError", "DMA is not recording") @@ -68,6 +70,7 @@ pub extern "C" fn dma_record_stop(duration: i64) { let mut recorder = RECORDER.take().unwrap(); recorder.duration = duration; + recorder.enable_ddma = enable_ddma; KERNEL_CHANNEL_1TO0 .as_mut() .unwrap() diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index 4e00888..883c615 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -53,6 +53,22 @@ pub enum Message { DmaEraseRequest(String), DmaGetRequest(String), DmaGetReply(Option<(i32, i64)>), + #[cfg(has_drtio)] + DmaStartRemoteRequest { + id: i32, + timestamp: i64 + }, + #[cfg(has_drtio)] + DmaAwaitRemoteRequest { + id: i32 + }, + #[cfg(has_drtio)] + DmaAwaitRemoteReply { + timeout: bool, + error: u8, + channel: u32, + timestamp: u64 + }, #[cfg(has_drtio)] UpDestinationsRequest(i32), diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 315ab73..683b581 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -13,13 +13,16 @@ static mut RTIO_DEVICE_MAP: BTreeMap = BTreeMap::new(); #[cfg(has_drtio)] pub mod drtio { use embedded_hal::blocking::delay::DelayMs; + use alloc::vec::Vec; use libasync::{delay, task}; use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet}; + use libboard_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE; use libboard_zynq::time::Milliseconds; use log::{error, info, warn}; use super::*; use crate::{ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; + use crate::rtio_dma::remote_dma; pub fn startup( aux_mutex: &Rc>, @@ -390,69 +393,85 @@ pub mod drtio { } } - // pub fn ddma_upload_trace(aux_mutex: Rc>, - // routing_table: &drtio_routing::RoutingTable, - // id: u32, destination: u8, trace: &Vec) -> Result<(), &'static str> { - // let linkno = routing_table.0[destination as usize][0] - 1; - // let mut i = 0; - // while i < trace.len() { - // let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; - // let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize; - // let last = i + len == trace.len(); - // trace_slice[..len].clone_from_slice(&trace[i..i+len]); - // i += len; - // let reply = aux_transact(io, aux_mutex, linkno, - // &drtioaux::Packet::DmaAddTraceRequest { - // id: id, destination: destination, last: last, length: len as u16, trace: trace_slice}); - // match reply { - // Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => (), - // Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => { - // return Err("error adding trace on satellite"); }, - // Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); }, - // Err(_) => { return Err("adding DMA trace failed, aux error"); } - // } - // } - // Ok(()) - // } + pub async fn ddma_upload_trace( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + trace: &Vec + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let mut i = 0; + while i < trace.len() { + let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; + let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize; + let last = i + len == trace.len(); + trace_slice[..len].clone_from_slice(&trace[i..i+len]); + i += len; + let reply = aux_transact(aux_mutex, linkno, + &Packet::DmaAddTraceRequest { + id: id, destination: destination, last: last, length: len as u16, trace: trace_slice}, + timer).await; + match reply { + Ok(Packet::DmaAddTraceReply { succeeded: true }) => (), + Ok(Packet::DmaAddTraceReply { succeeded: false }) => { + return Err("error adding trace on satellite"); }, + Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); }, + Err(_) => { return Err("adding DMA trace failed, aux error"); } + } + } + Ok(()) + } - // pub fn ddma_send_erase(io: &Io, aux_mutex: &Mutex, - // routing_table: &drtio_routing::RoutingTable, - // id: u32, destination: u8) -> Result<(), &'static str> { - // let linkno = routing_table.0[destination as usize][0] - 1; - // let reply = aux_transact(io, aux_mutex, linkno, - // &drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination }); - // match reply { - // Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), - // Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), - // Ok(_) => Err("adding trace failed, unexpected aux packet"), - // Err(_) => Err("erasing trace failed, aux error") - // } - // } + pub async fn ddma_send_erase( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8 + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(aux_mutex, linkno, + &Packet::DmaRemoveTraceRequest { id: id, destination: destination }, + timer).await; + match reply { + Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), + Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + Ok(_) => Err("adding trace failed, unexpected aux packet"), + Err(_) => Err("erasing trace failed, aux error") + } + } - // pub fn ddma_send_playback(io: &Io, aux_mutex: &Mutex, - // routing_table: &drtio_routing::RoutingTable, - // ddma_mutex: &Mutex, id: u32, destination: u8, timestamp: u64) -> Result<(), &'static str> { - // let linkno = routing_table.0[destination as usize][0] - 1; - // let _lock = aux_mutex.lock(io).unwrap(); - // drtioaux::send(linkno, &drtioaux::Packet::DmaPlaybackRequest{ - // id: id, destination: destination, timestamp: timestamp }).unwrap(); - // loop { - // let reply = recv_aux_timeout(io, linkno, 200); - // match reply { - // Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) }, - // Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: false }) => { - // return Err("error on DMA playback request") }, - // // in case we received status from another destination - // // but we want to get DmaPlaybackReply anyway, thus the loop - // Ok(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { - // remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); - // }, - // Ok(_) => { return Err("received unexpected aux packet while DMA playback") }, - // Err(_) => { return Err("aux error on DMA playback") } - // } - // } - // } + pub async fn ddma_send_playback( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + timestamp: u64 + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let _lock = aux_mutex.async_lock().await; + drtioaux_async::send(linkno, &Packet::DmaPlaybackRequest{ + id: id, destination: destination, timestamp: timestamp }).await.unwrap(); + loop { + let reply = recv_aux_timeout(linkno, 200, timer).await; + match reply { + Ok(Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) }, + Ok(Packet::DmaPlaybackReply { succeeded: false }) => { + return Err("error on DMA playback request") }, + // in case we received status from another destination + // but we want to get DmaPlaybackReply anyway, thus the loop + Ok(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { + remote_dma::playback_done(id, destination, error, channel, timestamp).await; + }, + Ok(_) => { return Err("received unexpected aux packet while DMA playback") }, + Err(_) => { return Err("aux error on DMA playback") } + } + } + } } fn read_device_map(cfg: &Config) -> BTreeMap { -- 2.42.0 From 3d6350ca753c12a1a0fab76e410f948a4eafb2ff Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 13:29:10 +0800 Subject: [PATCH 03/12] rtio_dma: add ddma related module --- src/runtime/src/rtio_dma.rs | 286 +++++++++++++++++++++++++++++++++++- 1 file changed, 284 insertions(+), 2 deletions(-) diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 8555a7e..1bf125f 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -1,14 +1,298 @@ use alloc::{collections::BTreeMap, string::String, vec::Vec}; use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; +use libboard_zynq::timer::GlobalTimer; use crate::kernel::DmaRecorder; const ALIGNMENT: usize = 16 * 8; static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); +#[cfg(has_drtio)] +pub mod remote_dma { + use super::*; + use alloc::rc::Rc; + use libasync::task; + use libboard_zynq::time::Milliseconds; + use libboard_artiq::drtio_routing::RoutingTable; + use log::error; + use crate::rtio_mgt::drtio; + + #[derive(Debug, PartialEq, Clone)] + pub enum RemoteState { + NotLoaded, + Loaded, + PlaybackEnded { error: u8, channel: u32, timestamp: u64 } + } + #[derive(Debug, Clone)] + struct RemoteTrace { + trace: Vec, + pub state: RemoteState + } + + impl From> for RemoteTrace { + fn from(trace: Vec) -> Self { + RemoteTrace { + trace: trace, + state: RemoteState::NotLoaded + } + } + } + + impl RemoteTrace { + pub fn get_trace(&self) -> &Vec { + &self.trace + } + } + + // represents all traces for a given ID + struct TraceSet { + id: u32, + done_count: Mutex, + traces: Mutex> + } + + impl TraceSet { + pub fn new(id: u32, traces: BTreeMap>) -> TraceSet { + let mut trace_map: BTreeMap = BTreeMap::new(); + for (destination, trace) in traces { + trace_map.insert(destination, trace.into()); + } + TraceSet { + id: id, + done_count: Mutex::new(0), + traces: Mutex::new(trace_map) + } + } + + pub async fn await_done( + &self, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let timeout_ms = Milliseconds(timeout.unwrap_or(10_000)); + let limit = timer.get_time() + timeout_ms; + while (timer.get_time() < limit) & + (*(self.done_count.async_lock().await) < self.traces.async_lock().await.len()) { + task::r#yield().await; + } + if timer.get_time() >= limit { + error!("Remote DMA await done timed out"); + return Err("Timed out waiting for results."); + } + let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 }; + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (_dest, trace) in trace_iter { + match trace.state { + RemoteState::PlaybackEnded { + error: e, + channel: _c, + timestamp: _ts + } => if e != 0 { playback_state = trace.state.clone(); }, + _ => (), + } + trace.state = RemoteState::Loaded; + } + Ok(playback_state) + } + + pub async fn upload_traces( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let mut lock = self.traces.async_lock().await; + let trace_iter = lock.iter_mut(); + for (destination, trace) in trace_iter { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + *destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + *(self.done_count.async_lock().await) = 0; + } + + pub async fn erase( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer + ) { + let lock = self.traces.async_lock().await; + let trace_iter = lock.keys(); + for destination in trace_iter { + match drtio::ddma_send_erase( + aux_mutex, + routing_table, + timer, + self.id, + *destination + ).await { + Ok(_) => (), + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + } + + pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { + let mut traces_locked = self.traces.async_lock().await; + let mut trace = traces_locked.get_mut(&destination).unwrap(); + trace.state = RemoteState::PlaybackEnded { + error: error, + channel: channel, + timestamp: timestamp + }; + *(self.done_count.async_lock().await) += 1; + } + + pub async fn playback( + &self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + timestamp: u64 + ) { + let mut dest_list: Vec = Vec::new(); + { + let lock = self.traces.async_lock().await; + let trace_iter = lock.iter(); + for (dest, trace) in trace_iter { + if trace.state != RemoteState::Loaded { + error!("Destination {} not ready for DMA, state: {:?}", dest, trace.state); + continue; + } + dest_list.push(dest.clone()); + } + } + // mutex lock must be dropped before sending a playback request to avoid a deadlock, + // if PlaybackStatus is sent from another satellite and the state must be updated. + for destination in dest_list { + match drtio::ddma_send_playback( + aux_mutex, + routing_table, + timer, + self.id, + destination, + timestamp + ).await { + Ok(_) => (), + Err(e) => error!("Error during remote DMA playback: {}", e) + } + } + } + + pub async fn destination_changed( + &mut self, + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, up: bool) { + // update state of the destination, resend traces if it's up + if let Some(trace) = self.traces.lock().get_mut(&destination) { + if up { + match drtio::ddma_upload_trace( + aux_mutex, + routing_table, + timer, + self.id, + destination, + trace.get_trace() + ).await { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } else { + trace.state = RemoteState::NotLoaded; + } + } + } + } + + static mut TRACES: BTreeMap = BTreeMap::new(); + + pub fn add_traces(id: u32, traces: BTreeMap>) { + unsafe { TRACES.insert(id, TraceSet::new(id, traces)) }; + } + + pub async fn await_done( + id: u32, + timeout: Option, + timer: GlobalTimer + ) -> Result { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.await_done(timeout, timer).await + } + + pub fn erase( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let mut trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + task::block_on(trace_set.erase(aux_mutex, routing_table, timer)); + unsafe { TRACES.remove(&id); } + } + + pub async fn upload_traces( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.upload_traces(aux_mutex, routing_table, timer).await; + } + + pub async fn playback( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback(aux_mutex, routing_table, timer, timestamp).await; + } + + pub async fn playback_done( + id: u32, + destination: u8, + error: u8, + channel: u32, + timestamp: u64 + ) { + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + trace_set.playback_done(destination, error, channel, timestamp).await; + } + + pub async fn destination_changed( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, + up: bool + ) { + let trace_iter = unsafe { TRACES.values_mut() }; + for trace_set in trace_iter { + trace_set.destination_changed(aux_mutex, routing_table, timer, destination, up).await; + } + } +} + pub fn put_record(mut recorder: DmaRecorder) { + // trailing zero to indicate end of buffer + recorder.buffer.push(0); recorder.buffer.reserve(ALIGNMENT - 1); let original_length = recorder.buffer.len(); let padding = ALIGNMENT - recorder.buffer.as_ptr() as usize % ALIGNMENT; @@ -16,8 +300,6 @@ pub fn put_record(mut recorder: DmaRecorder) { for _ in 0..padding { recorder.buffer.push(0); } - // trailing zero to indicate end of buffer - recorder.buffer.push(0); recorder.buffer.copy_within(0..original_length, padding); dcci_slice(&recorder.buffer); -- 2.42.0 From 4f1810ab849fcae20d487c40ccf2ec25c52ba047 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 16:12:21 +0800 Subject: [PATCH 04/12] rtio_dma: add ddma insert, ddma erase comms: pass drtio tools to kernel handle for ddma moninj: fix Rcs and references --- src/runtime/src/comms.rs | 22 +++++++++---- src/runtime/src/moninj.rs | 4 ++- src/runtime/src/rtio_dma.rs | 63 +++++++++++++++++++++++++++++++------ 3 files changed, 72 insertions(+), 17 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 5b53f88..b781756 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -158,6 +158,9 @@ async fn handle_run_kernel( stream: Option<&TcpStream>, control: &Rc>, _up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; loop { @@ -324,7 +327,7 @@ async fn handle_run_kernel( } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. - rtio_dma::erase(name); + rtio_dma::erase(name, aux_mutex, routing_table, timer); } kernel::Message::DmaGetRequest(name) => { let result = rtio_dma::retrieve(name); @@ -394,6 +397,9 @@ async fn handle_connection( stream: &mut TcpStream, control: Rc>, up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { stream.set_ack_delay(None); @@ -417,7 +423,7 @@ async fn handle_connection( load_kernel(&buffer, &control, Some(stream)).await?; } Request::RunKernel => { - handle_run_kernel(Some(stream), &control, &up_destinations).await?; + handle_run_kernel(Some(stream), &control, &up_destinations, aux_mutex, routing_table, timer).await?; } _ => { error!("unexpected request from host: {:?}", request); @@ -484,7 +490,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg); analyzer::start(); - moninj::start(timer, aux_mutex, drtio_routing_table); + moninj::start(timer, &aux_mutex, &drtio_routing_table); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start())); let idle_kernel = Rc::new(cfg.read("idle_kernel").ok()); @@ -492,7 +498,8 @@ pub fn main(timer: GlobalTimer, cfg: Config) { info!("Loading startup kernel..."); if let Ok(()) = task::block_on(load_kernel(&buffer, &control, None)) { info!("Starting startup kernel..."); - let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations)); + let routing_table = drtio_routing_table.borrow(); + let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer)); info!("Startup kernel finished!"); } else { error!("Error loading startup kernel!"); @@ -518,13 +525,16 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let connection = connection.clone(); let terminate = terminate.clone(); let up_destinations = up_destinations.clone(); + let aux_mutex = aux_mutex.clone(); + let routing_table = drtio_routing_table.clone(); // we make sure the value of terminate is 0 before we start let _ = terminate.try_wait(); task::spawn(async move { + let routing_table = routing_table.borrow(); select_biased! { _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations) + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) .await .map_err(|e| warn!("connection terminated: {}", e)); if let Some(buffer) = &*idle_kernel { @@ -532,7 +542,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let _ = load_kernel(&buffer, &control, None) .await.map_err(|_| warn!("error loading idle kernel")); info!("Running idle kernel"); - let _ = handle_run_kernel(None, &control, &up_destinations) + let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) .await.map_err(|_| warn!("error running idle kernel")); info!("Idle kernel terminated"); } diff --git a/src/runtime/src/moninj.rs b/src/runtime/src/moninj.rs index 966534c..4dd2e42 100644 --- a/src/runtime/src/moninj.rs +++ b/src/runtime/src/moninj.rs @@ -298,7 +298,9 @@ async fn handle_connection( } } -pub fn start(timer: GlobalTimer, aux_mutex: Rc>, routing_table: Rc>) { +pub fn start(timer: GlobalTimer, aux_mutex: &Rc>, routing_table: &Rc>) { + let aux_mutex = aux_mutex.clone(); + let routing_table = routing_table.clone(); task::spawn(async move { loop { let aux_mutex = aux_mutex.clone(); diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 1bf125f..a3a0da6 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -1,7 +1,12 @@ -use alloc::{collections::BTreeMap, string::String, vec::Vec}; +use alloc::{collections::BTreeMap, string::String, vec::Vec, rc::Rc}; use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; use libboard_zynq::timer::GlobalTimer; +use libboard_artiq::drtio_routing::RoutingTable; +#[cfg(has_drtio)] +use libasync::task; +#[cfg(has_drtio)] +use core::mem; use crate::kernel::DmaRecorder; const ALIGNMENT: usize = 16 * 8; @@ -11,10 +16,7 @@ static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::n #[cfg(has_drtio)] pub mod remote_dma { use super::*; - use alloc::rc::Rc; - use libasync::task; use libboard_zynq::time::Milliseconds; - use libboard_artiq::drtio_routing::RoutingTable; use log::error; use crate::rtio_mgt::drtio; @@ -195,8 +197,9 @@ pub mod remote_dma { aux_mutex: &Rc>, routing_table: &RoutingTable, timer: GlobalTimer, - destination: u8, up: bool) { - // update state of the destination, resend traces if it's up + destination: u8, up: bool + ) { + // update state of the destination, resend traces if it's up if let Some(trace) = self.traces.lock().get_mut(&destination) { if up { match drtio::ddma_upload_trace( @@ -238,7 +241,7 @@ pub mod remote_dma { timer: GlobalTimer, id: u32 ) { - let mut trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; task::block_on(trace_set.erase(aux_mutex, routing_table, timer)); unsafe { TRACES.remove(&id); } } @@ -290,7 +293,36 @@ pub mod remote_dma { } -pub fn put_record(mut recorder: DmaRecorder) { +pub fn put_record(mut recorder: DmaRecorder) -> u32 { + #[cfg(has_drtio)] + let mut remote_traces: BTreeMap> = BTreeMap::new(); + + #[cfg(has_drtio)] + if recorder.enable_ddma { + let mut local_trace: Vec = Vec::new(); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + while recorder.buffer[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = recorder.buffer[ptr] as usize; + let destination = recorder.buffer[ptr+3]; + if destination == 0 { + local_trace.extend(&recorder.buffer[ptr..ptr+len]); + } + else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&recorder.buffer[ptr..ptr+len]); + } else { + remote_traces.insert(destination, recorder.buffer[ptr..ptr+len].to_vec()); + } + } + // and jump to the next event + ptr += len; + } + mem::swap(&mut recorder.buffer, &mut local_trace); + } // trailing zero to indicate end of buffer recorder.buffer.push(0); recorder.buffer.reserve(ALIGNMENT - 1); @@ -308,10 +340,21 @@ pub fn put_record(mut recorder: DmaRecorder) { DMA_RECORD_STORE .lock() .insert(recorder.name, (ptr, recorder.buffer, recorder.duration)); + + #[cfg(has_drtio)] + remote_dma::add_traces(ptr, remote_traces); + + ptr } -pub fn erase(name: String) { - DMA_RECORD_STORE.lock().remove(&name); +pub fn erase(name: String, _aux_mutex: &Rc>, + _routing_table: &RoutingTable, _timer: GlobalTimer +) { + let _entry = DMA_RECORD_STORE.lock().remove(&name); + #[cfg(has_drtio)] + if let Some((id, _v, _d)) = _entry { + remote_dma::erase(_aux_mutex, _routing_table, _timer, id); + } } pub fn retrieve(name: String) -> Option<(i32, i64)> { -- 2.42.0 From fac563bbc0481db6892053ef0e1e320b918ca795 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 16:40:29 +0800 Subject: [PATCH 05/12] kernel: add ddma support --- src/runtime/src/kernel/dma.rs | 39 +++++++++++++++++++++++++++++++++++ src/runtime/src/kernel/mod.rs | 4 +--- 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index 63b8318..2cae479 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -167,6 +167,9 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { csr::cri_con::selected_write(1); csr::rtio_dma::enable_write(1); + #[cfg(has_drtio)] + KERNEL_CHANNEL_1TO0.as_mut().unwrap().send( + Message::DmaStartRemoteRequest{ id: ptr, timestamp: timestamp }); while csr::rtio_dma::enable_read() != 0 {} csr::cri_con::selected_write(0); @@ -194,5 +197,41 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { ); } } + #[cfg(has_drtio)] + { + KERNEL_CHANNEL_1TO0.as_mut().unwrap().send( + Message::DmaAwaitRemoteRequest(ptr)); + match KERNEL_CHANNEL_0TO1.as_mut().unwrap().recv() { + Message::DmaAwaitRemoteReply { + timeout: timeout, error: error, channel: channel, timestamp: timestamp + } => { + if timeout { + artiq_raise!( + "DMAError", + "Error running DMA on satellite device, timed out waiting for results" + ); + } + if error & 1 != 0 { + artiq_raise!( + "RTIOUnderflow", + "RTIO underflow at {1} mu, channel {rtio_channel_info:0}", + channel as i64, + timestamp as i64, + 0 + ); + } + if error & 2 != 0 { + artiq_raise!( + "RTIODestinationUnreachable", + "RTIO destination unreachable, output, at {1} mu, channel {rtio_channel_info:0}", + channel as i64, + timestamp as i64, + 0 + ); + } + } + _ => panic!("Expected DmaAwaitRemoteReply after DmaAwaitRemoteRequest!"), + } + } } } diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index 883c615..b72d311 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -59,9 +59,7 @@ pub enum Message { timestamp: i64 }, #[cfg(has_drtio)] - DmaAwaitRemoteRequest { - id: i32 - }, + DmaAwaitRemoteRequest(i32), #[cfg(has_drtio)] DmaAwaitRemoteReply { timeout: bool, -- 2.42.0 From 71db828e4ff5b3ccfc7bbd031319c4c2ab3003f8 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 16:57:43 +0800 Subject: [PATCH 06/12] comms: implement ddma message support --- src/runtime/src/comms.rs | 31 ++++++++++++++++++++++++++++++- src/runtime/src/kernel/dma.rs | 4 +--- 2 files changed, 31 insertions(+), 4 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index b781756..89eab8f 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -323,7 +323,9 @@ async fn handle_run_kernel( .await; } kernel::Message::DmaPutRequest(recorder) => { - rtio_dma::put_record(recorder); + let _id = rtio_dma::put_record(recorder); + #[cfg(has_drtio)] + rtio_dma::remote_dma::upload_traces(aux_mutex, routing_table, timer, _id).await; } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. @@ -338,6 +340,33 @@ async fn handle_run_kernel( .await; } #[cfg(has_drtio)] + kernel::Message::DmaStartRemoteRequest { id, timestamp } => { + rtio_dma::remote_dma::playback(aux_mutex, routing_table, timer, id as u32, timestamp as u64).await; + } + #[cfg(has_drtio)] + kernel::Message::DmaAwaitRemoteRequest(id) => { + let result = rtio_dma::remote_dma::await_done(id as u32, Some(10_000), timer).await; + let reply = match result { + Ok(rtio_dma::remote_dma::RemoteState::PlaybackEnded { + error, + channel, + timestamp + }) => kernel::Message::DmaAwaitRemoteReply { + timeout: false, + error: error, + channel: channel, + timestamp: timestamp + }, + _ => kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0 + } + }; + control.borrow_mut().tx.async_send(reply).await; + } + #[cfg(has_drtio)] kernel::Message::UpDestinationsRequest(destination) => { let result = _up_destinations.borrow()[destination as usize]; control diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index 2cae479..2ff865b 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -202,9 +202,7 @@ pub extern "C" fn dma_playback(timestamp: i64, ptr: i32) { KERNEL_CHANNEL_1TO0.as_mut().unwrap().send( Message::DmaAwaitRemoteRequest(ptr)); match KERNEL_CHANNEL_0TO1.as_mut().unwrap().recv() { - Message::DmaAwaitRemoteReply { - timeout: timeout, error: error, channel: channel, timestamp: timestamp - } => { + Message::DmaAwaitRemoteReply { timeout, error, channel, timestamp } => { if timeout { artiq_raise!( "DMAError", -- 2.42.0 From f6193769efbea5d9bb5d418292f6a93f30204f70 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 24 Mar 2023 17:04:19 +0800 Subject: [PATCH 07/12] rtio_mgt: notify remote_dma on dest status change --- src/runtime/src/rtio_mgt.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 683b581..dfff437 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -175,6 +175,8 @@ pub mod drtio { async fn process_unsolicited_aux(aux_mutex: &Rc>, linkno: u8) { let _lock = aux_mutex.async_lock().await; match drtioaux_async::recv(linkno).await { + Ok(Some(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) + ) => remote_dma::playback_done(id, destination, error, channel, timestamp).await, Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno), @@ -255,7 +257,8 @@ pub mod drtio { .await; match reply { Ok(Packet::DestinationDownReply) => { - destination_set_up(routing_table, up_destinations, destination, false).await + destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; } Ok(Packet::DestinationOkReply) => (), Ok(Packet::DestinationSequenceErrorReply { channel }) => { @@ -290,6 +293,7 @@ pub mod drtio { } } else { destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; } } else { if up_links[linkno as usize] { @@ -307,6 +311,7 @@ pub mod drtio { Ok(Packet::DestinationOkReply) => { destination_set_up(routing_table, up_destinations, destination, true).await; init_buffer_space(destination as u8, linkno).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, true).await; } Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), -- 2.42.0 From b8ba7ea92948d1013d38798f18584b788afc5573 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 11:17:55 +0800 Subject: [PATCH 08/12] dma: fix nested block_on panic --- src/runtime/src/comms.rs | 2 +- src/runtime/src/rtio_dma.rs | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 89eab8f..814c693 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -329,7 +329,7 @@ async fn handle_run_kernel( } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. - rtio_dma::erase(name, aux_mutex, routing_table, timer); + rtio_dma::erase(name, aux_mutex, routing_table, timer).await; } kernel::Message::DmaGetRequest(name) => { let result = rtio_dma::retrieve(name); diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index a3a0da6..01f3c2d 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -235,14 +235,14 @@ pub mod remote_dma { trace_set.await_done(timeout, timer).await } - pub fn erase( + pub async fn erase( aux_mutex: &Rc>, routing_table: &RoutingTable, timer: GlobalTimer, id: u32 ) { let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; - task::block_on(trace_set.erase(aux_mutex, routing_table, timer)); + trace_set.erase(aux_mutex, routing_table, timer).await; unsafe { TRACES.remove(&id); } } @@ -304,6 +304,7 @@ pub fn put_record(mut recorder: DmaRecorder) -> u32 { // sends whole chunks, to limit comms/kernel CPU communication, // and as only comms core has access to varios DMA buffers. let mut ptr = 0; + recorder.buffer.push(0); while recorder.buffer[ptr] != 0 { // ptr + 3 = tgt >> 24 (destination) let len = recorder.buffer[ptr] as usize; @@ -347,13 +348,13 @@ pub fn put_record(mut recorder: DmaRecorder) -> u32 { ptr } -pub fn erase(name: String, _aux_mutex: &Rc>, +pub async fn erase(name: String, _aux_mutex: &Rc>, _routing_table: &RoutingTable, _timer: GlobalTimer ) { let _entry = DMA_RECORD_STORE.lock().remove(&name); #[cfg(has_drtio)] if let Some((id, _v, _d)) = _entry { - remote_dma::erase(_aux_mutex, _routing_table, _timer, id); + remote_dma::erase(_aux_mutex, _routing_table, _timer, id).await; } } -- 2.42.0 From 7bd5413fbe627da3f860381107eab2b59d69a663 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 11:18:05 +0800 Subject: [PATCH 09/12] flake: update dependencies --- flake.lock | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flake.lock b/flake.lock index bb02bff..31e0629 100644 --- a/flake.lock +++ b/flake.lock @@ -11,11 +11,11 @@ "src-pythonparser": "src-pythonparser" }, "locked": { - "lastModified": 1677033865, - "narHash": "sha256-9w9V+B6vMl4I2uX5k6lJc1FXhf4PeTCZXrq5Tdq2qHc=", - "ref": "refs/heads/master", - "rev": "d0437f5672b7dd5f898a73469d730bc46f058ecc", - "revCount": 8310, + "lastModified": 1679454985, + "narHash": "sha256-ObDDVR8hKh4lbT42qBdMxc+kIGh5/ORGM0YWGTyO4Sw=", + "ref": "master", + "rev": "e9a153b985d1c703338ad49bb0ac986ade29d21a", + "revCount": 8319, "type": "git", "url": "https://github.com/m-labs/artiq.git" }, @@ -84,11 +84,11 @@ "mozilla-overlay_2": { "flake": false, "locked": { - "lastModified": 1675354105, - "narHash": "sha256-ZAJGIZ7TjOCU7302lSUabNDz+rxM4If0l8/ZbE/7R5U=", + "lastModified": 1677493379, + "narHash": "sha256-A1gO8zlWLv3+tZ3cGVB1WYvvoN9pbFyv0xIJHcTsckw=", "owner": "mozilla", "repo": "nixpkgs-mozilla", - "rev": "85eb0ba7d8e5d6d4b79e5b0180aadbdd25d76404", + "rev": "78e723925daf5c9e8d0a1837ec27059e61649cb6", "type": "github" }, "original": { @@ -218,7 +218,7 @@ "locked": { "lastModified": 1669819016, "narHash": "sha256-WvNMUekL4Elc55RdqX8XP43QPnBrK8Rbd0bsoI61E5U=", - "ref": "refs/heads/master", + "ref": "master", "rev": "67dbb5932fa8ff5f143983476f741f945871d286", "revCount": 624, "type": "git", -- 2.42.0 From ca51cf7729040a9c8c18546f8a521235709d9b58 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 14:55:12 +0800 Subject: [PATCH 10/12] erase remote ID if name is overwritten --- src/runtime/src/comms.rs | 2 +- src/runtime/src/rtio_dma.rs | 15 ++++++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 814c693..bafe253 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -323,7 +323,7 @@ async fn handle_run_kernel( .await; } kernel::Message::DmaPutRequest(recorder) => { - let _id = rtio_dma::put_record(recorder); + let _id = rtio_dma::put_record(aux_mutex, routing_table, timer, recorder).await; #[cfg(has_drtio)] rtio_dma::remote_dma::upload_traces(aux_mutex, routing_table, timer, _id).await; } diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 01f3c2d..164acc8 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -293,7 +293,11 @@ pub mod remote_dma { } -pub fn put_record(mut recorder: DmaRecorder) -> u32 { +pub async fn put_record(_aux_mutex: &Rc>, + _routing_table: &RoutingTable, + _timer: GlobalTimer, + mut recorder: DmaRecorder, +) -> u32 { #[cfg(has_drtio)] let mut remote_traces: BTreeMap> = BTreeMap::new(); @@ -338,12 +342,17 @@ pub fn put_record(mut recorder: DmaRecorder) -> u32 { let ptr = recorder.buffer[padding..].as_ptr() as u32; - DMA_RECORD_STORE + let _old_record = DMA_RECORD_STORE .lock() .insert(recorder.name, (ptr, recorder.buffer, recorder.duration)); #[cfg(has_drtio)] - remote_dma::add_traces(ptr, remote_traces); + { + if let Some((old_id, _v, _d)) = _old_record { + remote_dma::erase(_aux_mutex, _routing_table, _timer, old_id).await; + } + remote_dma::add_traces(ptr, remote_traces); + } ptr } -- 2.42.0 From 4fa063d53d85f29a79f241616f12e3788f205d34 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 15:15:02 +0800 Subject: [PATCH 11/12] drtio: aux_transact handles async playback status --- src/runtime/src/rtio_mgt.rs | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index dfff437..02564a0 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -66,7 +66,16 @@ pub mod drtio { } let _lock = aux_mutex.async_lock().await; drtioaux_async::send(linkno, request).await.unwrap(); - recv_aux_timeout(linkno, 200, timer).await + loop { + let reply = recv_aux_timeout(linkno, 200, timer).await; + match reply { + Ok(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { + remote_dma::playback_done(id, destination, error, channel, timestamp).await; + }, + Ok(packet) => return Ok(packet), + Err(e) => return Err(e) + } + } } async fn drain_buffer(linkno: u8, draining_time: Milliseconds, timer: GlobalTimer) { @@ -458,25 +467,16 @@ pub mod drtio { timestamp: u64 ) -> Result<(), &'static str> { let linkno = routing_table.0[destination as usize][0] - 1; - let _lock = aux_mutex.async_lock().await; - drtioaux_async::send(linkno, &Packet::DmaPlaybackRequest{ - id: id, destination: destination, timestamp: timestamp }).await.unwrap(); - loop { - let reply = recv_aux_timeout(linkno, 200, timer).await; - match reply { - Ok(Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) }, - Ok(Packet::DmaPlaybackReply { succeeded: false }) => { - return Err("error on DMA playback request") }, - // in case we received status from another destination - // but we want to get DmaPlaybackReply anyway, thus the loop - Ok(Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { - remote_dma::playback_done(id, destination, error, channel, timestamp).await; - }, - Ok(_) => { return Err("received unexpected aux packet while DMA playback") }, - Err(_) => { return Err("aux error on DMA playback") } - } + let reply = aux_transact(aux_mutex, linkno, &Packet::DmaPlaybackRequest{ + id: id, destination: destination, timestamp: timestamp }, timer).await; + match reply { + Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), + Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), + Ok(_) => Err("received unexpected aux packet while DMA playback"), + Err(_) => Err("aux error on DMA playback") } } + } fn read_device_map(cfg: &Config) -> BTreeMap { -- 2.42.0 From c3817c6ad75c057001a09cb4a67da1babc17b9b3 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Mon, 27 Mar 2023 15:23:44 +0800 Subject: [PATCH 12/12] rtio_mgt: fix typo --- src/runtime/src/rtio_mgt.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 02564a0..0994554 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -472,7 +472,7 @@ pub mod drtio { match reply { Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), - Ok(_) => Err("received unexpected aux packet while DMA playback"), + Ok(_) => Err("received unexpected aux packet during DMA playback"), Err(_) => Err("aux error on DMA playback") } } -- 2.42.0