From c9e3771cd5990aafef4480cd11a96b50de7d22d5 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Fri, 29 Dec 2023 15:10:04 +0800 Subject: [PATCH] subkernels: add support for (d)dma --- artiq/firmware/ksupport/lib.rs | 29 +- .../firmware/libproto_artiq/drtioaux_proto.rs | 8 +- artiq/firmware/runtime/rtio_dma.rs | 4 +- artiq/firmware/runtime/rtio_mgt.rs | 4 +- artiq/firmware/satman/dma.rs | 317 ++++++++++++++++-- artiq/firmware/satman/kernel.rs | 179 ++++++---- artiq/firmware/satman/main.rs | 22 +- artiq/firmware/satman/routing.rs | 60 +++- 8 files changed, 529 insertions(+), 94 deletions(-) diff --git a/artiq/firmware/ksupport/lib.rs b/artiq/firmware/ksupport/lib.rs index 6f313dd6b..3cd2052ec 100644 --- a/artiq/firmware/ksupport/lib.rs +++ b/artiq/firmware/ksupport/lib.rs @@ -453,12 +453,39 @@ extern fn dma_playback(timestamp: i64, ptr: i32, _uses_ddma: bool) { } } -#[cfg(not(kernel_has_rtio_dma))] +#[cfg(all(not(kernel_has_rtio_dma), not(has_rtio_dma)))] #[unwind(allowed)] extern fn dma_playback(_timestamp: i64, _ptr: i32, _uses_ddma: bool) { unimplemented!("not(kernel_has_rtio_dma)") } +// for satellite (has_rtio_dma but not in kernel) +#[cfg(all(not(kernel_has_rtio_dma), has_rtio_dma))] +#[unwind(allowed)] +extern fn dma_playback(timestamp: i64, ptr: i32, _uses_ddma: bool) { + // DDMA is always used on satellites, so the `uses_ddma` setting is ignored + // StartRemoteRequest reused as "normal" start request + send(&DmaStartRemoteRequest { id: ptr as i32, timestamp: timestamp }); + // skip awaitremoterequest - it's a given + recv!(&DmaAwaitRemoteReply { timeout, error, channel, timestamp } => { + if timeout { + raise!("DMAError", + "Error running DMA on satellite device, timed out waiting for results"); + } + if error & 1 != 0 { + raise!("RTIOUnderflow", + "RTIO underflow at channel {rtio_channel_info:0}, {1} mu", + channel as i64, timestamp as i64, 0); + } + if error & 2 != 0 { + raise!("RTIODestinationUnreachable", + "RTIO destination unreachable, output, at channel 
{rtio_channel_info:0}, {1} mu", + channel as i64, timestamp as i64, 0); + } + }); +} + + #[unwind(allowed)] extern fn subkernel_load_run(id: u32, destination: u8, run: bool) { send(&SubkernelLoadRunRequest { id: id, destination: destination, run: run }); diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index 1157e864c..f58217f9c 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -113,7 +113,7 @@ pub enum Packet { id: u32, status: PayloadStatus, length: u16, trace: [u8; MASTER_PAYLOAD_MAX_SIZE] }, - DmaAddTraceReply { destination: u8, succeeded: bool }, + DmaAddTraceReply { source: u8, destination: u8, id: u32, succeeded: bool }, DmaRemoveTraceRequest { source: u8, destination: u8, id: u32 }, DmaRemoveTraceReply { destination: u8, succeeded: bool }, DmaPlaybackRequest { source: u8, destination: u8, id: u32, timestamp: u64 }, @@ -303,7 +303,9 @@ impl Packet { } }, 0xb1 => Packet::DmaAddTraceReply { + source: reader.read_u8()?, destination: reader.read_u8()?, + id: reader.read_u32()?, succeeded: reader.read_bool()? 
}, 0xb2 => Packet::DmaRemoveTraceRequest { @@ -598,9 +600,11 @@ impl Packet { writer.write_u16(length)?; writer.write_all(&trace[0..length as usize])?; }, - Packet::DmaAddTraceReply { destination, succeeded } => { + Packet::DmaAddTraceReply { source, destination, id, succeeded } => { writer.write_u8(0xb1)?; + writer.write_u8(source)?; writer.write_u8(destination)?; + writer.write_u32(id)?; writer.write_bool(succeeded)?; }, Packet::DmaRemoveTraceRequest { source, destination, id } => { diff --git a/artiq/firmware/runtime/rtio_dma.rs b/artiq/firmware/runtime/rtio_dma.rs index 63bf563a6..666986919 100644 --- a/artiq/firmware/runtime/rtio_dma.rs +++ b/artiq/firmware/runtime/rtio_dma.rs @@ -167,10 +167,10 @@ pub mod remote_dma { } pub fn playback_done(io: &Io, ddma_mutex: &Mutex, - id: u32, destination: u8, error: u8, channel: u32, timestamp: u64) { + id: u32, source: u8, error: u8, channel: u32, timestamp: u64) { // called upon receiving PlaybackDone aux packet let _lock = ddma_mutex.lock(io).unwrap(); - let mut trace = unsafe { TRACES.get_mut(&id).unwrap().get_mut(&destination).unwrap() }; + let mut trace = unsafe { TRACES.get_mut(&id).unwrap().get_mut(&source).unwrap() }; trace.state = RemoteState::PlaybackEnded { error: error, channel: channel, diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 5a9263e55..55d97d532 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -491,8 +491,8 @@ pub mod drtio { &drtioaux::Packet::DmaAddTraceRequest { id: id, source: 0, destination: destination, status: status, length: len as u16, trace: *slice})?; match reply { - drtioaux::Packet::DmaAddTraceReply { destination: 0, succeeded: true } => Ok(()), - drtioaux::Packet::DmaAddTraceReply { destination: 0, succeeded: false } => Err(Error::DmaAddTraceFail(destination)), + drtioaux::Packet::DmaAddTraceReply { destination: 0, succeeded: true, .. 
} => Ok(()), + drtioaux::Packet::DmaAddTraceReply { destination: 0, succeeded: false, .. } => Err(Error::DmaAddTraceFail(destination)), packet => Err(Error::UnexpectedPacket(packet)), } }) diff --git a/artiq/firmware/satman/dma.rs b/artiq/firmware/satman/dma.rs index b22be573a..4fc7a1393 100644 --- a/artiq/firmware/satman/dma.rs +++ b/artiq/firmware/satman/dma.rs @@ -1,7 +1,11 @@ +use alloc::{vec::Vec, collections::btree_map::BTreeMap, string::String}; +use core::mem; +use board_artiq::{drtioaux, drtio_routing::RoutingTable}; use board_misoc::{csr, cache::flush_l2_cache}; use proto_artiq::drtioaux_proto::PayloadStatus; -use alloc::{vec::Vec, collections::btree_map::BTreeMap}; -use ::{cricon_select, RtioMaster}; +use routing::{Router, Sliceable}; +use kernel::Manager as KernelManager; +use ::{cricon_select, RtioMaster, MASTER_PAYLOAD_MAX_SIZE}; const ALIGNMENT: usize = 64; @@ -19,17 +23,158 @@ pub struct RtioStatus { pub timestamp: u64 } +#[derive(Debug)] pub enum Error { IdNotFound, PlaybackInProgress, - EntryNotComplete + EntryNotComplete, + MasterDmaFound, + UploadFail, } #[derive(Debug)] struct Entry { trace: Vec, padding_len: usize, - complete: bool + complete: bool, + duration: u64, // relevant for locally ran DMA +} + +impl Entry { + pub fn from_vec(data: Vec, duration: u64) -> Entry { + let mut entry = Entry { + trace: data, + padding_len: 0, + complete: true, + duration: duration, + }; + entry.realign(); + entry + } + + pub fn id(&self) -> u32 { + self.trace[self.padding_len..].as_ptr() as u32 + } + + pub fn realign(&mut self) { + self.trace.push(0); + let data_len = self.trace.len(); + + self.trace.reserve(ALIGNMENT - 1); + let padding = ALIGNMENT - self.trace.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + // Vec guarantees that this will not reallocate + self.trace.push(0) + } + for i in 1..data_len + 1 { + self.trace[data_len + padding - i] = self.trace[data_len - i] + } + 
self.complete = true; + self.padding_len = padding; + } +} + +#[derive(Debug)] +enum RemoteTraceState { + Unsent, + Sending(usize), + Ready, + Running(usize), +} + +#[derive(Debug)] +struct RemoteTraces { + remote_traces: BTreeMap, + state: RemoteTraceState, +} + +impl RemoteTraces { + pub fn new(traces: BTreeMap) -> RemoteTraces { + RemoteTraces { + remote_traces: traces, + state: RemoteTraceState::Unsent + } + } + + // on subkernel request + pub fn upload_traces(&mut self, id: u32, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) -> usize { + let len = self.remote_traces.len(); + if len > 0 { + self.state = RemoteTraceState::Sending(self.remote_traces.len()); + for (dest, trace) in self.remote_traces.iter_mut() { + // queue up the first packet for all destinations, rest will be sent after first ACK + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route(drtioaux::Packet::DmaAddTraceRequest { + source: self_destination, destination: *dest, id: id, + status: meta.status, length: meta.len, trace: data_slice + }, routing_table, rank, self_destination); + } + } + len + } + + // on incoming Packet::DmaAddTraceReply + pub fn ack_upload(&mut self, kernel_manager: &mut KernelManager, source: u8, id: u32, succeeded: bool, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + if let RemoteTraceState::Sending(count) = self.state { + if let Some(trace) = self.remote_traces.get_mut(&source) { + if trace.at_end() { + if count - 1 == 0 { + self.state = RemoteTraceState::Ready; + kernel_manager.ddma_remote_uploaded(succeeded); + } else { + self.state = RemoteTraceState::Sending(count - 1); + } + } else { + // send next slice + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route(drtioaux::Packet::DmaAddTraceRequest { + 
source: self_destination, destination: meta.destination, id: id, + status: meta.status, length: meta.len, trace: data_slice + }, routing_table, rank, self_destination); + } + } + } + + } + + // on subkernel request + pub fn playback(&mut self, id: u32, timestamp: u64, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + // route all the playback requests + // remote traces + local trace + self.state = RemoteTraceState::Running(self.remote_traces.len() + 1); + for (dest, _) in self.remote_traces.iter() { + router.route(drtioaux::Packet::DmaPlaybackRequest { + source: self_destination, destination: *dest, id: id, timestamp: timestamp + }, routing_table, rank, self_destination); + // response will be ignored (succeeded = false handled by the main thread) + } + } + + // on incoming Packet::DmaPlaybackDone + pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, error: u8, channel: u32, timestamp: u64) { + if let RemoteTraceState::Running(count) = self.state { + if error != 0 || count - 1 == 0 { + // notify the kernel about a DDMA error or finish + kernel_manager.ddma_finished(error, channel, timestamp); + self.state = RemoteTraceState::Ready; + // further messages will be ignored (if there was an error) + } else { // no error and not the last one awaited + self.state = RemoteTraceState::Running(count - 1); + } + } + } + + pub fn erase(&mut self, id: u32, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + for (dest, _) in self.remote_traces.iter() { + router.route(drtioaux::Packet::DmaRemoveTraceRequest { + source: self_destination, destination: *dest, id: id + }, routing_table, rank, self_destination); + // response will be ignored as this object will stop existing too + } + } } #[derive(Debug)] @@ -37,7 +182,12 @@ pub struct Manager { entries: BTreeMap<(u8, u32), Entry>, state: ManagerState, current_id: u32, - current_source: u8 + current_source: u8, + + remote_entries: BTreeMap, + 
name_map: BTreeMap, + recording_trace: Vec, + recording_name: String } impl Manager { @@ -52,6 +202,10 @@ impl Manager { current_id: 0, current_source: 0, state: ManagerState::Idle, + remote_entries: BTreeMap::new(), + name_map: BTreeMap::new(), + recording_trace: Vec::new(), + recording_name: String::new(), } } @@ -67,7 +221,9 @@ impl Manager { self.entries.insert((source, id), Entry { trace: Vec::new(), padding_len: 0, - complete: false }); + complete: false, + duration: 0 + }); self.entries.get_mut(&(source, id)).unwrap() } else { entry @@ -77,34 +233,122 @@ impl Manager { self.entries.insert((source, id), Entry { trace: Vec::new(), padding_len: 0, - complete: false }); + complete: false, + duration: 0, + }); self.entries.get_mut(&(source, id)).unwrap() }, }; entry.trace.extend(&trace[0..trace_len]); if status.is_last() { - entry.trace.push(0); - let data_len = entry.trace.len(); - - // Realign. - entry.trace.reserve(ALIGNMENT - 1); - let padding = ALIGNMENT - entry.trace.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - // Vec guarantees that this will not reallocate - entry.trace.push(0) - } - for i in 1..data_len + 1 { - entry.trace[data_len + padding - i] = entry.trace[data_len - i] - } - entry.complete = true; - entry.padding_len = padding; + entry.realign(); flush_l2_cache(); } Ok(()) } + // API for subkernel + pub fn record_start(&mut self, name: &str) { + self.recording_name = String::from(name); + self.recording_trace = Vec::new(); + } + // API for subkernel + pub fn record_append(&mut self, data: &[u8]) { + self.recording_trace.extend_from_slice(data); + } + + // API for subkernel + pub fn record_stop(&mut self, duration: u64, self_destination: u8) -> Result { + let mut trace = Vec::new(); + mem::swap(&mut self.recording_trace, &mut trace); + trace.push(0); + let mut local_trace = Vec::new(); + let mut remote_traces: BTreeMap = BTreeMap::new(); + // analyze each entry and put in 
proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to various DMA buffers. + let mut ptr = 0; + while trace[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = trace[ptr] as usize; + let destination = trace[ptr+3]; + if destination == 0 { + return Err(Error::MasterDmaFound); + } else if destination == self_destination { + local_trace.extend(&trace[ptr..ptr+len]); + } + else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&trace[ptr..ptr+len]); + } else { + remote_traces.insert(destination, Sliceable::new(destination, trace[ptr..ptr+len].to_vec())); + } + } + // and jump to the next event + ptr += len; + } + let local_entry = Entry::from_vec(local_trace, duration); + let id = local_entry.id(); + + self.entries.insert((self_destination, id), local_entry); + self.remote_entries.insert(id, RemoteTraces::new(remote_traces)); + let mut name = String::new(); + mem::swap(&mut self.recording_name, &mut name); + self.name_map.insert(name, id); + + flush_l2_cache(); + + Ok(id) + } + + pub fn upload_traces(&mut self, id: u32, router: &mut Router, rank: u8, self_destination: u8, + routing_table: &RoutingTable) -> Result { + let remote_traces = self.remote_entries.get_mut(&id); + let mut len = 0; + if let Some(traces) = remote_traces { + len = traces.upload_traces(id, router, rank, self_destination, routing_table); + } + Ok(len) + } + + pub fn with_trace(&self, self_destination: u8, name: &str, f: F) -> R + where F: FnOnce(Option<&[u8]>, u64) -> R { + if let Some(ptr) = self.name_map.get(name) { + match self.entries.get(&(self_destination, *ptr)) { + Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration), + None => f(None, 0) + } + } else { + f(None, 0) + } + } + + // API for subkernel + pub fn playback_remote(&mut self, id: u32, timestamp: u64, + router: &mut Router, rank: u8, self_destination: u8, routing_table: 
&RoutingTable + ) -> Result<(), Error> { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.playback(id, timestamp, router, rank, self_destination, routing_table); + Ok(()) + } else { + Err(Error::IdNotFound) + } + } + + // API for subkernel + pub fn erase_name(&mut self, name: &str, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + if let Some(id) = self.name_map.get(name) { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + self.name_map.remove(name); + } + } + + // API for incoming DDMA (drtio) pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> { match self.entries.remove(&(source, id)) { Some(_) => Ok(()), @@ -112,6 +356,33 @@ impl Manager { } } + pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, + id: u32, error: u8, channel: u32, timestamp: u64) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.remote_finished(kernel_manager, error, channel, timestamp); + } + } + + pub fn ack_upload(&mut self, kernel_manager: &mut KernelManager, source: u8, id: u32, succeeded: bool, + router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.ack_upload(kernel_manager, source, id, succeeded, router, rank, self_destination, routing_table); + } + } + + pub fn cleanup(&mut self, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + // after subkernel ends, remove all self-generated traces + for (_, id) in self.name_map.iter_mut() { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + } + self.name_map.clear(); + } + + // API 
for both incoming DDMA (drtio) and subkernel pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> { if self.state != ManagerState::Idle { return Err(Error::PlaybackInProgress); diff --git a/artiq/firmware/satman/kernel.rs b/artiq/firmware/satman/kernel.rs index 0b9762385..d4cc226cb 100644 --- a/artiq/firmware/satman/kernel.rs +++ b/artiq/firmware/satman/kernel.rs @@ -1,4 +1,4 @@ -use core::{mem, option::NoneError, cmp::min}; +use core::{mem, option::NoneError}; use alloc::{string::String, format, vec::Vec, collections::{btree_map::BTreeMap, vec_deque::VecDeque}}; use cslice::AsCSlice; @@ -15,7 +15,8 @@ use kernel::eh_artiq::StackPointerBacktrace; use ::{cricon_select, RtioMaster}; use cache::Cache; -use routing::Router; +use dma::{Manager as DmaManager, Error as DmaError}; +use routing::{Router, Sliceable, SliceMeta}; use SAT_PAYLOAD_MAX_SIZE; use MASTER_PAYLOAD_MAX_SIZE; @@ -65,7 +66,9 @@ enum KernelState { MsgAwait { max_time: u64, tags: Vec }, MsgSending, SubkernelAwaitLoad, - SubkernelAwaitFinish { max_time: u64, id: u32 } + SubkernelAwaitFinish { max_time: u64, id: u32 }, + DmaUploading { max_time: u64 }, + DmaAwait { max_time: u64 }, } #[derive(Debug)] @@ -78,7 +81,8 @@ pub enum Error { AwaitingMessage, SubkernelIoError, DrtioError, - KernelException(Sliceable) + KernelException(Sliceable), + DmaError(DmaError), } impl From for Error { @@ -99,16 +103,14 @@ impl From> for Error { } } -macro_rules! unexpected { - ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); +impl From for Error { + fn from(value: DmaError) -> Error { + Error::DmaError(value) + } } -/* represents data that has to be sent to Master */ -#[derive(Debug)] -pub struct Sliceable { - it: usize, - data: Vec, - destination: u8 +macro_rules! 
unexpected { + ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); } /* represents interkernel messages */ @@ -164,44 +166,6 @@ pub struct SubkernelFinished { pub source: u8 } -pub struct SliceMeta { - pub destination: u8, - pub len: u16, - pub status: PayloadStatus -} - -macro_rules! get_slice_fn { - ( $name:tt, $size:expr ) => { - pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - let first = self.it == 0; - let len = min($size, self.data.len() - self.it); - let last = self.it + len == self.data.len(); - let status = PayloadStatus::from_status(first, last); - data_slice[..len].clone_from_slice(&self.data[self.it..self.it+len]); - self.it += len; - - SliceMeta { - destination: self.destination, - len: len as u16, - status: status - } - } - }; -} - -impl Sliceable { - pub fn new(destination: u8, data: Vec) -> Sliceable { - Sliceable { - it: 0, - data: data, - destination: destination - } - } - - get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); - get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); -} - impl MessageManager { pub fn new() -> MessageManager { MessageManager { @@ -312,10 +276,8 @@ impl Session { fn running(&self) -> bool { match self.kernel_state { - KernelState::Absent | KernelState::Loaded => false, - KernelState::Running | KernelState::MsgAwait { .. } | - KernelState::MsgSending | KernelState::SubkernelAwaitLoad | - KernelState::SubkernelAwaitFinish { .. } => true + KernelState::Absent | KernelState::Loaded => false, + _ => true } } @@ -490,7 +452,39 @@ impl Manager { } } - pub fn process_kern_requests(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, destination: u8) { + pub fn ddma_finished(&mut self, error: u8, channel: u32, timestamp: u64) { + if let KernelState::DmaAwait { .. 
} = self.session.kernel_state { + kern_send(&kern::DmaAwaitRemoteReply { + timeout: false, error: error, channel: channel, timestamp: timestamp + }).unwrap(); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_nack(&mut self) { + // for simplicity treat it as a timeout for now... + if let KernelState::DmaAwait { .. } = self.session.kernel_state { + kern_send(&kern::DmaAwaitRemoteReply { + timeout: true, error: 0, channel: 0, timestamp: 0 + }).unwrap(); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_remote_uploaded(&mut self, succeeded: bool) { + if let KernelState::DmaUploading { .. } = self.session.kernel_state { + if succeeded { + self.session.kernel_state = KernelState::Running; + kern_acknowledge().unwrap(); + } else { + self.stop(); + self.runtime_exception(Error::DmaError(DmaError::UploadFail)); + } + + } + } + + pub fn process_kern_requests(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, destination: u8, dma_manager: &mut DmaManager) { macro_rules! 
finished { ($with_exception:expr) => {{ Some(SubkernelFinished { source: self.session.source, id: self.current_id, @@ -504,6 +498,7 @@ impl Manager { destination: subkernel_finished.source, id: subkernel_finished.id, with_exception: subkernel_finished.with_exception, exception_src: subkernel_finished.exception_source }, &routing_table, rank, destination); + dma_manager.cleanup(router, rank, destination, routing_table); } if !self.is_running() { @@ -528,7 +523,7 @@ impl Manager { } } - match self.process_kern_message(router, routing_table, rank, destination) { + match self.process_kern_message(router, routing_table, rank, destination, dma_manager) { Ok(Some(with_exception)) => { self.last_finished = finished!(with_exception) }, @@ -585,6 +580,20 @@ impl Manager { } Ok(()) } + KernelState::DmaAwait { max_time } => { + if clock::get_ms() > *max_time { + kern_send(&kern::DmaAwaitRemoteReply { timeout: true, error: 0, channel: 0, timestamp: 0 })?; + self.session.kernel_state = KernelState::Running; + } + // ddma_finished() and nack() covers the other case + Ok(()) + } + KernelState::DmaUploading { max_time } => { + if clock::get_ms() > *max_time { + unexpected!("DMAError: Timed out sending traces to remote"); + } + Ok(()) + } _ => Ok(()) } } @@ -622,13 +631,19 @@ impl Manager { fn process_kern_message(&mut self, router: &mut Router, routing_table: &RoutingTable, - rank: u8, destination: u8 + rank: u8, destination: u8, + dma_manager: &mut DmaManager ) -> Result, Error> { // returns Ok(with_exception) on finish // None if the kernel is still running kern_recv(|request| { match (request, &self.session.kernel_state) { - (&kern::LoadReply(_), KernelState::Loaded) => { + (&kern::LoadReply(_), KernelState::Loaded) | + (_, KernelState::DmaUploading { .. }) | + (_, KernelState::DmaAwait { .. }) | + (_, KernelState::MsgSending) | + (_, KernelState::SubkernelAwaitLoad) | + (_, KernelState::SubkernelAwaitFinish { .. }) => { // We're standing by; ignore the message. 
return Ok(None) } @@ -693,6 +708,50 @@ impl Manager { return Ok(Some(true)) } + &kern::DmaRecordStart(name) => { + dma_manager.record_start(name); + kern_acknowledge() + } + &kern::DmaRecordAppend(data) => { + dma_manager.record_append(data); + kern_acknowledge() + } + &kern::DmaRecordStop { duration, enable_ddma: _ } => { + // ddma is always used on satellites + if let Ok(id) = dma_manager.record_stop(duration, destination) { + let remote_count = dma_manager.upload_traces(id, router, rank, destination, routing_table)?; + if remote_count > 0 { + let max_time = clock::get_ms() + 10_000 as u64; + self.session.kernel_state = KernelState::DmaUploading { max_time: max_time }; + Ok(()) + } else { + kern_acknowledge() + } + } else { + unexpected!("DMAError: found an unsupported call to RTIO devices on master") + } + } + &kern::DmaEraseRequest { name } => { + dma_manager.erase_name(name, router, rank, destination, routing_table); + kern_acknowledge() + } + &kern::DmaRetrieveRequest { name } => { + dma_manager.with_trace(destination, name, |trace, duration| { + kern_send(&kern::DmaRetrieveReply { + trace: trace, + duration: duration, + uses_ddma: true, + }) + }) + } + &kern::DmaStartRemoteRequest { id, timestamp } => { + let max_time = clock::get_ms() + 10_000 as u64; + self.session.kernel_state = KernelState::DmaAwait { max_time: max_time }; + dma_manager.playback_remote(id as u32, timestamp as u64, router, rank, destination, routing_table)?; + dma_manager.playback(destination, id as u32, timestamp as u64)?; + Ok(()) + } + &kern::SubkernelMsgSend { id: _, destination: msg_dest, count, tag, data } => { let dest = match msg_dest { Some(dest) => dest, @@ -717,13 +776,13 @@ impl Manager { router.route(drtioaux::Packet::SubkernelLoadRunRequest { source: destination, destination: sk_destination, id: id, run: run }, routing_table, rank, destination); - kern_acknowledge() + Ok(()) } &kern::SubkernelAwaitFinishRequest{ id, timeout } => { let max_time = clock::get_ms() + timeout as 
u64; self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id }; - kern_acknowledge() + Ok(()) } request => unexpected!("unexpected request {:?} from kernel CPU", request) diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index eec0bab8f..a496afe3e 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -377,9 +377,14 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg *self_destination = destination; let succeeded = dmamgr.add(source, id, status, &trace, length as usize).is_ok(); router.send(drtioaux::Packet::DmaAddTraceReply { - destination: source, succeeded: succeeded + source: *self_destination, destination: source, id: id, succeeded: succeeded }, _routing_table, *rank, *self_destination) } + drtioaux::Packet::DmaAddTraceReply { source, destination: _destination, id, succeeded } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet); + dmamgr.ack_upload(kernelmgr, source, id, succeeded, router, *rank, *self_destination, _routing_table); + Ok(()) + } drtioaux::Packet::DmaRemoveTraceRequest { source, destination: _destination, id } => { forward!(_routing_table, _destination, *rank, _repeaters, &packet); let succeeded = dmamgr.erase(source, id).is_ok(); @@ -395,6 +400,18 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg destination: source, succeeded: succeeded }, _routing_table, *rank, *self_destination) } + drtioaux::Packet::DmaPlaybackReply { destination: _destination, succeeded } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet); + if !succeeded { + kernelmgr.ddma_nack(); + } + Ok(()) + } + drtioaux::Packet::DmaPlaybackStatus { source: _, destination: _destination, id, error, channel, timestamp } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet); + dmamgr.remote_finished(kernelmgr, id, error, channel, timestamp); + Ok(()) + } 
drtioaux::Packet::SubkernelAddDataRequest { destination, id, status, length, data } => { forward!(_routing_table, destination, *rank, _repeaters, &packet); @@ -426,7 +443,6 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg kernelmgr.subkernel_load_run_reply(succeeded, *self_destination); Ok(()) } - // { destination: u8, id: u32, with_exception: bool, exception_src: u8 }, drtioaux::Packet::SubkernelFinished { destination: _destination, id, with_exception, exception_src } => { forward!(_routing_table, _destination, *rank, _repeaters, &packet); kernelmgr.remote_subkernel_finished(id, with_exception, exception_src); @@ -772,7 +788,7 @@ pub extern fn main() -> i32 { }, &routing_table, rank, destination); } - kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination); + kernelmgr.process_kern_requests(&mut router, &routing_table, rank, destination, &mut dma_manager); if let Some((repno, packet)) = router.get_downstream_packet() { if let Err(e) = repeaters[repno].aux_send(&packet) { diff --git a/artiq/firmware/satman/routing.rs b/artiq/firmware/satman/routing.rs index 118ad11af..6867575ea 100644 --- a/artiq/firmware/satman/routing.rs +++ b/artiq/firmware/satman/routing.rs @@ -1,6 +1,64 @@ -use alloc::collections::vec_deque::VecDeque; +use alloc::{vec::Vec, collections::vec_deque::VecDeque}; use board_artiq::{drtioaux, drtio_routing}; use board_misoc::csr; +use core::cmp::min; +use proto_artiq::drtioaux_proto::PayloadStatus; +use SAT_PAYLOAD_MAX_SIZE; +use MASTER_PAYLOAD_MAX_SIZE; + +/* represents data that has to be sent with the aux protocol */ +#[derive(Debug)] +pub struct Sliceable { + it: usize, + data: Vec, + destination: u8 +} + +pub struct SliceMeta { + pub destination: u8, + pub len: u16, + pub status: PayloadStatus +} + +macro_rules! 
get_slice_fn { + ( $name:tt, $size:expr ) => { + pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { + let first = self.it == 0; + let len = min($size, self.data.len() - self.it); + let last = self.it + len == self.data.len(); + let status = PayloadStatus::from_status(first, last); + data_slice[..len].clone_from_slice(&self.data[self.it..self.it+len]); + self.it += len; + + SliceMeta { + destination: self.destination, + len: len as u16, + status: status + } + } + }; +} + +impl Sliceable { + pub fn new(destination: u8, data: Vec) -> Sliceable { + Sliceable { + it: 0, + data: data, + destination: destination + } + } + + pub fn at_end(&self) -> bool { + self.it == self.data.len() + } + + pub fn extend(&mut self, data: &[u8]) { + self.data.extend(data); + } + + get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); + get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); +} // Packets from downstream (further satellites) are received and routed appropriately. // they're passed as soon as possible downstream (within the subtree), or sent upstream,