diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 63b5a55..6a65b7c 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -1,8 +1,7 @@ use core::fmt; use core::cell::RefCell; use core::str::Utf8Error; -use alloc::rc::Rc; -use alloc::{vec, vec::Vec, string::String}; +use alloc::{vec, vec::Vec, string::String, collections::BTreeMap, rc::Rc}; use log::{info, warn, error}; use num_derive::{FromPrimitive, ToPrimitive}; @@ -18,7 +17,7 @@ use libboard_zynq::{ }, timer::GlobalTimer, }; -use libcortex_a9::semaphore::Semaphore; +use libcortex_a9::{semaphore::Semaphore, mutex::Mutex}; use futures::{select_biased, future::FutureExt}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; @@ -83,6 +82,9 @@ enum Reply { ClockFailure = 15, } +static CACHE_STORE: Mutex>> = Mutex::new(BTreeMap::new()); +static DMA_RECORD_STORE: Mutex, i64)>> = Mutex::new(BTreeMap::new()); + async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> { stream.send([0x5a, 0x5a, 0x5a, 0x5a, reply.to_u8().unwrap()].iter().copied()).await?; Ok(()) @@ -223,6 +225,25 @@ async fn handle_run_kernel(stream: Option<&TcpStream>, control: &Rc { + CACHE_STORE.lock().insert(key, value); + }, + kernel::Message::CacheGetRequest(key) => { + const DEFAULT: Vec = Vec::new(); + let value = CACHE_STORE.lock().get(&key).unwrap_or(&DEFAULT).clone(); + control.borrow_mut().tx.async_send(kernel::Message::CacheGetReply(value)).await; + }, + kernel::Message::DmaPutRequest(recorder) => { + DMA_RECORD_STORE.lock().insert(recorder.name, (recorder.buffer, recorder.duration)); + }, + kernel::Message::DmaEraseRequest(name) => { + // prevent possible OOM when we have large DMA record replacement. + DMA_RECORD_STORE.lock().remove(&name); + }, + kernel::Message::DmaGetRequest(name) => { + let result = DMA_RECORD_STORE.lock().get(&name).map(|v| v.clone()); + control.borrow_mut().tx.async_send(kernel::Message::DmaGetReply(result)).await; + }, _ => { panic!("unexpected message from core1 while kernel was running: {:?}", reply); } diff --git a/src/runtime/src/kernel/cache.rs b/src/runtime/src/kernel/cache.rs index 8ae27f6..56b2d09 100644 --- a/src/runtime/src/kernel/cache.rs +++ b/src/runtime/src/kernel/cache.rs @@ -1,78 +1,27 @@ -use alloc::{string::String, vec::Vec, collections::BTreeMap}; -use libcortex_a9::mutex::Mutex; +use alloc::string::String; use cslice::{CSlice, AsCSlice}; use core::mem::transmute; -use core::str; - -use crate::artiq_raise; - - -#[derive(Debug)] -struct Entry { - data: Vec, - borrowed: bool -} - -#[derive(Debug)] -struct Cache { - entries: BTreeMap -} - -impl Cache { - pub const fn new() -> Cache { - Cache { entries: BTreeMap::new() } - } - - pub fn get(&mut self, key: &str) -> *const [i32] { - match self.entries.get_mut(key) { - None => &[], - Some(ref mut entry) => { - entry.borrowed = true; - &entry.data[..] - } - } - } - - pub fn put(&mut self, key: &str, data: &[i32]) -> Result<(), ()> { - match self.entries.get_mut(key) { - None => (), - Some(ref mut entry) => { - if entry.borrowed { return Err(()) } - entry.data = Vec::from(data); - return Ok(()) - } - } - - self.entries.insert(String::from(key), Entry { - data: Vec::from(data), - borrowed: false - }); - Ok(()) - } - - pub unsafe fn unborrow(&mut self) { - for (_key, entry) in self.entries.iter_mut() { - entry.borrowed = false; - } - } -} - -static CACHE: Mutex = Mutex::new(Cache::new()); +use super::{KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, Message}; pub extern fn get(key: CSlice) -> CSlice<'static, i32> { - let value = CACHE.lock().get(str::from_utf8(key.as_ref()).unwrap()); - unsafe { - transmute::<*const [i32], &'static [i32]>(value).as_c_slice() + let key = String::from_utf8(key.as_ref().to_vec()).unwrap(); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send(Message::CacheGetRequest(key)); + let msg = KERNEL_CHANNEL_0TO1.lock().as_mut().unwrap().recv(); + if let Message::CacheGetReply(v) = msg { + let slice = v.as_c_slice(); + // we intentionally leak the memory here, + // which does not matter as core1 would restart + unsafe { + transmute(slice) + } + } else { + panic!("Expected CacheGetReply for CacheGetRequest"); } } pub extern fn put(key: CSlice, list: CSlice) { - let result = CACHE.lock().put(str::from_utf8(key.as_ref()).unwrap(), list.as_ref()); - if result.is_err() { - artiq_raise!("CacheError", "cannot put into a busy cache row"); - } + let key = String::from_utf8(key.as_ref().to_vec()).unwrap(); + let value = list.as_ref().to_vec(); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send(Message::CachePutRequest(key, value)); } -pub unsafe fn unborrow() { - CACHE.lock().unborrow(); -} diff --git a/src/runtime/src/kernel/core1.rs b/src/runtime/src/kernel/core1.rs index 9bc2ad0..ac0632b 100644 --- a/src/runtime/src/kernel/core1.rs +++ b/src/runtime/src/kernel/core1.rs @@ -22,7 +22,7 @@ use super::{ KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, KERNEL_IMAGE, Message, - cache + dma, }; // linker symbols @@ -149,6 +149,7 @@ pub fn main_core1() { unsafe { core0_tx.reset(); core1_tx.reset(); + dma::init_dma_recorder(); } *CHANNEL_0TO1.lock() = Some(core0_tx); *CHANNEL_1TO0.lock() = Some(core0_rx); @@ -186,7 +187,6 @@ pub fn main_core1() { core1_rx = core::mem::replace(&mut *KERNEL_CHANNEL_0TO1.lock(), None).unwrap(); core1_tx = core::mem::replace(&mut *KERNEL_CHANNEL_1TO0.lock(), None).unwrap(); } - unsafe { cache::unborrow(); } info!("kernel finished"); core1_tx.send(Message::KernelFinished); } @@ -197,8 +197,6 @@ pub fn main_core1() { /// Called by eh_artiq pub fn terminate(exception: &'static eh_artiq::Exception<'static>, backtrace: &'static mut [usize]) -> ! { - unsafe { cache::unborrow(); } - let load_addr = unsafe { KERNEL_IMAGE.as_ref().unwrap().get_load_addr() }; diff --git a/src/runtime/src/kernel/dma.rs b/src/runtime/src/kernel/dma.rs index 9dcb643..b8f435f 100644 --- a/src/runtime/src/kernel/dma.rs +++ b/src/runtime/src/kernel/dma.rs @@ -3,110 +3,15 @@ use crate::{ artiq_raise, rtio, }; -use alloc::{vec::Vec, string::String, collections::BTreeMap, str}; +use alloc::{vec::Vec, string::String, boxed::Box}; use cslice::CSlice; -use super::KERNEL_IMAGE; +use super::{KERNEL_IMAGE, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, Message}; use core::mem; use log::debug; -use libcortex_a9::{ - cache::dcci_slice, - asm::dsb, -}; +use libcortex_a9::cache::dcci_slice; const ALIGNMENT: usize = 16 * 8; -const DMA_BUFFER_SIZE: usize = 16 * 8 * 1024; - -struct DmaRecorder { - active: bool, - data_len: usize, - buffer: [u8; DMA_BUFFER_SIZE], -} - -static mut DMA_RECORDER: DmaRecorder = DmaRecorder { - active: false, - data_len: 0, - buffer: [0; DMA_BUFFER_SIZE], -}; - -#[derive(Debug)] -struct Entry { - trace: Vec, - padding_len: usize, - duration: u64 -} - -#[derive(Debug)] -pub struct Manager { - entries: BTreeMap, - recording_name: String, - recording_trace: Vec -} - -// Copied from https://github.com/m-labs/artiq/blob/master/artiq/firmware/runtime/rtio_dma.rs -// basically without modification except removing some warnings. -impl Manager { - pub const fn new() -> Manager { - Manager { - entries: BTreeMap::new(), - recording_name: String::new(), - recording_trace: Vec::new(), - } - } - - pub fn record_start(&mut self, name: &str) { - self.recording_name = String::from(name); - self.recording_trace = Vec::new(); - - // or we could needlessly OOM replacing a large trace - self.entries.remove(name); - } - - pub fn record_append(&mut self, data: &[u8]) { - self.recording_trace.extend_from_slice(data); - } - - pub fn record_stop(&mut self, duration: u64) { - let mut trace = Vec::new(); - mem::swap(&mut self.recording_trace, &mut trace); - trace.push(0); - let data_len = trace.len(); - - // Realign. - trace.reserve(ALIGNMENT - 1); - let padding = ALIGNMENT - trace.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - // Vec guarantees that this will not reallocate - trace.push(0) - } - for i in 1..data_len + 1 { - trace[data_len + padding - i] = trace[data_len - i] - } - - let mut name = String::new(); - mem::swap(&mut self.recording_name, &mut name); - self.entries.insert(name, Entry { - trace, duration, - padding_len: padding, - }); - } - - pub fn erase(&mut self, name: &str) { - self.entries.remove(name); - } - - pub fn with_trace(&self, name: &str, f: F) -> R - where F: FnOnce(Option<&[u8]>, u64) -> R { - match self.entries.get(name) { - Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration), - None => f(None, 0) - } - } -} - - -static mut DMA_MANAGER: Manager = Manager::new(); #[repr(C)] pub struct DmaTrace { @@ -114,18 +19,26 @@ pub struct DmaTrace { address: i32, } -fn dma_record_flush() { - unsafe { - DMA_MANAGER.record_append(&DMA_RECORDER.buffer[..DMA_RECORDER.data_len]); - DMA_RECORDER.data_len = 0; - } +#[derive(Clone, Debug)] +pub struct DmaRecorder { + pub name: String, + pub buffer: Vec, + pub duration: i64, +} + +static mut RECORDER: Option = None; + +pub unsafe fn init_dma_recorder() { + // as static would remain after restart, we have to reset it, + // without running its destructor. + mem::forget(mem::replace(&mut RECORDER, None)); } pub extern fn dma_record_start(name: CSlice) { - let name = str::from_utf8(name.as_ref()).unwrap(); - + let name = String::from_utf8(name.as_ref().to_vec()).unwrap(); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send(Message::DmaEraseRequest(name.clone())); unsafe { - if DMA_RECORDER.active { + if RECORDER.is_some() { artiq_raise!("DMAError", "DMA is already recording") } @@ -135,16 +48,17 @@ pub extern fn dma_record_start(name: CSlice) { library.rebind(b"rtio_output_wide", dma_record_output_wide as *const ()).unwrap(); - DMA_RECORDER.active = true; - DMA_MANAGER.record_start(name); + RECORDER = Some(DmaRecorder { + name, + buffer: Vec::new(), + duration: 0, + }); } } -pub extern fn dma_record_stop(duration: i64) { +pub extern fn dma_record_stop(_: i64) { unsafe { - dma_record_flush(); - - if !DMA_RECORDER.active { + if RECORDER.is_none() { artiq_raise!("DMAError", "DMA is not recording") } @@ -154,29 +68,22 @@ pub extern fn dma_record_stop(duration: i64) { library.rebind(b"rtio_output_wide", rtio::output_wide as *const ()).unwrap(); - DMA_RECORDER.active = false; - DMA_MANAGER.record_stop(duration as u64); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send( + Message::DmaPutRequest(RECORDER.take().unwrap()) + ); } } #[inline(always)] unsafe fn dma_record_output_prepare(timestamp: i64, target: i32, - words: usize) -> &'static mut [u8] { + words: usize) { // See gateware/rtio/dma.py. const HEADER_LENGTH: usize = /*length*/1 + /*channel*/3 + /*timestamp*/8 + /*address*/1; let length = HEADER_LENGTH + /*data*/words * 4; - if DMA_RECORDER.buffer.len() - DMA_RECORDER.data_len < length { - dma_record_flush() - } - - let record = &mut DMA_RECORDER.buffer[DMA_RECORDER.data_len.. - DMA_RECORDER.data_len + length]; - DMA_RECORDER.data_len += length; - - let (header, data) = record.split_at_mut(HEADER_LENGTH); - - header.copy_from_slice(&[ + let buffer = &mut RECORDER.as_mut().unwrap().buffer; + buffer.reserve(length); + buffer.extend_from_slice(&[ (length >> 0) as u8, (target >> 8) as u8, (target >> 16) as u8, @@ -191,15 +98,13 @@ unsafe fn dma_record_output_prepare(timestamp: i64, target: i32, (timestamp >> 56) as u8, (target >> 0) as u8, ]); - - data } pub extern fn dma_record_output(target: i32, word: i32) { unsafe { let timestamp = rtio::now_mu(); - let data = dma_record_output_prepare(timestamp, target, 1); - data.copy_from_slice(&[ + dma_record_output_prepare(timestamp, target, 1); + RECORDER.as_mut().unwrap().buffer.extend_from_slice(&[ (word >> 0) as u8, (word >> 8) as u8, (word >> 16) as u8, @@ -213,53 +118,60 @@ pub extern fn dma_record_output_wide(target: i32, words: CSlice) { unsafe { let timestamp = rtio::now_mu(); - let mut data = dma_record_output_prepare(timestamp, target, words.len()); + dma_record_output_prepare(timestamp, target, words.len()); + let buffer = &mut RECORDER.as_mut().unwrap().buffer; for word in words.as_ref().iter() { - data[..4].copy_from_slice(&[ + buffer.extend_from_slice(&[ (word >> 0) as u8, (word >> 8) as u8, (word >> 16) as u8, (word >> 24) as u8, ]); - data = &mut data[4..]; } } } pub extern fn dma_erase(name: CSlice) { - let name = str::from_utf8(name.as_ref()).unwrap(); - - unsafe { - DMA_MANAGER.erase(name); - }; + let name = String::from_utf8(name.as_ref().to_vec()).unwrap(); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send(Message::DmaEraseRequest(name)); } pub extern fn dma_retrieve(name: CSlice) -> DmaTrace { - let name = str::from_utf8(name.as_ref()).unwrap(); - - let (trace, duration) = unsafe { - DMA_MANAGER.with_trace(name, |trace, duration| (trace.map(|v| { - dcci_slice(v); - dsb(); - v.as_ptr() - }), duration)) - }; - match trace { - Some(ptr) => Ok(DmaTrace { - address: ptr as i32, - duration: duration as i64, - }), - None => Err(()) - }.unwrap_or_else(|_| { - artiq_raise!("DMAError", "DMA trace not found"); - }) + let name = String::from_utf8(name.as_ref().to_vec()).unwrap(); + KERNEL_CHANNEL_1TO0.lock().as_mut().unwrap().send(Message::DmaGetRequest(name)); + match KERNEL_CHANNEL_0TO1.lock().as_mut().unwrap().recv() { + Message::DmaGetReply(None) => (), + Message::DmaGetReply(Some((mut v, duration))) => { + v.reserve(ALIGNMENT - 1); + let original_length = v.len(); + let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + v.push(0); + } + v.copy_within(0..original_length, padding); + let v = Box::new(v); + let address = Box::into_raw(v) as *mut Vec as i32; + return DmaTrace { + address, + duration, + }; + }, + _ => panic!("Expected DmaGetReply after DmaGetRequest!"), + } + // we have to defer raising error as we have to drop the message first... + artiq_raise!("DMAError", "DMA trace not found"); } pub extern fn dma_playback(timestamp: i64, ptr: i32) { - assert!(ptr % ALIGNMENT as i32 == 0); - debug!("DMA playback started"); unsafe { + let v = Box::from_raw(ptr as *mut Vec); + let padding = ALIGNMENT - v.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + dcci_slice(&v[padding..]); + let ptr = v.as_ptr().add(padding) as i32; + csr::rtio_dma::base_address_write(ptr as u32); csr::rtio_dma::time_offset_write(timestamp as u64); @@ -268,6 +180,7 @@ pub extern fn dma_playback(timestamp: i64, ptr: i32) { while csr::rtio_dma::enable_read() != 0 {} csr::cri_con::selected_write(0); + mem::drop(v); debug!("DMA playback finished"); let error = csr::rtio_dma::error_read(); diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index c2bb35b..6605fce 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -10,6 +10,7 @@ pub mod core1; mod api; mod rpc; mod dma; +pub use dma::DmaRecorder; mod cache; #[derive(Debug, Clone)] @@ -34,6 +35,15 @@ pub enum Message { RpcSend { is_async: bool, data: Vec }, RpcRecvRequest(*mut ()), RpcRecvReply(Result), + + CacheGetRequest(String), + CacheGetReply(Vec), + CachePutRequest(String, Vec), + + DmaPutRequest(DmaRecorder), + DmaEraseRequest(String), + DmaGetRequest(String), + DmaGetReply(Option<(Vec, i64)>), } static CHANNEL_0TO1: Mutex>> = Mutex::new(None);