diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index 3995f87..160f49e 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -207,7 +207,9 @@ pub enum Packet { trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, DmaAddTraceReply { + source: u8, destination: u8, + id: u32, succeeded: bool, }, DmaRemoveTraceRequest { @@ -456,7 +458,9 @@ impl Packet { } } 0xb1 => Packet::DmaAddTraceReply { + source: reader.read_u8()?, destination: reader.read_u8()?, + id: reader.read_u32()?, succeeded: reader.read_bool()?, }, 0xb2 => Packet::DmaRemoveTraceRequest { @@ -788,9 +792,16 @@ impl Packet { writer.write_u16(length)?; writer.write_all(&trace[0..length as usize])?; } - Packet::DmaAddTraceReply { destination, succeeded } => { + Packet::DmaAddTraceReply { + source, + destination, + id, + succeeded, + } => { writer.write_u8(0xb1)?; + writer.write_u8(source)?; writer.write_u8(destination)?; + writer.write_u32(id)?; writer.write_bool(succeeded)?; } Packet::DmaRemoveTraceRequest { diff --git a/src/runtime/src/rtio_dma.rs b/src/runtime/src/rtio_dma.rs index 9f4298f..e5f5194 100644 --- a/src/runtime/src/rtio_dma.rs +++ b/src/runtime/src/rtio_dma.rs @@ -142,9 +142,9 @@ pub mod remote_dma { } } - pub async fn playback_done(&mut self, destination: u8, error: u8, channel: u32, timestamp: u64) { + pub async fn playback_done(&mut self, source: u8, error: u8, channel: u32, timestamp: u64) { let mut traces_locked = self.traces.async_lock().await; - let mut trace = traces_locked.get_mut(&destination).unwrap(); + let mut trace = traces_locked.get_mut(&source).unwrap(); trace.state = RemoteState::PlaybackEnded { error: error, channel: channel, diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 06740d5..e9de1bc 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -560,10 +560,12 @@ pub mod drtio { Packet::DmaAddTraceReply { destination: 0, succeeded: true, + .. } => Ok(()), Packet::DmaAddTraceReply { destination: 0, succeeded: false, + .. } => Err("error adding trace on satellite"), _ => Err("adding DMA trace failed, unexpected aux packet"), }, diff --git a/src/satman/src/dma.rs b/src/satman/src/dma.rs index bb06f6f..9a36634 100644 --- a/src/satman/src/dma.rs +++ b/src/satman/src/dma.rs @@ -1,7 +1,13 @@ -use alloc::{collections::btree_map::BTreeMap, vec::Vec}; +use alloc::{collections::btree_map::BTreeMap, string::String, vec::Vec}; +use core::mem; -use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr}; +use ksupport::kernel::DmaRecorder; +use libboard_artiq::{drtio_routing::RoutingTable, + drtioaux_proto::{Packet, PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}, + pl::csr}; use libcortex_a9::cache::dcci_slice; +use routing::{Router, Sliceable}; +use subkernel::Manager as KernelManager; const ALIGNMENT: usize = 64; @@ -19,10 +25,13 @@ pub struct RtioStatus { pub timestamp: u64, } +#[derive(Debug)] pub enum Error { IdNotFound, PlaybackInProgress, EntryNotComplete, + MasterDmaFound, + UploadFail, } #[derive(Debug)] @@ -30,6 +39,217 @@ struct Entry { trace: Vec, padding_len: usize, complete: bool, + duration: i64, // relevant for local DMA +} + +impl Entry { + pub fn from_vec(data: Vec, duration: i64) -> Entry { + let mut entry = Entry { + trace: data, + padding_len: 0, + complete: true, + duration: duration, + }; + entry.realign(); + entry + } + + pub fn id(&self) -> u32 { + self.trace[self.padding_len..].as_ptr() as u32 + } + + pub fn realign(&mut self) { + self.trace.push(0); + let data_len = self.trace.len(); + + self.trace.reserve(ALIGNMENT - 1); + let padding = ALIGNMENT - self.trace.as_ptr() as usize % ALIGNMENT; + let padding = if padding == ALIGNMENT { 0 } else { padding }; + for _ in 0..padding { + // Vec guarantees that this will not reallocate + self.trace.push(0) + } + for i in 1..data_len + 1 { + self.trace[data_len + padding - i] = self.trace[data_len - i] + } + self.complete = true; + self.padding_len = padding; + + dcci_slice(&self.trace); + } +} + +#[derive(Debug)] +enum RemoteTraceState { + Unsent, + Sending(usize), + Ready, + Running(usize), +} + +#[derive(Debug)] +struct RemoteTraces { + remote_traces: BTreeMap, + state: RemoteTraceState, +} + +impl RemoteTraces { + pub fn new(traces: BTreeMap) -> RemoteTraces { + RemoteTraces { + remote_traces: traces, + state: RemoteTraceState::Unsent, + } + } + + // on subkernel request + pub fn upload_traces( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> usize { + let len = self.remote_traces.len(); + if len > 0 { + self.state = RemoteTraceState::Sending(self.remote_traces.len()); + for (dest, trace) in self.remote_traces.iter_mut() { + // queue up the first packet for all destinations, rest will be sent after first ACK + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route( + Packet::DmaAddTraceRequest { + source: self_destination, + destination: *dest, + id: id, + status: meta.status, + length: meta.len, + trace: data_slice, + }, + routing_table, + rank, + self_destination, + ); + } + } + len + } + + // on incoming Packet::DmaAddTraceReply + pub fn ack_upload( + &mut self, + kernel_manager: &mut KernelManager, + source: u8, + id: u32, + succeeded: bool, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let RemoteTraceState::Sending(count) = self.state { + if let Some(trace) = self.remote_traces.get_mut(&source) { + if trace.at_end() { + if count - 1 == 0 { + self.state = RemoteTraceState::Ready; + if let Some((id, timestamp)) = kernel_manager.ddma_remote_uploaded(succeeded) { + self.playback(id, timestamp, router, rank, self_destination, routing_table); + } + } else { + self.state = RemoteTraceState::Sending(count - 1); + } + } else { + // send next slice + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let meta = trace.get_slice_master(&mut data_slice); + router.route( + Packet::DmaAddTraceRequest { + source: self_destination, + destination: meta.destination, + id: id, + status: meta.status, + length: meta.len, + trace: data_slice, + }, + routing_table, + rank, + self_destination, + ); + } + } + } + } + + // on subkernel request + pub fn playback( + &mut self, + id: u32, + timestamp: u64, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + // route all the playback requests + // remote traces (local trace runs on core1 unlike mainline firmware) + self.state = RemoteTraceState::Running(self.remote_traces.len()); + for (dest, _) in self.remote_traces.iter() { + router.route( + Packet::DmaPlaybackRequest { + source: self_destination, + destination: *dest, + id: id, + timestamp: timestamp, + }, + routing_table, + rank, + self_destination, + ); + // response will be ignored (succeeded = false handled by the main thread) + } + } + + // on incoming Packet::DmaPlaybackDone + pub fn remote_finished(&mut self, kernel_manager: &mut KernelManager, error: u8, channel: u32, timestamp: u64) { + if let RemoteTraceState::Running(count) = self.state { + if error != 0 || count - 1 == 0 { + // notify the kernel about a DDMA error or finish + kernel_manager.ddma_finished(error, channel, timestamp); + self.state = RemoteTraceState::Ready; + // further messages will be ignored (if there was an error) + } else { + // no error and not the last one awaited + self.state = RemoteTraceState::Running(count - 1); + } + } + } + + pub fn erase( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + for (dest, _) in self.remote_traces.iter() { + router.route( + Packet::DmaRemoveTraceRequest { + source: self_destination, + destination: *dest, + id: id, + }, + routing_table, + rank, + self_destination, + ); + // response will be ignored as this object will stop existing too + } + } + + pub fn has_remote_traces(&self) -> bool { + self.remote_traces.len() > 0 + } } #[derive(Debug)] @@ -38,6 +258,9 @@ pub struct Manager { state: ManagerState, current_id: u32, current_source: u8, + + remote_entries: BTreeMap, + name_map: BTreeMap, } impl Manager { @@ -50,6 +273,8 @@ impl Manager { current_id: 0, current_source: 0, state: ManagerState::Idle, + remote_entries: BTreeMap::new(), + name_map: BTreeMap::new(), } } @@ -72,6 +297,7 @@ impl Manager { trace: Vec::new(), padding_len: 0, complete: false, + duration: 0, }, ); self.entries.get_mut(&(source, id)).unwrap() @@ -86,6 +312,7 @@ impl Manager { trace: Vec::new(), padding_len: 0, complete: false, + duration: 0, }, ); self.entries.get_mut(&(source, id)).unwrap() @@ -94,27 +321,12 @@ impl Manager { entry.trace.extend(&trace[0..trace_len]); if status.is_last() { - entry.trace.push(0); - let data_len = entry.trace.len(); - - // Realign. - entry.trace.reserve(ALIGNMENT - 1); - let padding = ALIGNMENT - entry.trace.as_ptr() as usize % ALIGNMENT; - let padding = if padding == ALIGNMENT { 0 } else { padding }; - for _ in 0..padding { - // Vec guarantees that this will not reallocate - entry.trace.push(0) - } - for i in 1..data_len + 1 { - entry.trace[data_len + padding - i] = entry.trace[data_len - i] - } - entry.complete = true; - entry.padding_len = padding; - dcci_slice(&entry.trace); + entry.realign(); } Ok(()) } + // api for DRTIO pub fn erase(&mut self, source: u8, id: u32) -> Result<(), Error> { match self.entries.remove(&(source, id)) { Some(_) => Ok(()), @@ -122,6 +334,168 @@ impl Manager { } } + // API for subkernel + pub fn erase_name( + &mut self, + name: &str, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let Some(id) = self.name_map.get(name) { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + self.name_map.remove(name); + } + } + + pub fn remote_finished( + &mut self, + kernel_manager: &mut KernelManager, + id: u32, + error: u8, + channel: u32, + timestamp: u64, + ) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.remote_finished(kernel_manager, error, channel, timestamp); + } + } + + pub fn ack_upload( + &mut self, + kernel_manager: &mut KernelManager, + source: u8, + id: u32, + succeeded: bool, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) { + if let Some(entry) = self.remote_entries.get_mut(&id) { + entry.ack_upload( + kernel_manager, + source, + id, + succeeded, + router, + rank, + self_destination, + routing_table, + ); + } + } + + // API for subkernel + pub fn upload_traces( + &mut self, + id: u32, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> Result { + let remote_traces = self.remote_entries.get_mut(&id); + let mut len = 0; + if let Some(traces) = remote_traces { + len = traces.upload_traces(id, router, rank, self_destination, routing_table); + } + Ok(len) + } + + // API for subkernel + pub fn playback_remote( + &mut self, + id: u32, + timestamp: u64, + router: &mut Router, + rank: u8, + self_destination: u8, + routing_table: &RoutingTable, + ) -> Result<(), Error> { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.playback(id, timestamp, router, rank, self_destination, routing_table); + Ok(()) + } else { + Err(Error::IdNotFound) + } + } + + // API for subkernel + pub fn cleanup(&mut self, router: &mut Router, rank: u8, self_destination: u8, routing_table: &RoutingTable) { + // after subkernel ends, remove all self-generated traces + for (_, id) in self.name_map.iter_mut() { + if let Some(traces) = self.remote_entries.get_mut(&id) { + traces.erase(*id, router, rank, self_destination, routing_table); + self.remote_entries.remove(&id); + } + self.entries.remove(&(self_destination, *id)); + } + self.name_map.clear(); + } + + // API for subkernel + pub fn retrieve(&self, self_destination: u8, name: &String) -> Option<(i32, i64, bool)> { + let id = self.name_map.get(name)?; + let duration = self.entries.get(&(self_destination, *id))?.duration; + let uses_ddma = self.has_remote_traces(*id); + Some((*id as i32, duration, uses_ddma)) + } + + pub fn has_remote_traces(&self, id: u32) -> bool { + match self.remote_entries.get(&id) { + Some(traces) => traces.has_remote_traces(), + _ => false, + } + } + + pub fn put_record(&mut self, mut recorder: DmaRecorder, self_destination: u8) -> Result { + let mut remote_traces: BTreeMap = BTreeMap::new(); + + let mut local_trace: Vec = Vec::new(); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + recorder.buffer.push(0); + while recorder.buffer[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = recorder.buffer[ptr] as usize; + let destination = recorder.buffer[ptr + 3]; + if destination == 0 { + return Err(Error::MasterDmaFound); + } else if destination == self_destination { + local_trace.extend(&recorder.buffer[ptr..ptr + len]); + } else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&recorder.buffer[ptr..ptr + len]); + } else { + remote_traces.insert( + destination, + Sliceable::new(destination, recorder.buffer[ptr..ptr + len].to_vec()), + ); + } + } + // and jump to the next event + ptr += len; + } + let local_entry = Entry::from_vec(local_trace, recorder.duration); + + let id = local_entry.id(); + self.entries.insert((self_destination, id), local_entry); + self.remote_entries.insert(id, RemoteTraces::new(remote_traces)); + let mut name = String::new(); + mem::swap(&mut recorder.name, &mut name); + self.name_map.insert(name, id); + + Ok(id) + } + pub fn playback(&mut self, source: u8, id: u32, timestamp: u64) -> Result<(), Error> { if self.state != ManagerState::Idle { return Err(Error::PlaybackInProgress); diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 5447db6..fab1218 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -483,7 +483,9 @@ fn process_aux_packet( let succeeded = dma_manager.add(source, id, status, &trace, length as usize).is_ok(); router.send( drtioaux::Packet::DmaAddTraceReply { + source: *self_destination, destination: source, + id: id, succeeded: succeeded, }, _routing_table, @@ -491,6 +493,25 @@ fn process_aux_packet( *self_destination, ) } + drtioaux::Packet::DmaAddTraceReply { + source, + destination: _destination, + id, + succeeded, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + dma_manager.ack_upload( + kernel_manager, + source, + id, + succeeded, + router, + *rank, + *self_destination, + _routing_table, + ); + Ok(()) + } drtioaux::Packet::DmaRemoveTraceRequest { source, destination: _destination, @@ -508,6 +529,13 @@ fn process_aux_packet( *self_destination, ) } + drtioaux::Packet::DmaRemoveTraceReply { + destination: _destination, + succeeded: _, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + Ok(()) + } drtioaux::Packet::DmaPlaybackRequest { source, destination: _destination, @@ -530,6 +558,28 @@ fn process_aux_packet( *self_destination, ) } + drtioaux::Packet::DmaPlaybackReply { + destination: _destination, + succeeded, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + if !succeeded { + kernel_manager.ddma_nack(); + } + Ok(()) + } + drtioaux::Packet::DmaPlaybackStatus { + source: _, + destination: _destination, + id, + error, + channel, + timestamp, + } => { + forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); + dma_manager.remote_finished(kernel_manager, id, error, channel, timestamp); + Ok(()) + } drtioaux::Packet::SubkernelAddDataRequest { destination, @@ -649,8 +699,8 @@ fn process_aux_packet( Ok(()) } - _ => { - warn!("received unexpected aux packet"); + p => { + warn!("received unexpected aux packet: {:?}", p); Ok(()) } } @@ -949,8 +999,16 @@ pub extern "C" fn main_core0() -> i32 { ); } - kernel_manager.process_kern_requests(&mut router, &routing_table, rank, destination, &timer); + kernel_manager.process_kern_requests( + &mut router, + &routing_table, + rank, + destination, + &mut dma_manager, + &timer, + ); + #[cfg(has_drtio_routing)] if let Some((repno, packet)) = router.get_downstream_packet() { if let Err(e) = repeaters[repno].aux_send(&packet) { warn!("[REP#{}] Error when sending packet to satellite ({:?})", repno, e) diff --git a/src/satman/src/repeater.rs b/src/satman/src/repeater.rs index faa5068..e7f6b78 100644 --- a/src/satman/src/repeater.rs +++ b/src/satman/src/repeater.rs @@ -347,7 +347,15 @@ impl Repeater { Repeater::default() } - pub fn service(&self, _routing_table: &drtio_routing::RoutingTable, _rank: u8, _timer: &mut GlobalTimer) {} + pub fn service( + &self, + _routing_table: &drtio_routing::RoutingTable, + _rank: u8, + _destination: u8, + _router: &mut Router, + _timer: &mut GlobalTimer, + ) { + } pub fn sync_tsc(&self, _timer: &mut GlobalTimer) -> Result<(), drtioaux::Error> { Ok(()) diff --git a/src/satman/src/routing.rs b/src/satman/src/routing.rs index 74482af..46f8e7f 100644 --- a/src/satman/src/routing.rs +++ b/src/satman/src/routing.rs @@ -1,6 +1,65 @@ -use alloc::collections::vec_deque::VecDeque; +use alloc::{collections::vec_deque::VecDeque, vec::Vec}; +use core::cmp::min; -use libboard_artiq::{drtio_routing, drtioaux, pl::csr}; +#[cfg(has_drtio_routing)] +use libboard_artiq::pl::csr; +use libboard_artiq::{drtio_routing, drtioaux, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}}; + +pub struct SliceMeta { + pub destination: u8, + pub len: u16, + pub status: PayloadStatus, +} + +/* represents data that has to be sent to Master */ +#[derive(Debug)] +pub struct Sliceable { + it: usize, + data: Vec, + destination: u8, +} + +macro_rules! get_slice_fn { + ($name:tt, $size:expr) => { + pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { + let first = self.it == 0; + let len = min($size, self.data.len() - self.it); + let last = self.it + len == self.data.len(); + let status = PayloadStatus::from_status(first, last); + + data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); + self.it += len; + + SliceMeta { + destination: self.destination, + len: len as u16, + status: status, + } + } + }; +} + +impl Sliceable { + pub fn new(destination: u8, data: Vec) -> Sliceable { + Sliceable { + it: 0, + data: data, + destination: destination, + } + } + + pub fn at_end(&self) -> bool { + self.it == self.data.len() + } + + pub fn extend(&mut self, data: &[u8]) { + self.data.extend(data); + } + + get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); + get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); +} // Packets from downstream (further satellites) are received and routed appropriately. // they're passed as soon as possible downstream (within the subtree), or sent upstream, @@ -14,6 +73,7 @@ use libboard_artiq::{drtio_routing, drtioaux, pl::csr}; pub struct Router { upstream_queue: VecDeque, local_queue: VecDeque, + #[cfg(has_drtio_routing)] downstream_queue: VecDeque<(usize, drtioaux::Packet)>, upstream_notified: bool, } @@ -23,26 +83,27 @@ impl Router { Router { upstream_queue: VecDeque::new(), local_queue: VecDeque::new(), + #[cfg(has_drtio_routing)] downstream_queue: VecDeque::new(), upstream_notified: false, } } - // called by local sources (DDMA, kernel) and by repeaters on receiving async data + // Called by local sources (DDMA, kernel) and by repeaters on receiving async data; // messages are always buffered for both upstream and downstream pub fn route( &mut self, packet: drtioaux::Packet, _routing_table: &drtio_routing::RoutingTable, _rank: u8, - _self_destination: u8, + self_destination: u8, ) { + let destination = packet.routable_destination(); #[cfg(has_drtio_routing)] { - let destination = packet.routable_destination(); if let Some(destination) = destination { let hop = _routing_table.0[destination as usize][_rank as usize] as usize; - if destination == _self_destination { + if destination == self_destination { self.local_queue.push_back(packet); } else if hop > 0 && hop < csr::DRTIOREP.len() { let repno = (hop - 1) as usize; @@ -56,11 +117,15 @@ impl Router { } #[cfg(not(has_drtio_routing))] { - self.upstream_queue.push_back(packet); + if destination == Some(self_destination) { + self.local_queue.push_back(packet); + } else { + self.upstream_queue.push_back(packet); + } } } - // Sends a packet to a required destination, routing if it's necessary + // Sends a packet to a required destination, routing if necessary pub fn send( &mut self, packet: drtioaux::Packet, @@ -114,6 +179,7 @@ impl Router { packet } + #[cfg(has_drtio_routing)] pub fn get_downstream_packet(&mut self) -> Option<(usize, drtioaux::Packet)> { self.downstream_queue.pop_front() } diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 7bdc8f2..50c9690 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -2,10 +2,11 @@ use alloc::{collections::{BTreeMap, VecDeque}, format, string::{String, ToString}, vec::Vec}; -use core::{cmp::min, option::NoneError, slice, str}; +use core::{option::NoneError, slice, str}; use core_io::{Error as IoError, Write}; use cslice::AsCSlice; +use dma::{Error as DmaError, Manager as DmaManager}; use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; use libboard_artiq::{drtio_routing::RoutingTable, @@ -15,7 +16,7 @@ use libboard_artiq::{drtio_routing::RoutingTable, use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; use log::warn; -use routing::Router; +use routing::{Router, SliceMeta, Sliceable}; #[derive(Debug, Clone, PartialEq)] enum KernelState { @@ -25,7 +26,23 @@ enum KernelState { MsgAwait(Milliseconds, Vec), MsgSending, SubkernelAwaitLoad, - SubkernelAwaitFinish { max_time: Milliseconds, id: u32 }, + SubkernelAwaitFinish { + max_time: Milliseconds, + id: u32, + }, + DmaUploading, + DmaPendingPlayback { + id: u32, + timestamp: u64, + }, + DmaPendingAwait { + id: u32, + timestamp: u64, + max_time: Milliseconds, + }, + DmaAwait { + max_time: Milliseconds, + }, } #[derive(Debug)] @@ -38,6 +55,7 @@ pub enum Error { SubkernelIoError, DrtioError, KernelException(Sliceable), + DmaError(DmaError), } impl From for Error { @@ -52,6 +70,12 @@ impl From for Error { } } +impl From for Error { + fn from(value: DmaError) -> Error { + Error::DmaError(value) + } +} + impl From<()> for Error { fn from(_: ()) -> Error { Error::NoMessage @@ -68,14 +92,6 @@ macro_rules! unexpected { ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); } -/* represents data that has to be sent to Master */ -#[derive(Debug)] -pub struct Sliceable { - it: usize, - data: Vec, - destination: u8, -} - /* represents interkernel messages */ struct Message { count: u8, @@ -123,11 +139,7 @@ impl Session { fn running(&self) -> bool { match self.kernel_state { KernelState::Absent | KernelState::Loaded => false, - KernelState::Running - | KernelState::MsgAwait { .. } - | KernelState::MsgSending - | KernelState::SubkernelAwaitLoad - | KernelState::SubkernelAwaitFinish { .. } => true, + _ => true, } } } @@ -153,45 +165,6 @@ pub struct SubkernelFinished { pub source: u8, } -pub struct SliceMeta { - pub destination: u8, - pub len: u16, - pub status: PayloadStatus, -} - -macro_rules! get_slice_fn { - ($name:tt, $size:expr) => { - pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - let first = self.it == 0; - let len = min($size, self.data.len() - self.it); - let last = self.it + len == self.data.len(); - let status = PayloadStatus::from_status(first, last); - - data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); - self.it += len; - - SliceMeta { - destination: self.destination, - len: len as u16, - status: status, - } - } - }; -} - -impl Sliceable { - pub fn new(destination: u8, data: Vec) -> Sliceable { - Sliceable { - it: 0, - data: data, - destination: destination, - } - } - - get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); - get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); -} - impl MessageManager { pub fn new() -> MessageManager { MessageManager { @@ -355,7 +328,6 @@ impl<'a> Manager<'_> { } pub fn run(&mut self, source: u8, id: u32) -> Result<(), Error> { - info!("starting subkernel #{}", id); if self.session.kernel_state != KernelState::Loaded || self.session.id != id { self.load(id)?; } @@ -466,12 +438,66 @@ impl<'a> Manager<'_> { self.kernel_stop(); } + pub fn ddma_finished(&mut self, error: u8, channel: u32, timestamp: u64) { + if let KernelState::DmaAwait { .. } = self.session.kernel_state { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: false, + error: error, + channel: channel, + timestamp: timestamp, + }); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_nack(&mut self) { + // for simplicity treat it as a timeout... + if let KernelState::DmaAwait { .. } = self.session.kernel_state { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0, + }); + self.session.kernel_state = KernelState::Running; + } + } + + pub fn ddma_remote_uploaded(&mut self, succeeded: bool) -> Option<(u32, u64)> { + // returns a tuple of id, timestamp in case a playback needs to be started immediately + if !succeeded { + self.kernel_stop(); + self.runtime_exception(Error::DmaError(DmaError::UploadFail)); + } + let res = match self.session.kernel_state { + KernelState::DmaPendingPlayback { id, timestamp } => { + self.session.kernel_state = KernelState::Running; + Some((id, timestamp)) + } + KernelState::DmaPendingAwait { + id, + timestamp, + max_time, + } => { + self.session.kernel_state = KernelState::DmaAwait { max_time: max_time }; + Some((id, timestamp)) + } + KernelState::DmaUploading => { + self.session.kernel_state = KernelState::Running; + None + } + _ => None, + }; + res + } + pub fn process_kern_requests( &mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, destination: u8, + dma_manager: &mut DmaManager, timer: &GlobalTimer, ) { if let Some(subkernel_finished) = self.last_finished.take() { @@ -520,7 +546,7 @@ impl<'a> Manager<'_> { } } - match self.process_kern_message(router, routing_table, rank, destination, timer) { + match self.process_kern_message(router, routing_table, rank, destination, dma_manager, timer) { Ok(true) => { self.last_finished = Some(SubkernelFinished { id: self.session.id, @@ -583,12 +609,14 @@ impl<'a> Manager<'_> { routing_table: &RoutingTable, rank: u8, self_destination: u8, + dma_manager: &mut DmaManager, timer: &GlobalTimer, ) -> Result { let reply = self.control.rx.try_recv()?; match reply { kernel::Message::KernelFinished(_async_errors) => { self.kernel_stop(); + dma_manager.cleanup(router, rank, self_destination, routing_table); return Ok(true); } kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => { @@ -615,6 +643,53 @@ impl<'a> Manager<'_> { let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone(); self.control.tx.send(kernel::Message::CacheGetReply(value)); } + + kernel::Message::DmaPutRequest(recorder) => { + // ddma is always used on satellites + if let Ok(id) = dma_manager.put_record(recorder, self_destination) { + dma_manager.upload_traces(id, router, rank, self_destination, routing_table)?; + self.session.kernel_state = KernelState::DmaUploading; + } else { + unexpected!("DMAError: found an unsupported call to RTIO devices on master") + } + } + kernel::Message::DmaEraseRequest(name) => { + dma_manager.erase_name(&name, router, rank, self_destination, routing_table); + } + kernel::Message::DmaGetRequest(name) => { + let dma_meta = dma_manager.retrieve(self_destination, &name); + self.control.tx.send(kernel::Message::DmaGetReply(dma_meta)); + } + kernel::Message::DmaStartRemoteRequest { id, timestamp } => { + if self.session.kernel_state != KernelState::DmaUploading { + dma_manager.playback_remote( + id as u32, + timestamp as u64, + router, + rank, + self_destination, + routing_table, + )?; + } else { + self.session.kernel_state = KernelState::DmaPendingPlayback { + id: id as u32, + timestamp: timestamp as u64, + }; + } + } + kernel::Message::DmaAwaitRemoteRequest(_id) => { + let max_time = timer.get_time() + Milliseconds(10000); + self.session.kernel_state = match self.session.kernel_state { + // if we are still waiting for the traces to be uploaded, extend the state by timeout + KernelState::DmaPendingPlayback { id, timestamp } => KernelState::DmaPendingAwait { + id: id, + timestamp: timestamp, + max_time: max_time, + }, + _ => KernelState::DmaAwait { max_time: max_time }, + }; + } + kernel::Message::SubkernelMsgSend { id: _id, destination: msg_dest, @@ -728,6 +803,18 @@ impl<'a> Manager<'_> { } Ok(()) } + KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => { + if timer.get_time() > *max_time { + self.control.tx.send(kernel::Message::DmaAwaitRemoteReply { + timeout: true, + error: 0, + channel: 0, + timestamp: 0, + }); + self.session.kernel_state = KernelState::Running; + } + Ok(()) + } _ => Ok(()), } }