diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 5b53f88..b781756 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -158,6 +158,9 @@ async fn handle_run_kernel( stream: Option<&TcpStream>, control: &Rc>, _up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; loop { @@ -324,7 +327,7 @@ async fn handle_run_kernel( } kernel::Message::DmaEraseRequest(name) => { // prevent possible OOM when we have large DMA record replacement. - rtio_dma::erase(name); + rtio_dma::erase(name, aux_mutex, routing_table, timer); } kernel::Message::DmaGetRequest(name) => { let result = rtio_dma::retrieve(name); @@ -394,6 +397,9 @@ async fn handle_connection( stream: &mut TcpStream, control: Rc>, up_destinations: &Rc>, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer ) -> Result<()> { stream.set_ack_delay(None); @@ -417,7 +423,7 @@ async fn handle_connection( load_kernel(&buffer, &control, Some(stream)).await?; } Request::RunKernel => { - handle_run_kernel(Some(stream), &control, &up_destinations).await?; + handle_run_kernel(Some(stream), &control, &up_destinations, aux_mutex, routing_table, timer).await?; } _ => { error!("unexpected request from host: {:?}", request); @@ -484,7 +490,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { rtio_mgt::startup(&aux_mutex, &drtio_routing_table, &up_destinations, timer, &cfg); analyzer::start(); - moninj::start(timer, aux_mutex, drtio_routing_table); + moninj::start(timer, &aux_mutex, &drtio_routing_table); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start())); let idle_kernel = Rc::new(cfg.read("idle_kernel").ok()); @@ -492,7 +498,8 @@ pub fn main(timer: GlobalTimer, cfg: Config) { info!("Loading startup kernel..."); if let Ok(()) = task::block_on(load_kernel(&buffer, &control, None)) { info!("Starting startup kernel..."); - let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations)); + let routing_table = drtio_routing_table.borrow(); + let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer)); info!("Startup kernel finished!"); } else { error!("Error loading startup kernel!"); @@ -518,13 +525,16 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let connection = connection.clone(); let terminate = terminate.clone(); let up_destinations = up_destinations.clone(); + let aux_mutex = aux_mutex.clone(); + let routing_table = drtio_routing_table.clone(); // we make sure the value of terminate is 0 before we start let _ = terminate.try_wait(); task::spawn(async move { + let routing_table = routing_table.borrow(); select_biased! { _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations) + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) .await .map_err(|e| warn!("connection terminated: {}", e)); if let Some(buffer) = &*idle_kernel { @@ -532,7 +542,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let _ = load_kernel(&buffer, &control, None) .await.map_err(|_| warn!("error loading idle kernel")); info!("Running idle kernel"); - let _ = handle_run_kernel(None, &control, &up_destinations) + let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) .await.map_err(|_| warn!("error running idle kernel")); info!("Idle kernel terminated"); } diff --git a/src/runtime/src/moninj.rs b/src/runtime/src/moninj.rs index 966534c..4dd2e42 100644 --- a/src/runtime/src/moninj.rs +++ b/src/runtime/src/moninj.rs @@ -298,7 +298,9 @@ async fn handle_connection( } } -pub fn start(timer: GlobalTimer, aux_mutex: Rc>, routing_table: Rc>) { +pub fn start(timer: GlobalTimer, aux_mutex: &Rc>, routing_table: &Rc>) { + let aux_mutex = aux_mutex.clone(); + let routing_table = routing_table.clone(); task::spawn(async move { loop { let aux_mutex = aux_mutex.clone(); diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 1bf125f..a3a0da6 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -1,7 +1,12 @@ -use alloc::{collections::BTreeMap, string::String, vec::Vec}; +use alloc::{collections::BTreeMap, string::String, vec::Vec, rc::Rc}; use libcortex_a9::{mutex::Mutex, cache::dcci_slice}; use libboard_zynq::timer::GlobalTimer; +use libboard_artiq::drtio_routing::RoutingTable; +#[cfg(has_drtio)] +use libasync::task; +#[cfg(has_drtio)] +use core::mem; use crate::kernel::DmaRecorder; const ALIGNMENT: usize = 16 * 8; @@ -11,10 +16,7 @@ static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::n #[cfg(has_drtio)] pub mod remote_dma { use super::*; - use alloc::rc::Rc; - use libasync::task; use libboard_zynq::time::Milliseconds; - use libboard_artiq::drtio_routing::RoutingTable; use log::error; use crate::rtio_mgt::drtio; @@ -195,8 +197,9 @@ pub mod remote_dma { aux_mutex: &Rc>, routing_table: &RoutingTable, timer: GlobalTimer, - destination: u8, up: bool) { - // update state of the destination, resend traces if it's up + destination: u8, up: bool + ) { + // update state of the destination, resend traces if it's up if let Some(trace) = self.traces.lock().get_mut(&destination) { if up { match drtio::ddma_upload_trace( @@ -238,7 +241,7 @@ pub mod remote_dma { timer: GlobalTimer, id: u32 ) { - let mut trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; + let trace_set = unsafe { TRACES.get_mut(&id).unwrap() }; task::block_on(trace_set.erase(aux_mutex, routing_table, timer)); unsafe { TRACES.remove(&id); } } @@ -290,7 +293,36 @@ pub mod remote_dma { } -pub fn put_record(mut recorder: DmaRecorder) { +pub fn put_record(mut recorder: DmaRecorder) -> u32 { + #[cfg(has_drtio)] + let mut remote_traces: BTreeMap> = BTreeMap::new(); + + #[cfg(has_drtio)] + if recorder.enable_ddma { + let mut local_trace: Vec = Vec::new(); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + while recorder.buffer[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = recorder.buffer[ptr] as usize; + let destination = recorder.buffer[ptr+3]; + if destination == 0 { + local_trace.extend(&recorder.buffer[ptr..ptr+len]); + } + else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&recorder.buffer[ptr..ptr+len]); + } else { + remote_traces.insert(destination, recorder.buffer[ptr..ptr+len].to_vec()); + } + } + // and jump to the next event + ptr += len; + } + mem::swap(&mut recorder.buffer, &mut local_trace); + } // trailing zero to indicate end of buffer recorder.buffer.push(0); recorder.buffer.reserve(ALIGNMENT - 1); @@ -308,10 +340,21 @@ pub fn put_record(mut recorder: DmaRecorder) { DMA_RECORD_STORE .lock() .insert(recorder.name, (ptr, recorder.buffer, recorder.duration)); + + #[cfg(has_drtio)] + remote_dma::add_traces(ptr, remote_traces); + + ptr } -pub fn erase(name: String) { - DMA_RECORD_STORE.lock().remove(&name); +pub fn erase(name: String, _aux_mutex: &Rc>, + _routing_table: &RoutingTable, _timer: GlobalTimer +) { + let _entry = DMA_RECORD_STORE.lock().remove(&name); + #[cfg(has_drtio)] + if let Some((id, _v, _d)) = _entry { + remote_dma::erase(_aux_mutex, _routing_table, _timer, id); + } } pub fn retrieve(name: String) -> Option<(i32, i64)> {