From e9a153b985d1c703338ad49bb0ac986ade29d21a Mon Sep 17 00:00:00 2001 From: Spaqin Date: Wed, 22 Mar 2023 11:16:25 +0800 Subject: [PATCH] runtime: implement distributed DMA --- artiq/coredevice/dma.py | 17 +- artiq/firmware/ksupport/lib.rs | 26 +- .../firmware/libproto_artiq/drtioaux_proto.rs | 29 +- artiq/firmware/libproto_artiq/kernel_proto.rs | 20 +- artiq/firmware/runtime/main.rs | 7 +- artiq/firmware/runtime/rtio_dma.rs | 284 ++++++++++++++++-- artiq/firmware/runtime/rtio_mgt.rs | 108 ++++++- artiq/firmware/runtime/sched.rs | 4 + artiq/firmware/runtime/session.rs | 61 +++- artiq/firmware/satman/dma.rs | 17 +- artiq/firmware/satman/main.rs | 15 +- 11 files changed, 508 insertions(+), 80 deletions(-) diff --git a/artiq/coredevice/dma.py b/artiq/coredevice/dma.py index 261a6bcfe..72e0f4a08 100644 --- a/artiq/coredevice/dma.py +++ b/artiq/coredevice/dma.py @@ -6,7 +6,7 @@ alone could achieve. """ from artiq.language.core import syscall, kernel -from artiq.language.types import TInt32, TInt64, TStr, TNone, TTuple +from artiq.language.types import TInt32, TInt64, TStr, TNone, TTuple, TBool from artiq.coredevice.exceptions import DMAError from numpy import int64 @@ -17,7 +17,7 @@ def dma_record_start(name: TStr) -> TNone: raise NotImplementedError("syscall not simulated") @syscall -def dma_record_stop(duration: TInt64) -> TNone: +def dma_record_stop(duration: TInt64, enable_ddma: TBool) -> TNone: raise NotImplementedError("syscall not simulated") @syscall @@ -47,6 +47,7 @@ class DMARecordContextManager: def __init__(self): self.name = "" self.saved_now_mu = int64(0) + self.enable_ddma = False @kernel def __enter__(self): @@ -56,7 +57,7 @@ class DMARecordContextManager: @kernel def __exit__(self, type, value, traceback): - dma_record_stop(now_mu()) # see above + dma_record_stop(now_mu(), self.enable_ddma) # see above at_mu(self.saved_now_mu) @@ -74,12 +75,18 @@ class CoreDMA: self.epoch = 0 @kernel - def record(self, name): + def record(self, name, enable_ddma=False): """Returns a context manager that will record a DMA trace called ``name``. Any previously recorded trace with the same name is overwritten. - The trace will persist across kernel switches.""" + The trace will persist across kernel switches. + In DRTIO context, you can toggle distributed DMA with ``enable_ddma``. + Enabling it allows running DMA on satellites, rather than sending all + events from the master. + Disabling it may improve performance in some scenarios, + e.g. when there are many small satellite buffers.""" self.epoch += 1 self.recorder.name = name + self.recorder.enable_ddma = enable_ddma return self.recorder @kernel diff --git a/artiq/firmware/ksupport/lib.rs b/artiq/firmware/ksupport/lib.rs index acb461f88..1af9f7b46 100644 --- a/artiq/firmware/ksupport/lib.rs +++ b/artiq/firmware/ksupport/lib.rs @@ -269,7 +269,7 @@ extern fn dma_record_start(name: &CSlice) { } #[unwind(allowed)] -extern fn dma_record_stop(duration: i64) { +extern fn dma_record_stop(duration: i64, enable_ddma: bool) { unsafe { dma_record_flush(); @@ -285,7 +285,8 @@ extern fn dma_record_stop(duration: i64) { DMA_RECORDER.active = false; send(&DmaRecordStop { - duration: duration as u64 + duration: duration as u64, + enable_ddma: enable_ddma }); } } @@ -370,7 +371,7 @@ extern fn dma_erase(name: &CSlice) { #[repr(C)] struct DmaTrace { duration: i64, - address: i32, + address: i32 } #[unwind(allowed)] @@ -404,6 +405,7 @@ extern fn dma_playback(timestamp: i64, ptr: i32) { csr::cri_con::selected_write(1); csr::rtio_dma::enable_write(1); + send(&DmaStartRemoteRequest { id: ptr as i32, timestamp: timestamp }); while csr::rtio_dma::enable_read() != 0 {} csr::cri_con::selected_write(0); @@ -424,6 +426,24 @@ extern fn dma_playback(timestamp: i64, ptr: i32) { } } } + + send(&DmaAwaitRemoteRequest { id: ptr as i32 }); + recv!(&DmaAwaitRemoteReply { timeout, error, channel, timestamp } => { + if timeout { + raise!("DMAError", + "Error running DMA on satellite device, timed out waiting for results"); + } + if error & 1 != 0 { + raise!("RTIOUnderflow", + "RTIO underflow at channel {rtio_channel_info:0}, {1} mu", + channel as i64, timestamp as i64, 0); + } + if error & 2 != 0 { + raise!("RTIODestinationUnreachable", + "RTIO destination unreachable, output, at channel {rtio_channel_info:0}, {1} mu", + channel as i64, timestamp as i64, 0); + } + }); } #[cfg(not(has_rtio_dma))] diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index f5b232db9..efbf56bae 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -14,8 +14,8 @@ impl From> for Error { } } -/* 512 (max size) - 4 (CRC) - 1 (packet ID) - 4 (trace ID) - 1 (last) - 2 (length) */ -const DMA_TRACE_MAX_SIZE: usize = 500; +/* 512 (max size) - 4 (CRC) - 1 (packet ID) - 1 (destination) - 4 (trace ID) - 1 (last) - 2 (length) */ +pub const DMA_TRACE_MAX_SIZE: usize = 499; #[derive(PartialEq, Debug)] pub enum Packet { @@ -58,13 +58,13 @@ pub enum Packet { SpiReadReply { succeeded: bool, data: u32 }, SpiBasicReply { succeeded: bool }, - DmaAddTraceRequest { id: u32, last: bool, length: u16, trace: [u8; DMA_TRACE_MAX_SIZE] }, + DmaAddTraceRequest { destination: u8, id: u32, last: bool, length: u16, trace: [u8; DMA_TRACE_MAX_SIZE] }, DmaAddTraceReply { succeeded: bool }, - DmaRemoveTraceRequest { id: u32 }, + DmaRemoveTraceRequest { destination: u8, id: u32 }, DmaRemoveTraceReply { succeeded: bool }, - DmaPlaybackRequest { id: u32, timestamp: u64 }, + DmaPlaybackRequest { destination: u8, id: u32, timestamp: u64 }, DmaPlaybackReply { succeeded: bool }, - DmaPlaybackStatus { id: u32, error: u8, channel: u32, timestamp: u64 } + DmaPlaybackStatus { destination: u8, id: u32, error: u8, channel: u32, timestamp: u64 } } @@ -198,12 +198,14 @@ impl Packet { }, 0xb0 => { + let destination = reader.read_u8()?; let id = reader.read_u32()?; let last = reader.read_bool()?; let length = reader.read_u16()?; let mut trace: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { + destination: destination, id: id, last: last, length: length as u16, @@ -214,12 +216,14 @@ impl Packet { succeeded: reader.read_bool()? }, 0xb2 => Packet::DmaRemoveTraceRequest { + destination: reader.read_u8()?, id: reader.read_u32()? }, 0xb3 => Packet::DmaRemoveTraceReply { succeeded: reader.read_bool()? }, 0xb4 => Packet::DmaPlaybackRequest { + destination: reader.read_u8()?, id: reader.read_u32()?, timestamp: reader.read_u64()? }, @@ -227,6 +231,7 @@ impl Packet { succeeded: reader.read_bool()? }, 0xb6 => Packet::DmaPlaybackStatus { + destination: reader.read_u8()?, id: reader.read_u32()?, error: reader.read_u8()?, channel: reader.read_u32()?, @@ -392,8 +397,9 @@ impl Packet { writer.write_bool(succeeded)?; }, - Packet::DmaAddTraceRequest { id, last, trace, length } => { + Packet::DmaAddTraceRequest { destination, id, last, trace, length } => { writer.write_u8(0xb0)?; + writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_bool(last)?; // trace may be broken down to fit within drtio aux memory limit @@ -405,16 +411,18 @@ impl Packet { writer.write_u8(0xb1)?; writer.write_bool(succeeded)?; }, - Packet::DmaRemoveTraceRequest { id } => { + Packet::DmaRemoveTraceRequest { destination, id } => { writer.write_u8(0xb2)?; + writer.write_u8(destination)?; writer.write_u32(id)?; }, Packet::DmaRemoveTraceReply { succeeded } => { writer.write_u8(0xb3)?; writer.write_bool(succeeded)?; }, - Packet::DmaPlaybackRequest { id, timestamp } => { + Packet::DmaPlaybackRequest { destination, id, timestamp } => { writer.write_u8(0xb4)?; + writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u64(timestamp)?; }, @@ -422,8 +430,9 @@ impl Packet { writer.write_u8(0xb5)?; writer.write_bool(succeeded)?; }, - Packet::DmaPlaybackStatus { id, error, channel, timestamp } => { + Packet::DmaPlaybackStatus { destination, id, error, channel, timestamp } => { writer.write_u8(0xb6)?; + writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(error)?; writer.write_u32(channel)?; diff --git a/artiq/firmware/libproto_artiq/kernel_proto.rs b/artiq/firmware/libproto_artiq/kernel_proto.rs index 8b07ea46b..e723ea2f5 100644 --- a/artiq/firmware/libproto_artiq/kernel_proto.rs +++ b/artiq/firmware/libproto_artiq/kernel_proto.rs @@ -20,7 +20,8 @@ pub enum Message<'a> { DmaRecordStart(&'a str), DmaRecordAppend(&'a [u8]), DmaRecordStop { - duration: u64 + duration: u64, + enable_ddma: bool }, DmaEraseRequest { @@ -32,9 +33,24 @@ pub enum Message<'a> { }, DmaRetrieveReply { trace: Option<&'a [u8]>, - duration: u64 + duration: u64, }, + DmaStartRemoteRequest { + id: i32, + timestamp: i64, + }, + DmaAwaitRemoteRequest { + id: i32 + }, + DmaAwaitRemoteReply { + timeout: bool, + error: u8, + channel: u32, + timestamp: u64 + }, + + RunFinished, RunException { exceptions: &'a [Option>], diff --git a/artiq/firmware/runtime/main.rs b/artiq/firmware/runtime/main.rs index 3ae50dcad..5c54aeffa 100644 --- a/artiq/firmware/runtime/main.rs +++ b/artiq/firmware/runtime/main.rs @@ -182,6 +182,8 @@ fn startup() { drtio_routing::interconnect_disable_all(); let aux_mutex = sched::Mutex::new(); + let ddma_mutex = sched::Mutex::new(); + let mut scheduler = sched::Scheduler::new(interface); let io = scheduler.io(); @@ -189,14 +191,15 @@ fn startup() { io.spawn(4096, dhcp::dhcp_thread); } - rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations); + rtio_mgt::startup(&io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex); io.spawn(4096, mgmt::thread); { let aux_mutex = aux_mutex.clone(); let drtio_routing_table = drtio_routing_table.clone(); let up_destinations = up_destinations.clone(); - io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations) }); + let ddma_mutex = ddma_mutex.clone(); + io.spawn(16384, move |io| { session::thread(io, &aux_mutex, &drtio_routing_table, &up_destinations, &ddma_mutex) }); } #[cfg(any(has_rtio_moninj, has_drtio))] { diff --git a/artiq/firmware/runtime/rtio_dma.rs b/artiq/firmware/runtime/rtio_dma.rs index 292874047..1ec15384e 100644 --- a/artiq/firmware/runtime/rtio_dma.rs +++ b/artiq/firmware/runtime/rtio_dma.rs @@ -1,10 +1,182 @@ use core::mem; use alloc::{vec::Vec, string::String, collections::btree_map::BTreeMap}; +use sched::{Io, Mutex}; const ALIGNMENT: usize = 64; +#[cfg(has_drtio)] +pub mod remote_dma { + use super::*; + use board_artiq::drtio_routing::RoutingTable; + use rtio_mgt::drtio; + use board_misoc::clock; + + #[derive(Debug, PartialEq, Clone)] + pub enum RemoteState { + NotLoaded, + Loaded, + PlaybackEnded { error: u8, channel: u32, timestamp: u64 } + } + #[derive(Debug, Clone)] + struct RemoteTrace { + trace: Vec, + pub state: RemoteState + } + + impl From> for RemoteTrace { + fn from(trace: Vec) -> Self { + RemoteTrace { + trace: trace, + state: RemoteState::NotLoaded + } + } + } + + impl RemoteTrace { + pub fn get_trace(&self) -> &Vec { + &self.trace + } + } + + // remote traces map. ID -> destination, trace pair + static mut TRACES: BTreeMap> = BTreeMap::new(); + + pub fn add_traces(io: &Io, ddma_mutex: &Mutex, id: u32, traces: BTreeMap>) { + let _lock = ddma_mutex.lock(io); + let mut trace_map: BTreeMap = BTreeMap::new(); + for (destination, trace) in traces { + trace_map.insert(destination, trace.into()); + } + unsafe { TRACES.insert(id, trace_map); } + } + + pub fn await_done(io: &Io, ddma_mutex: &Mutex, id: u32, timeout: u64) -> Result { + let max_time = clock::get_ms() + timeout as u64; + io.until(|| { + if clock::get_ms() > max_time { + return true; + } + if ddma_mutex.test_lock() { + // cannot lock again within io.until - scheduler guarantees + // that it will not be interrupted - so only test the lock + return false; + } + let traces = unsafe { TRACES.get(&id).unwrap() }; + for (_dest, trace) in traces { + match trace.state { + RemoteState::PlaybackEnded {error: _, channel: _, timestamp: _} => (), + _ => return false + } + } + true + }).unwrap(); + if clock::get_ms() > max_time { + error!("Remote DMA await done timed out"); + return Err("Timed out waiting for results."); + } + // clear the internal state, and if there have been any errors, return one of them + let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 }; + { + let _lock = ddma_mutex.lock(io).unwrap(); + let traces = unsafe { TRACES.get_mut(&id).unwrap() }; + for (_dest, trace) in traces { + match trace.state { + RemoteState::PlaybackEnded {error: e, channel: _c, timestamp: _ts} => if e != 0 { playback_state = trace.state.clone(); }, + _ => (), + } + trace.state = RemoteState::Loaded; + } + } + Ok(playback_state) + } + + pub fn erase(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable, + ddma_mutex: &Mutex, id: u32) { + let _lock = ddma_mutex.lock(io).unwrap(); + let destinations = unsafe { TRACES.get(&id).unwrap() }; + for destination in destinations.keys() { + match drtio::ddma_send_erase(io, aux_mutex, routing_table, id, *destination) { + Ok(_) => (), + Err(e) => error!("Error erasing trace on DMA: {}", e) + } + } + unsafe { TRACES.remove(&id); } + } + + pub fn upload_traces(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable, + ddma_mutex: &Mutex, id: u32) { + let _lock = ddma_mutex.lock(io); + let traces = unsafe { TRACES.get_mut(&id).unwrap() }; + for (destination, mut trace) in traces { + match drtio::ddma_upload_trace(io, aux_mutex, routing_table, id, *destination, trace.get_trace()) + { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } + } + + pub fn playback(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable, + ddma_mutex: &Mutex, id: u32, timestamp: u64) { + // triggers playback on satellites + let destinations = unsafe { + let _lock = ddma_mutex.lock(io).unwrap(); + TRACES.get(&id).unwrap() }; + for (destination, trace) in destinations { + { + // need to drop the lock before sending the playback request to avoid a deadlock + // if a PlaybackStatus is returned from another satellite in the meanwhile. + let _lock = ddma_mutex.lock(io).unwrap(); + if trace.state != RemoteState::Loaded { + error!("Destination {} not ready for DMA, state: {:?}", *destination, trace.state); + continue; + } + } + match drtio::ddma_send_playback(io, aux_mutex, routing_table, ddma_mutex, id, *destination, timestamp) { + Ok(_) => (), + Err(e) => error!("Error during remote DMA playback: {}", e) + } + } + } + + pub fn playback_done(io: &Io, ddma_mutex: &Mutex, + id: u32, destination: u8, error: u8, channel: u32, timestamp: u64) { + // called upon receiving PlaybackDone aux packet + let _lock = ddma_mutex.lock(io).unwrap(); + let mut trace = unsafe { TRACES.get_mut(&id).unwrap().get_mut(&destination).unwrap() }; + trace.state = RemoteState::PlaybackEnded { + error: error, + channel: channel, + timestamp: timestamp + }; + } + + pub fn destination_changed(io: &Io, aux_mutex: &Mutex, routing_table: &RoutingTable, + ddma_mutex: &Mutex, destination: u8, up: bool) { + // update state of the destination, resend traces if it's up + let _lock = ddma_mutex.lock(io).unwrap(); + let traces_iter = unsafe { TRACES.iter_mut() }; + for (id, dest_traces) in traces_iter { + if let Some(trace) = dest_traces.get_mut(&destination) { + if up { + match drtio::ddma_upload_trace(io, aux_mutex, routing_table, *id, destination, trace.get_trace()) + { + Ok(_) => trace.state = RemoteState::Loaded, + Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) + } + } else { + trace.state = RemoteState::NotLoaded; + } + } + } + + } + +} + + #[derive(Debug)] -struct Entry { +struct LocalEntry { trace: Vec, padding_len: usize, duration: u64 @@ -12,7 +184,8 @@ struct Entry { #[derive(Debug)] pub struct Manager { - entries: BTreeMap, + entries: BTreeMap, + name_map: BTreeMap, recording_name: String, recording_trace: Vec } @@ -21,59 +194,116 @@ impl Manager { pub fn new() -> Manager { Manager { entries: BTreeMap::new(), - recording_name: String::new(), + name_map: BTreeMap::new(), recording_trace: Vec::new(), + recording_name: String::new() } } - pub fn record_start(&mut self, name: &str) { + pub fn record_start(&mut self, name: &str) -> Option { self.recording_name = String::from(name); self.recording_trace = Vec::new(); - - // or we could needlessly OOM replacing a large trace - self.entries.remove(name); + if let Some(id) = self.name_map.get(&self.recording_name) { + // replacing a trace + let old_id = id.clone(); + self.entries.remove(&id); + self.name_map.remove(&self.recording_name); + // return old ID + return Some(old_id); + } + return None; } pub fn record_append(&mut self, data: &[u8]) { self.recording_trace.extend_from_slice(data) } - pub fn record_stop(&mut self, duration: u64) { - let mut trace = Vec::new(); - mem::swap(&mut self.recording_trace, &mut trace); - trace.push(0); - let data_len = trace.len(); + pub fn record_stop(&mut self, duration: u64, enable_ddma: bool, + _io: &Io, _ddma_mutex: &Mutex) -> u32 { + let mut local_trace = Vec::new(); + let mut remote_traces: BTreeMap> = BTreeMap::new(); - // Realign. - trace.reserve(ALIGNMENT - 1); - let padding = ALIGNMENT - trace.as_ptr() as usize % ALIGNMENT; + if enable_ddma { + let mut trace = Vec::new(); + mem::swap(&mut self.recording_trace, &mut trace); + trace.push(0); + // analyze each entry and put in proper buckets, as the kernel core + // sends whole chunks, to limit comms/kernel CPU communication, + // and as only comms core has access to varios DMA buffers. + let mut ptr = 0; + while trace[ptr] != 0 { + // ptr + 3 = tgt >> 24 (destination) + let len = trace[ptr] as usize; + let destination = trace[ptr+3]; + if destination == 0 { + local_trace.extend(&trace[ptr..ptr+len]); + } + else { + if let Some(remote_trace) = remote_traces.get_mut(&destination) { + remote_trace.extend(&trace[ptr..ptr+len]); + } else { + remote_traces.insert(destination, trace[ptr..ptr+len].to_vec()); + } + } + // and jump to the next event + ptr += len; + } + } else { + // with disabled DDMA, move the whole trace to local + mem::swap(&mut self.recording_trace, &mut local_trace); + } + + local_trace.push(0); + let data_len = local_trace.len(); + // Realign the local entry. + local_trace.reserve(ALIGNMENT - 1); + let padding = ALIGNMENT - local_trace.as_ptr() as usize % ALIGNMENT; let padding = if padding == ALIGNMENT { 0 } else { padding }; for _ in 0..padding { // Vec guarantees that this will not reallocate - trace.push(0) + local_trace.push(0) } for i in 1..data_len + 1 { - trace[data_len + padding - i] = trace[data_len - i] + local_trace[data_len + padding - i] = local_trace[data_len - i] } - + // trace ID is its pointer + let id = local_trace[padding..].as_ptr() as u32; + self.entries.insert(id, LocalEntry { + trace: local_trace, + padding_len: padding, + duration: duration, + }); let mut name = String::new(); mem::swap(&mut self.recording_name, &mut name); - self.entries.insert(name, Entry { - trace: trace, - padding_len: padding, - duration: duration - }); + self.name_map.insert(name, id); + + #[cfg(has_drtio)] + remote_dma::add_traces(_io, _ddma_mutex, id, remote_traces); + + id } pub fn erase(&mut self, name: &str) { - self.entries.remove(name); + if let Some(id) = self.name_map.get(name) { + self.entries.remove(&id); + } + self.name_map.remove(name); + } + + #[cfg(has_drtio)] + pub fn get_id(&mut self, name: &str) -> Option<&u32> { + self.name_map.get(name) } pub fn with_trace(&self, name: &str, f: F) -> R where F: FnOnce(Option<&[u8]>, u64) -> R { - match self.entries.get(name) { - Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration), - None => f(None, 0) + if let Some(ptr) = self.name_map.get(name) { + match self.entries.get(ptr) { + Some(entry) => f(Some(&entry.trace[entry.padding_len..]), entry.duration), + None => f(None, 0) + } + } else { + f(None, 0) } } } diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 92aaaf66a..15a67be75 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -18,17 +18,22 @@ static mut RTIO_DEVICE_MAP: BTreeMap = BTreeMap::new(); #[cfg(has_drtio)] pub mod drtio { use super::*; + use alloc::vec::Vec; use drtioaux; + use proto_artiq::drtioaux_proto::DMA_TRACE_MAX_SIZE; + use rtio_dma::remote_dma; pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, - up_destinations: &Urc>) { + up_destinations: &Urc>, + ddma_mutex: &Mutex) { let aux_mutex = aux_mutex.clone(); let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); - io.spawn(4096, move |io| { + let ddma_mutex = ddma_mutex.clone(); + io.spawn(8192, move |io| { let routing_table = routing_table.borrow(); - link_thread(io, &aux_mutex, &routing_table, &up_destinations); + link_thread(io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex); }); } @@ -147,9 +152,12 @@ pub mod drtio { } } - fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8) { + fn process_unsolicited_aux(io: &Io, aux_mutex: &Mutex, linkno: u8, ddma_mutex: &Mutex) { let _lock = aux_mutex.lock(io).unwrap(); match drtioaux::recv(linkno) { + Ok(Some(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp })) => { + remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); + } Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno) @@ -198,7 +206,8 @@ pub mod drtio { fn destination_survey(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_links: &[bool], - up_destinations: &Urc>) { + up_destinations: &Urc>, + ddma_mutex: &Mutex) { for destination in 0..drtio_routing::DEST_COUNT { let hop = routing_table.0[destination][0]; let destination = destination as u8; @@ -216,8 +225,10 @@ pub mod drtio { destination: destination }); match reply { - Ok(drtioaux::Packet::DestinationDownReply) => - destination_set_up(routing_table, up_destinations, destination, false), + Ok(drtioaux::Packet::DestinationDownReply) => { + destination_set_up(routing_table, up_destinations, destination, false); + remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, false); + } Ok(drtioaux::Packet::DestinationOkReply) => (), Ok(drtioaux::Packet::DestinationSequenceErrorReply { channel }) => { error!("[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", destination, channel, resolve_channel_name(channel as u32)); @@ -236,6 +247,7 @@ pub mod drtio { } } else { destination_set_up(routing_table, up_destinations, destination, false); + remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, false); } } else { if up_links[linkno as usize] { @@ -247,6 +259,7 @@ pub mod drtio { Ok(drtioaux::Packet::DestinationOkReply) => { destination_set_up(routing_table, up_destinations, destination, true); init_buffer_space(destination as u8, linkno); + remote_dma::destination_changed(io, aux_mutex, routing_table, ddma_mutex, destination, true); }, Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) @@ -259,7 +272,8 @@ pub mod drtio { pub fn link_thread(io: Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - up_destinations: &Urc>) { + up_destinations: &Urc>, + ddma_mutex: &Mutex) { let mut up_links = [false; csr::DRTIO.len()]; loop { for linkno in 0..csr::DRTIO.len() { @@ -267,7 +281,7 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno) { - process_unsolicited_aux(&io, aux_mutex, linkno); + process_unsolicited_aux(&io, aux_mutex, linkno, ddma_mutex); process_local_errors(linkno); } else { info!("[LINK#{}] link is down", linkno); @@ -297,7 +311,7 @@ pub mod drtio { } } } - destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations); + destination_survey(&io, aux_mutex, routing_table, &up_links, up_destinations, ddma_mutex); io.sleep(200).unwrap(); } } @@ -328,6 +342,72 @@ pub mod drtio { } } } + + pub fn ddma_upload_trace(io: &Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + id: u32, destination: u8, trace: &Vec) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let mut i = 0; + while i < trace.len() { + let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; + let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { DMA_TRACE_MAX_SIZE } else { trace.len() - i } as usize; + let last = i + len == trace.len(); + trace_slice[..len].clone_from_slice(&trace[i..i+len]); + i += len; + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::DmaAddTraceRequest { + id: id, destination: destination, last: last, length: len as u16, trace: trace_slice}); + match reply { + Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => (), + Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => { + return Err("error adding trace on satellite"); }, + Ok(_) => { return Err("adding DMA trace failed, unexpected aux packet"); }, + Err(_) => { return Err("adding DMA trace failed, aux error"); } + } + } + Ok(()) + } + + + pub fn ddma_send_erase(io: &Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + id: u32, destination: u8) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact(io, aux_mutex, linkno, + &drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination }); + match reply { + Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), + Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + Ok(_) => Err("adding trace failed, unexpected aux packet"), + Err(_) => Err("erasing trace failed, aux error") + } + } + + pub fn ddma_send_playback(io: &Io, aux_mutex: &Mutex, + routing_table: &drtio_routing::RoutingTable, + ddma_mutex: &Mutex, id: u32, destination: u8, timestamp: u64) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let _lock = aux_mutex.lock(io).unwrap(); + drtioaux::send(linkno, &drtioaux::Packet::DmaPlaybackRequest{ + id: id, destination: destination, timestamp: timestamp }).unwrap(); + loop { + let reply = recv_aux_timeout(io, linkno, 200); + match reply { + Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: true }) => { return Ok(()) }, + Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: false }) => { + return Err("error on DMA playback request") }, + // in case we received status from another destination + // but we want to get DmaPlaybackReply anyway, thus the loop + Ok(drtioaux::Packet::DmaPlaybackStatus { id, destination, error, channel, timestamp }) => { + remote_dma::playback_done(io, ddma_mutex, id, destination, error, channel, timestamp); + }, + Ok(_) => { return Err("received unexpected aux packet while DMA playback") }, + Err(_) => { return Err("aux error on DMA playback") } + } + } + } + + } #[cfg(not(has_drtio))] @@ -336,7 +416,8 @@ pub mod drtio { pub fn startup(_io: &Io, _aux_mutex: &Mutex, _routing_table: &Urc>, - _up_destinations: &Urc>) {} + _up_destinations: &Urc>, + _ddma_mutex: &Mutex) {} pub fn reset(_io: &Io, _aux_mutex: &Mutex) {} } @@ -410,9 +491,10 @@ pub fn resolve_channel_name(channel: u32) -> String { pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, - up_destinations: &Urc>) { + up_destinations: &Urc>, + ddma_mutex: &Mutex) { unsafe { RTIO_DEVICE_MAP = read_device_map(); } - drtio::startup(io, aux_mutex, routing_table, up_destinations); + drtio::startup(io, aux_mutex, routing_table, up_destinations, ddma_mutex); unsafe { csr::rtio_core::reset_phy_write(1); } diff --git a/artiq/firmware/runtime/sched.rs b/artiq/firmware/runtime/sched.rs index 0adeaddd1..0d751da8e 100644 --- a/artiq/firmware/runtime/sched.rs +++ b/artiq/firmware/runtime/sched.rs @@ -301,6 +301,10 @@ impl Mutex { self.0.set(true); Ok(MutexGuard(&*self.0)) } + + pub fn test_lock<'a>(&'a self) -> bool { + self.0.get() + } } pub struct MutexGuard<'a>(&'a Cell); diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index df3865f6b..04ef42864 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -10,6 +10,8 @@ use urc::Urc; use sched::{ThreadHandle, Io, Mutex, TcpListener, TcpStream, Error as SchedError}; use rtio_clocking; use rtio_dma::Manager as DmaManager; +#[cfg(has_drtio)] +use rtio_dma::remote_dma; use rtio_mgt::{get_async_errors, resolve_channel_name}; use cache::Cache; use kern_hwreq; @@ -329,7 +331,7 @@ fn process_host_message(io: &Io, fn process_kern_message(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - mut stream: Option<&mut TcpStream>, + ddma_mutex: &Mutex, mut stream: Option<&mut TcpStream>, session: &mut Session) -> Result> { kern_recv_notrace(io, |request| { match (request, session.kernel_state) { @@ -368,19 +370,31 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, } &kern::DmaRecordStart(name) => { - session.congress.dma_manager.record_start(name); + if let Some(_id) = session.congress.dma_manager.record_start(name) { + // replace the record + #[cfg(has_drtio)] + remote_dma::erase(io, aux_mutex, routing_table, ddma_mutex, _id); + } kern_acknowledge() } &kern::DmaRecordAppend(data) => { session.congress.dma_manager.record_append(data); kern_acknowledge() } - &kern::DmaRecordStop { duration } => { - session.congress.dma_manager.record_stop(duration); + &kern::DmaRecordStop { duration, enable_ddma } => { + let _id = session.congress.dma_manager.record_stop(duration, enable_ddma, io, ddma_mutex); + #[cfg(has_drtio)] + { + remote_dma::upload_traces(io, aux_mutex, routing_table, ddma_mutex, _id); + } cache::flush_l2_cache(); kern_acknowledge() } &kern::DmaEraseRequest { name } => { + #[cfg(has_drtio)] + if let Some(id) = session.congress.dma_manager.get_id(name) { + remote_dma::erase(io, aux_mutex, routing_table, ddma_mutex, *id); + } session.congress.dma_manager.erase(name); kern_acknowledge() } @@ -392,6 +406,27 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, }) }) } + &kern::DmaStartRemoteRequest { id: _id, timestamp: _timestamp } => { + #[cfg(has_drtio)] + remote_dma::playback(io, aux_mutex, routing_table, ddma_mutex, _id as u32, _timestamp as u64); + kern_acknowledge() + } + &kern::DmaAwaitRemoteRequest { id: _id } => { + #[cfg(has_drtio)] + let reply = match remote_dma::await_done(io, ddma_mutex, _id as u32, 10_000) { + Ok(remote_dma::RemoteState::PlaybackEnded { error, channel, timestamp }) => + kern::DmaAwaitRemoteReply { + timeout: false, + error: error, + channel: channel, + timestamp: timestamp + }, + _ => kern::DmaAwaitRemoteReply { timeout: true, error: 0, channel: 0, timestamp: 0}, + }; + #[cfg(not(has_drtio))] + let reply = kern::DmaAwaitRemoteReply { timeout: false, error: 0, channel: 0, timestamp: 0}; + kern_send(io, &reply) + } &kern::RpcSend { async, service, tag, data } => { match stream { @@ -511,7 +546,7 @@ fn process_kern_queued_rpc(stream: &mut TcpStream, fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - stream: &mut TcpStream, + ddma_mutex: &Mutex, stream: &mut TcpStream, congress: &mut Congress) -> Result<(), Error> { let mut session = Session::new(congress); @@ -529,6 +564,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, if mailbox::receive() != 0 { process_kern_message(io, aux_mutex, routing_table, up_destinations, + ddma_mutex, Some(stream), &mut session)?; } @@ -546,7 +582,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc>, - congress: &mut Congress, + ddma_mutex: &Mutex, congress: &mut Congress, config_key: &str) -> Result<(), Error> { let mut session = Session::new(congress); @@ -568,7 +604,7 @@ fn flash_kernel_worker(io: &Io, aux_mutex: &Mutex, } if mailbox::receive() != 0 { - if process_kern_message(io, aux_mutex, routing_table, up_destinations, None, &mut session)? { + if process_kern_message(io, aux_mutex, routing_table, up_destinations, ddma_mutex, None, &mut session)? { return Ok(()) } } @@ -598,7 +634,8 @@ fn respawn(io: &Io, handle: &mut Option, f: F) pub fn thread(io: Io, aux_mutex: &Mutex, routing_table: &Urc>, - up_destinations: &Urc>) { + up_destinations: &Urc>, + ddma_mutex: &Mutex) { let listener = TcpListener::new(&io, 65535); listener.listen(1381).expect("session: cannot listen"); info!("accepting network sessions"); @@ -609,7 +646,7 @@ pub fn thread(io: Io, aux_mutex: &Mutex, { let mut congress = congress.borrow_mut(); info!("running startup kernel"); - match flash_kernel_worker(&io, &aux_mutex, &routing_table.borrow(), &up_destinations, &mut congress, "startup_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table.borrow(), &up_destinations, ddma_mutex, &mut congress, "startup_kernel") { Ok(()) => info!("startup kernel finished"), Err(Error::KernelNotFound) => @@ -649,12 +686,13 @@ pub fn thread(io: Io, aux_mutex: &Mutex, let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let congress = congress.clone(); + let ddma_mutex = ddma_mutex.clone(); let stream = stream.into_handle(); respawn(&io, &mut kernel_thread, move |io| { let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); let mut stream = TcpStream::from_handle(&io, stream); - match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut stream, &mut *congress) { + match host_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut stream, &mut *congress) { Ok(()) => (), Err(Error::Protocol(host::Error::Io(IoError::UnexpectedEnd))) => info!("connection closed"), @@ -677,10 +715,11 @@ pub fn thread(io: Io, aux_mutex: &Mutex, let routing_table = routing_table.clone(); let up_destinations = up_destinations.clone(); let congress = congress.clone(); + let ddma_mutex = ddma_mutex.clone(); respawn(&io, &mut kernel_thread, move |io| { let routing_table = routing_table.borrow(); let mut congress = congress.borrow_mut(); - match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &mut *congress, "idle_kernel") { + match flash_kernel_worker(&io, &aux_mutex, &routing_table, &up_destinations, &ddma_mutex, &mut *congress, "idle_kernel") { Ok(()) => info!("idle kernel finished, standing by"), Err(Error::Protocol(host::Error::Io( diff --git a/artiq/firmware/satman/dma.rs b/artiq/firmware/satman/dma.rs index c0a0affcf..133bfd3c0 100644 --- a/artiq/firmware/satman/dma.rs +++ b/artiq/firmware/satman/dma.rs @@ -1,4 +1,4 @@ -use board_misoc::csr; +use board_misoc::{csr, cache::flush_l2_cache}; use alloc::{vec::Vec, collections::btree_map::BTreeMap}; const ALIGNMENT: usize = 64; @@ -52,7 +52,19 @@ impl Manager { pub fn add(&mut self, id: u32, last: bool, trace: &[u8], trace_len: usize) -> Result<(), Error> { let entry = match self.entries.get_mut(&id) { - Some(entry) => entry, + Some(entry) => { + if entry.complete { + // replace entry + self.entries.remove(&id); + self.entries.insert(id, Entry { + trace: Vec::new(), + padding_len: 0, + complete: false }); + self.entries.get_mut(&id).unwrap() + } else { + entry + } + }, None => { self.entries.insert(id, Entry { trace: Vec::new(), @@ -80,6 +92,7 @@ impl Manager { } entry.complete = true; entry.padding_len = padding; + flush_l2_cache(); } Ok(()) } diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index 6f7ed4e6a..fb97b2bdb 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -301,19 +301,22 @@ fn process_aux_packet(_manager: &mut DmaManager, _repeaters: &mut [repeater::Rep } } #[cfg(has_rtio_dma)] - drtioaux::Packet::DmaAddTraceRequest { id, last, length, trace } => { + drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, last, length, trace } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet); let succeeded = _manager.add(id, last, &trace, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) } #[cfg(has_rtio_dma)] - drtioaux::Packet::DmaRemoveTraceRequest { id } => { + drtioaux::Packet::DmaRemoveTraceRequest { destination: _destination, id } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet); let succeeded = _manager.erase(id).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaRemoveTraceReply { succeeded: succeeded }) } #[cfg(has_rtio_dma)] - drtioaux::Packet::DmaPlaybackRequest { id, timestamp } => { + drtioaux::Packet::DmaPlaybackRequest { destination: _destination, id, timestamp } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet); let succeeded = _manager.playback(id, timestamp).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded }) @@ -463,7 +466,8 @@ pub extern fn main() -> i32 { unsafe { ALLOC.add_range(&mut _fheap, &mut _eheap); - pmp::init_stack_guard(&_sstack_guard as *const u8 as usize); + // stack guard disabled, see https://github.com/m-labs/artiq/issues/2067 + // pmp::init_stack_guard(&_sstack_guard as *const u8 as usize); } clock::init(); @@ -571,8 +575,9 @@ pub extern fn main() -> i32 { } } if let Some(status) = dma_manager.check_state() { + info!("playback done, error: {}, channel: {}, timestamp: {}", status.error, status.channel, status.timestamp); if let Err(e) = drtioaux::send(0, &drtioaux::Packet::DmaPlaybackStatus { - id: status.id, error: status.error, channel: status.channel, timestamp: status.timestamp }) { + destination: rank, id: status.id, error: status.error, channel: status.channel, timestamp: status.timestamp }) { error!("error sending DMA playback status: {}", e); } }