From 6640bf0e829f72d719a4740644e9c389052d7b46 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 26 Oct 2023 17:05:11 +0800 Subject: [PATCH] drtioaux/subkernel/ddma: introduce proper errors, more robust --- artiq/firmware/runtime/analyzer.rs | 9 +- artiq/firmware/runtime/kern_hwreq.rs | 12 +- artiq/firmware/runtime/kernel.rs | 71 ++++++---- artiq/firmware/runtime/rtio_dma.rs | 88 ++++++++----- artiq/firmware/runtime/rtio_mgt.rs | 189 ++++++++++++++++----------- artiq/firmware/runtime/session.rs | 82 +++++++++--- 6 files changed, 281 insertions(+), 170 deletions(-) diff --git a/artiq/firmware/runtime/analyzer.rs b/artiq/firmware/runtime/analyzer.rs index fa62a535c..41cca6e46 100644 --- a/artiq/firmware/runtime/analyzer.rs +++ b/artiq/firmware/runtime/analyzer.rs @@ -54,19 +54,16 @@ pub mod remote_analyzer { pub fn get_data(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc> - ) -> Result { + ) -> Result { // gets data from satellites and returns consolidated data let mut remote_data: Vec = Vec::new(); let mut remote_overflow = false; let mut remote_sent_bytes = 0; let mut remote_total_bytes = 0; - let data_vec = match drtio::analyzer_query( + let data_vec = drtio::analyzer_query( io, aux_mutex, routing_table, up_destinations - ) { - Ok(data_vec) => data_vec, - Err(e) => return Err(e) - }; + )?; for data in data_vec { remote_total_bytes += data.total_byte_count; remote_sent_bytes += data.sent_bytes; diff --git a/artiq/firmware/runtime/kern_hwreq.rs b/artiq/firmware/runtime/kern_hwreq.rs index 7fd0b379c..49aa2d8af 100644 --- a/artiq/firmware/runtime/kern_hwreq.rs +++ b/artiq/firmware/runtime/kern_hwreq.rs @@ -32,7 +32,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } @@ -55,7 +55,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } @@ -78,7 +78,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } @@ -102,7 +102,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } @@ -126,7 +126,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } @@ -151,7 +151,7 @@ mod remote_i2c { } Err(e) => { error!("aux packet error ({})", e); - Err(e) + Err("aux packet error") } } } diff --git a/artiq/firmware/runtime/kernel.rs b/artiq/firmware/runtime/kernel.rs index 8bf46451f..a308e6c1c 100644 --- a/artiq/firmware/runtime/kernel.rs +++ b/artiq/firmware/runtime/kernel.rs @@ -91,8 +91,7 @@ pub fn validate(ptr: usize) -> bool { #[cfg(has_drtio)] pub mod subkernel { - use alloc::{vec::Vec, collections::btree_map::BTreeMap, string::String, string::ToString}; - use core::str; + use alloc::{vec::Vec, collections::btree_map::BTreeMap}; use board_artiq::drtio_routing::RoutingTable; use board_misoc::clock; use proto_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}, rpc_proto as rpc}; @@ -119,32 +118,30 @@ pub mod subkernel { pub enum Error { #[fail(display = "Timed out waiting for subkernel")] Timeout, - #[fail(display = "Session killed while waiting for subkernel")] - SessionKilled, #[fail(display = "Subkernel is in incorrect state for the given operation")] IncorrectState, #[fail(display = "DRTIO error: {}", _0)] - DrtioError(String), - #[fail(display = "scheduler error")] - SchedError(SchedError), + DrtioError(#[cause] drtio::Error), + #[fail(display = "scheduler error: {}", _0)] + SchedError(#[cause] SchedError), #[fail(display = "rpc io error")] RpcIoError, #[fail(display = "subkernel finished prematurely")] SubkernelFinished, } - impl From<&str> for Error { - fn from(value: &str) -> Error { - Error::DrtioError(value.to_string()) + impl From for Error { + fn from(value: drtio::Error) -> Error { + match value { + drtio::Error::SchedError(x) => Error::SchedError(x), + x => Error::DrtioError(x), + } } } impl From for Error { fn from(value: SchedError) -> Error { - match value { - SchedError::Interrupted => Error::SessionKilled, - x => Error::SchedError(x) - } + Error::SchedError(value) } } @@ -178,14 +175,15 @@ pub mod subkernel { static mut SUBKERNELS: BTreeMap = BTreeMap::new(); - pub fn add_subkernel(io: &Io, subkernel_mutex: &Mutex, id: u32, destination: u8, kernel: Vec) { - let _lock = subkernel_mutex.lock(io).unwrap(); + pub fn add_subkernel(io: &Io, subkernel_mutex: &Mutex, id: u32, destination: u8, kernel: Vec) -> Result<(), Error> { + let _lock = subkernel_mutex.lock(io)?; unsafe { SUBKERNELS.insert(id, Subkernel::new(destination, kernel)); } + Ok(()) } pub fn upload(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, routing_table: &RoutingTable, id: u32) -> Result<(), Error> { - let _lock = subkernel_mutex.lock(io).unwrap(); + let _lock = subkernel_mutex.lock(io)?; let subkernel = unsafe { SUBKERNELS.get_mut(&id).unwrap() }; drtio::subkernel_upload(io, aux_mutex, routing_table, id, subkernel.destination, &subkernel.data)?; @@ -195,9 +193,10 @@ pub mod subkernel { pub fn load(io: &Io, aux_mutex: &Mutex, subkernel_mutex: &Mutex, routing_table: &RoutingTable, id: u32, run: bool) -> Result<(), Error> { - let _lock = subkernel_mutex.lock(io).unwrap(); + let _lock = subkernel_mutex.lock(io)?; let subkernel = unsafe { SUBKERNELS.get_mut(&id).unwrap() }; if subkernel.state != SubkernelState::Uploaded { + error!("for id: {} expected Uploaded, got: {:?}", id, subkernel.state); return Err(Error::IncorrectState); } drtio::subkernel_load(io, aux_mutex, routing_table, id, subkernel.destination, run)?; @@ -207,13 +206,14 @@ pub mod subkernel { Ok(()) } - pub fn clear_subkernels(io: &Io, subkernel_mutex: &Mutex) { - let _lock = subkernel_mutex.lock(io).unwrap(); + pub fn clear_subkernels(io: &Io, subkernel_mutex: &Mutex) -> Result<(), Error> { + let _lock = subkernel_mutex.lock(io)?; unsafe { SUBKERNELS = BTreeMap::new(); MESSAGE_QUEUE = Vec::new(); CURRENT_MESSAGES = BTreeMap::new(); } + Ok(()) } pub fn subkernel_finished(io: &Io, subkernel_mutex: &Mutex, id: u32, with_exception: bool) { @@ -222,10 +222,13 @@ pub mod subkernel { let subkernel = unsafe { SUBKERNELS.get_mut(&id) }; // may be None if session ends and is cleared if let Some(subkernel) = subkernel { - subkernel.state = SubkernelState::Finished { - status: match with_exception { - true => FinishStatus::Exception, - false => FinishStatus::Ok, + // ignore other messages, could be a late finish reported + if subkernel.state == SubkernelState::Running { + subkernel.state = SubkernelState::Finished { + status: match with_exception { + true => FinishStatus::Exception, + false => FinishStatus::Ok, + } } } } @@ -269,7 +272,9 @@ pub mod subkernel { } else { None } }) }, - _ => Err(Error::IncorrectState) + _ => { + Err(Error::IncorrectState) + } } } @@ -279,7 +284,9 @@ pub mod subkernel { let _lock = subkernel_mutex.lock(io)?; match unsafe { SUBKERNELS.get(&id).unwrap().state } { SubkernelState::Running | SubkernelState::Finished { .. } => (), - _ => return Err(Error::IncorrectState) + _ => { + return Err(Error::IncorrectState); + } } } let max_time = clock::get_ms() + timeout as u64; @@ -324,10 +331,16 @@ pub mod subkernel { // may get interrupted, when session is cancelled or main kernel finishes without await Err(_) => return, }; - if unsafe { SUBKERNELS.get(&id).is_none() } { - // do not add messages for non-existing or deleted subkernels + let subkernel = unsafe { SUBKERNELS.get(&id) }; + if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { + // do not add messages for non-existing, non-running or deleted subkernels return } + if status.is_first() { + unsafe { + CURRENT_MESSAGES.remove(&id); + } + } match unsafe { CURRENT_MESSAGES.get_mut(&id) } { Some(message) => message.data.extend(&data[..length]), None => unsafe { @@ -398,7 +411,7 @@ pub mod subkernel { routing_table: &RoutingTable, id: u32, count: u8, tag: &'a [u8], message: *const *const () ) -> Result<(), Error> { let mut writer = Cursor::new(Vec::new()); - let _lock = subkernel_mutex.lock(io).unwrap(); + let _lock = subkernel_mutex.lock(io)?; let destination = unsafe { SUBKERNELS.get(&id).unwrap().destination }; // reuse rpc code for sending arbitrary data diff --git a/artiq/firmware/runtime/rtio_dma.rs b/artiq/firmware/runtime/rtio_dma.rs index ce0fd4088..63bf563a6 100644 --- a/artiq/firmware/runtime/rtio_dma.rs +++ b/artiq/firmware/runtime/rtio_dma.rs @@ -1,6 +1,6 @@ use core::mem; use alloc::{vec::Vec, string::String, collections::btree_map::BTreeMap}; -use sched::{Io, Mutex}; +use sched::{Io, Mutex, Error as SchedError}; const ALIGNMENT: usize = 64; @@ -39,19 +39,48 @@ pub mod remote_dma { } } + #[derive(Fail, Debug)] + pub enum Error { + #[fail(display = "Timed out waiting for DMA results")] + Timeout, + #[fail(display = "DDMA trace is in incorrect state for the given operation")] + IncorrectState, + #[fail(display = "scheduler error: {}", _0)] + SchedError(#[cause] SchedError), + #[fail(display = "DRTIO error: {}", _0)] + DrtioError(#[cause] drtio::Error), + } + + impl From for Error { + fn from(value: drtio::Error) -> Error { + match value { + drtio::Error::SchedError(x) => Error::SchedError(x), + x => Error::DrtioError(x), + } + } + } + + impl From for Error { + fn from(value: SchedError) -> Error { + Error::SchedError(value) + } + } + // remote traces map. ID -> destination, trace pair static mut TRACES: BTreeMap> = BTreeMap::new(); - pub fn add_traces(io: &Io, ddma_mutex: &Mutex, id: u32, traces: BTreeMap>) { - let _lock = ddma_mutex.lock(io); + pub fn add_traces(io: &Io, ddma_mutex: &Mutex, id: u32, traces: BTreeMap> + ) -> Result<(), SchedError> { + let _lock = ddma_mutex.lock(io)?; let mut trace_map: BTreeMap = BTreeMap::new(); for (destination, trace) in traces { trace_map.insert(destination, trace.into()); } unsafe { TRACES.insert(id, trace_map); } + Ok(()) } - pub fn await_done(io: &Io, ddma_mutex: &Mutex, id: u32, timeout: u64) -> Result { + pub fn await_done(io: &Io, ddma_mutex: &Mutex, id: u32, timeout: u64) -> Result { let max_time = clock::get_ms() + timeout as u64; io.until(|| { if clock::get_ms() > max_time { @@ -70,15 +99,15 @@ pub mod remote_dma { } } true - }).unwrap(); + })?; if clock::get_ms() > max_time { error!("Remote DMA await done timed out"); - return Err("Timed out waiting for results."); + return Err(Error::Timeout); } // clear the internal state, and if there have been any errors, return one of them let mut playback_state: RemoteState = RemoteState::PlaybackEnded { error: 0, channel: 0, timestamp: 0 }; { - let _lock = ddma_mutex.lock(io).unwrap(); + let _lock = ddma_mutex.lock(io)?; let traces = unsafe { TRACES.get_mut(&id).unwrap() }; for (_dest, trace) in traces { match trace.state { @@ -92,8 +121,8 @@ pub mod remote_dma { } pub fn erase(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, - routing_table: &RoutingTable, id: u32) { - let _lock = ddma_mutex.lock(io).unwrap(); + routing_table: &RoutingTable, id: u32) -> Result<(), Error> { + let _lock = ddma_mutex.lock(io)?; let destinations = unsafe { TRACES.get(&id).unwrap() }; for destination in destinations.keys() { match drtio::ddma_send_erase(io, aux_mutex, routing_table, id, *destination) { @@ -102,42 +131,39 @@ pub mod remote_dma { } } unsafe { TRACES.remove(&id); } + Ok(()) } pub fn upload_traces(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, - routing_table: &RoutingTable, id: u32) { - let _lock = ddma_mutex.lock(io); + routing_table: &RoutingTable, id: u32) -> Result<(), Error> { + let _lock = ddma_mutex.lock(io)?; let traces = unsafe { TRACES.get_mut(&id).unwrap() }; for (destination, mut trace) in traces { - match drtio::ddma_upload_trace(io, aux_mutex, routing_table, id, *destination, trace.get_trace()) - { - Ok(_) => trace.state = RemoteState::Loaded, - Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e) - } + drtio::ddma_upload_trace(io, aux_mutex, routing_table, id, *destination, trace.get_trace())?; + trace.state = RemoteState::Loaded; } + Ok(()) } pub fn playback(io: &Io, aux_mutex: &Mutex, ddma_mutex: &Mutex, - routing_table: &RoutingTable, id: u32, timestamp: u64) { + routing_table: &RoutingTable, id: u32, timestamp: u64) -> Result<(), Error>{ // triggers playback on satellites let destinations = unsafe { - let _lock = ddma_mutex.lock(io).unwrap(); + let _lock = ddma_mutex.lock(io)?; TRACES.get(&id).unwrap() }; for (destination, trace) in destinations { { // need to drop the lock before sending the playback request to avoid a deadlock // if a PlaybackStatus is returned from another satellite in the meanwhile. - let _lock = ddma_mutex.lock(io).unwrap(); + let _lock = ddma_mutex.lock(io)?; if trace.state != RemoteState::Loaded { error!("Destination {} not ready for DMA, state: {:?}", *destination, trace.state); - continue; + return Err(Error::IncorrectState); } } - match drtio::ddma_send_playback(io, aux_mutex, routing_table, id, *destination, timestamp) { - Ok(_) => (), - Err(e) => error!("Error during remote DMA playback: {}", e) - } + drtio::ddma_send_playback(io, aux_mutex, routing_table, id, *destination, timestamp)?; } + Ok(()) } pub fn playback_done(io: &Io, ddma_mutex: &Mutex, @@ -172,10 +198,10 @@ pub mod remote_dma { } } - pub fn has_remote_traces(io: &Io, ddma_mutex: &Mutex, id: u32) -> bool { - let _lock = ddma_mutex.lock(io).unwrap(); + pub fn has_remote_traces(io: &Io, ddma_mutex: &Mutex, id: u32) -> Result { + let _lock = ddma_mutex.lock(io)?; let trace_list = unsafe { TRACES.get(&id).unwrap() }; - !trace_list.is_empty() + Ok(!trace_list.is_empty()) } } @@ -225,11 +251,11 @@ impl Manager { } pub fn record_stop(&mut self, duration: u64, _enable_ddma: bool, - _io: &Io, _ddma_mutex: &Mutex) -> u32 { + _io: &Io, _ddma_mutex: &Mutex) -> Result { let mut local_trace = Vec::new(); let mut _remote_traces: BTreeMap> = BTreeMap::new(); - if _enable_ddma & cfg!(has_drtio) { + if _enable_ddma && cfg!(has_drtio) { let mut trace = Vec::new(); mem::swap(&mut self.recording_trace, &mut trace); trace.push(0); @@ -284,9 +310,9 @@ impl Manager { self.name_map.insert(name, id); #[cfg(has_drtio)] - remote_dma::add_traces(_io, _ddma_mutex, id, _remote_traces); + remote_dma::add_traces(_io, _ddma_mutex, id, _remote_traces)?; - id + Ok(id) } pub fn erase(&mut self, name: &str) { diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index a03737178..44661e822 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -22,6 +22,39 @@ pub mod drtio { #[cfg(has_rtio_analyzer)] use analyzer::remote_analyzer::RemoteBuffer; use kernel::subkernel; + use sched::Error as SchedError; + + #[derive(Fail, Debug)] + pub enum Error { + #[fail(display = "timed out")] + Timeout, + #[fail(display = "unexpected packet: {:?}", _0)] + UnexpectedPacket(drtioaux::Packet), + #[fail(display = "aux packet error")] + AuxError, + #[fail(display = "link down")] + LinkDown, + #[fail(display = "unexpected reply")] + UnexpectedReply, + #[fail(display = "error adding DMA trace on satellite #{}", _0)] + DmaAddTraceFail(u8), + #[fail(display = "error erasing DMA trace on satellite #{}", _0)] + DmaEraseFail(u8), + #[fail(display = "error playing back DMA trace on satellite #{}", _0)] + DmaPlaybackFail(u8), + #[fail(display = "error adding subkernel on satellite #{}", _0)] + SubkernelAddFail(u8), + #[fail(display = "error on subkernel run request on satellite #{}", _0)] + SubkernelRunFail(u8), + #[fail(display = "sched error: {}", _0)] + SchedError(#[cause] SchedError), + } + + impl From for Error { + fn from(value: SchedError) -> Error { + Error::SchedError(value) + } + } pub fn startup(io: &Io, aux_mutex: &Mutex, routing_table: &Urc>, @@ -45,21 +78,21 @@ pub mod drtio { } } - fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result { + fn recv_aux_timeout(io: &Io, linkno: u8, timeout: u32) -> Result { let max_time = clock::get_ms() + timeout as u64; loop { if !link_rx_up(linkno) { - return Err("link went down"); + return Err(Error::LinkDown); } if clock::get_ms() > max_time { - return Err("timeout"); + return Err(Error::Timeout); } match drtioaux::recv(linkno) { Ok(Some(packet)) => return Ok(packet), Ok(None) => (), - Err(_) => return Err("aux packet error") + Err(_) => return Err(Error::AuxError) } - io.relinquish().unwrap(); + io.relinquish()?; } } @@ -88,13 +121,23 @@ pub mod drtio { } pub fn aux_transact(io: &Io, aux_mutex: &Mutex, linkno: u8, request: &drtioaux::Packet - ) -> Result { - let _lock = aux_mutex.lock(io).unwrap(); + ) -> Result { + let _lock = aux_mutex.lock(io)?; drtioaux::send(linkno, request).unwrap(); let reply = recv_aux_timeout(io, linkno, 200)?; Ok(reply) } + pub fn clear_buffers(io: &Io, aux_mutex: &Mutex) { + let _lock = aux_mutex.lock(io).unwrap(); + for linkno in 0..(csr::DRTIO.len() as u8) { + if !link_rx_up(linkno) { + continue; + } + let _ = recv_aux_timeout(io, linkno, 200); + } + } + fn ping_remote(io: &Io, aux_mutex: &Mutex, linkno: u8) -> u32 { let mut count = 0; loop { @@ -124,7 +167,7 @@ pub mod drtio { } } - fn sync_tsc(io: &Io, aux_mutex: &Mutex, linkno: u8) -> Result<(), &'static str> { + fn sync_tsc(io: &Io, aux_mutex: &Mutex, linkno: u8) -> Result<(), Error> { let _lock = aux_mutex.lock(io).unwrap(); unsafe { @@ -137,32 +180,32 @@ pub mod drtio { if reply == drtioaux::Packet::TSCAck { return Ok(()); } else { - return Err("unexpected reply"); + return Err(Error::UnexpectedReply); } } fn load_routing_table(io: &Io, aux_mutex: &Mutex, - linkno: u8, routing_table: &drtio_routing::RoutingTable) -> Result<(), &'static str> { + linkno: u8, routing_table: &drtio_routing::RoutingTable) -> Result<(), Error> { for i in 0..drtio_routing::DEST_COUNT { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetPath { destination: i as u8, hops: routing_table.0[i] })?; if reply != drtioaux::Packet::RoutingAck { - return Err("unexpected reply"); + return Err(Error::UnexpectedReply); } } Ok(()) } fn set_rank(io: &Io, aux_mutex: &Mutex, - linkno: u8, rank: u8) -> Result<(), &'static str> { + linkno: u8, rank: u8) -> Result<(), Error> { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::RoutingSetRank { rank: rank })?; if reply != drtioaux::Packet::RoutingAck { - return Err("unexpected reply"); + return Err(Error::UnexpectedReply); } Ok(()) } @@ -285,7 +328,7 @@ pub mod drtio { } } } else { - error!("[DEST#{}] communication failed ({})", destination, reply.unwrap_err()); + error!("[DEST#{}] communication failed ({:?})", destination, reply.unwrap_err()); } break; } @@ -309,7 +352,7 @@ pub mod drtio { subkernel::destination_changed(io, aux_mutex, subkernel_mutex, routing_table, destination, true); }, Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), - Err(e) => error!("[DEST#{}] communication failed ({})", destination, e) + Err(e) => error!("[DEST#{}] communication failed ({:?})", destination, e) } } } @@ -343,13 +386,13 @@ pub mod drtio { info!("[LINK#{}] remote replied after {} packets", linkno, ping_count); up_links[linkno as usize] = true; if let Err(e) = sync_tsc(&io, aux_mutex, linkno) { - error!("[LINK#{}] failed to sync TSC ({})", linkno, e); + error!("[LINK#{}] failed to sync TSC ({:?})", linkno, e); } if let Err(e) = load_routing_table(&io, aux_mutex, linkno, routing_table) { - error!("[LINK#{}] failed to load routing table ({})", linkno, e); + error!("[LINK#{}] failed to load routing table ({:?})", linkno, e); } if let Err(e) = set_rank(&io, aux_mutex, linkno, 1) { - error!("[LINK#{}] failed to set rank ({})", linkno, e); + error!("[LINK#{}] failed to set rank ({:?})", linkno, e); } info!("[LINK#{}] link initialization completed", linkno); } else { @@ -384,86 +427,79 @@ pub mod drtio { match reply { Ok(drtioaux::Packet::ResetAck) => (), Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno), - Err(e) => error!("[LINK#{}] reset failed, aux packet error ({})", linkno, e) + Err(e) => error!("[LINK#{}] reset failed, aux packet error ({:?})", linkno, e) } } } } - fn partition_data(data: &[u8], send_f: F) -> Result<(), &'static str> - where F: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], PayloadStatus, usize) -> Result<(), &'static str> { + fn partition_data(data: &[u8], send_f: F) -> Result<(), Error> + where F: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], PayloadStatus, usize) -> Result<(), Error> { let mut i = 0; - let mut first = true; while i < data.len() { let mut slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let len: usize = if i + MASTER_PAYLOAD_MAX_SIZE < data.len() { MASTER_PAYLOAD_MAX_SIZE } else { data.len() - i } as usize; + let first = i == 0; let last = i + len == data.len(); let status = PayloadStatus::from_status(first, last); slice[..len].clone_from_slice(&data[i..i+len]); i += len; send_f(&slice, status, len)?; - first = false; } Ok(()) } pub fn ddma_upload_trace(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - id: u32, destination: u8, trace: &[u8]) -> Result<(), &'static str> { + id: u32, destination: u8, trace: &[u8]) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; partition_data(trace, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DmaAddTraceRequest { - id: id, destination: destination, status: status, length: len as u16, trace: *slice}); + id: id, destination: destination, status: status, length: len as u16, trace: *slice})?; match reply { - Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => Ok(()), - Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => - Err("error adding trace on satellite"), - Ok(_) => Err("adding DMA trace failed, unexpected aux packet"), - Err(_) => Err("adding DMA trace failed, aux error") + drtioaux::Packet::DmaAddTraceReply { succeeded: true } => Ok(()), + drtioaux::Packet::DmaAddTraceReply { succeeded: false } => Err(Error::DmaAddTraceFail(destination)), + packet => Err(Error::UnexpectedPacket(packet)), } }) } pub fn ddma_send_erase(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - id: u32, destination: u8) -> Result<(), &'static str> { + id: u32, destination: u8) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination }); + &drtioaux::Packet::DmaRemoveTraceRequest { id: id, destination: destination })?; match reply { - Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), - Ok(drtioaux::Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), - Ok(_) => Err("erasing trace failed, unexpected aux packet"), - Err(_) => Err("erasing trace failed, aux error") + drtioaux::Packet::DmaRemoveTraceReply { succeeded: true } => Ok(()), + drtioaux::Packet::DmaRemoveTraceReply { succeeded: false } => Err(Error::DmaEraseFail(destination)), + packet => Err(Error::UnexpectedPacket(packet)), } } pub fn ddma_send_playback(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - id: u32, destination: u8, timestamp: u64) -> Result<(), &'static str> { + id: u32, destination: u8, timestamp: u64) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::DmaPlaybackRequest{ id: id, destination: destination, timestamp: timestamp }); + &drtioaux::Packet::DmaPlaybackRequest{ id: id, destination: destination, timestamp: timestamp })?; match reply { - Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: true }) => return Ok(()), - Ok(drtioaux::Packet::DmaPlaybackReply { succeeded: false }) => - return Err("error on DMA playback request"), - Ok(_) => return Err("received unexpected aux packet during DMA playback"), - Err(_) => return Err("aux error on DMA playback") + drtioaux::Packet::DmaPlaybackReply { succeeded: true } => Ok(()), + drtioaux::Packet::DmaPlaybackReply { succeeded: false } => + Err(Error::DmaPlaybackFail(destination)), + packet => Err(Error::UnexpectedPacket(packet)), } } #[cfg(has_rtio_analyzer)] fn analyzer_get_data(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - destination: u8) -> Result { + destination: u8) -> Result { let linkno = routing_table.0[destination as usize][0] - 1; let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::AnalyzerHeaderRequest { destination: destination }); + &drtioaux::Packet::AnalyzerHeaderRequest { destination: destination })?; let (sent, total, overflow) = match reply { - Ok(drtioaux::Packet::AnalyzerHeader { - sent_bytes, total_byte_count, overflow_occurred } - ) => (sent_bytes, total_byte_count, overflow_occurred), - Ok(_) => return Err("received unexpected aux packet during remote analyzer header request"), - Err(e) => return Err(e) + drtioaux::Packet::AnalyzerHeader { sent_bytes, total_byte_count, overflow_occurred } => + (sent_bytes, total_byte_count, overflow_occurred), + packet => return Err(Error::UnexpectedPacket(packet)), }; let mut remote_data: Vec = Vec::new(); @@ -471,14 +507,13 @@ pub mod drtio { let mut last_packet = false; while !last_packet { let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::AnalyzerDataRequest { destination: destination }); + &drtioaux::Packet::AnalyzerDataRequest { destination: destination })?; match reply { - Ok(drtioaux::Packet::AnalyzerData { last, length, data }) => { + drtioaux::Packet::AnalyzerData { last, length, data } => { last_packet = last; remote_data.extend(&data[0..length as usize]); }, - Ok(_) => return Err("received unexpected aux packet during remote analyzer data request"), - Err(e) => return Err(e) + packet => return Err(Error::UnexpectedPacket(packet)), } } } @@ -494,7 +529,7 @@ pub mod drtio { #[cfg(has_rtio_analyzer)] pub fn analyzer_query(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, up_destinations: &Urc> - ) -> Result, &'static str> { + ) -> Result, Error> { let mut remote_buffers: Vec = Vec::new(); for i in 1..drtio_routing::DEST_COUNT { if destination_up(up_destinations, i as u8) { @@ -505,69 +540,65 @@ pub mod drtio { } pub fn subkernel_upload(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - id: u32, destination: u8, data: &Vec) -> Result<(), &'static str> { + id: u32, destination: u8, data: &Vec) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; partition_data(data, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SubkernelAddDataRequest { - id: id, destination: destination, status: status, length: len as u16, data: *slice}); + id: id, destination: destination, status: status, length: len as u16, data: *slice})?; match reply { - Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: true }) => Ok(()), - Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: false }) => - Err("error adding subkernel on satellite"), - Ok(_) => Err("adding subkernel failed, unexpected aux packet"), - Err(_) => Err("adding subkernel failed, aux error") + drtioaux::Packet::SubkernelAddDataReply { succeeded: true } => Ok(()), + drtioaux::Packet::SubkernelAddDataReply { succeeded: false } => + Err(Error::SubkernelAddFail(destination)), + packet => Err(Error::UnexpectedPacket(packet)), } }) } pub fn subkernel_load(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, - id: u32, destination: u8, run: bool) -> Result<(), &'static str> { + id: u32, destination: u8, run: bool) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::SubkernelLoadRunRequest{ id: id, destination: destination, run: run }); + &drtioaux::Packet::SubkernelLoadRunRequest{ id: id, destination: destination, run: run })?; match reply { - Ok(drtioaux::Packet::SubkernelLoadRunReply { succeeded: true }) => return Ok(()), - Ok(drtioaux::Packet::SubkernelLoadRunReply { succeeded: false }) => - return Err("error on subkernel run request"), - Ok(_) => return Err("received unexpected aux packet during subkernel run"), - Err(_) => return Err("aux error on subkernel run") + drtioaux::Packet::SubkernelLoadRunReply { succeeded: true } => Ok(()), + drtioaux::Packet::SubkernelLoadRunReply { succeeded: false } => + Err(Error::SubkernelRunFail(destination)), + packet => Err(Error::UnexpectedPacket(packet)), } } pub fn subkernel_retrieve_exception(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, destination: u8 - ) -> Result, &'static str> { + ) -> Result, Error> { let linkno = routing_table.0[destination as usize][0] - 1; let mut remote_data: Vec = Vec::new(); loop { let reply = aux_transact(io, aux_mutex, linkno, - &drtioaux::Packet::SubkernelExceptionRequest { destination: destination }); + &drtioaux::Packet::SubkernelExceptionRequest { destination: destination })?; match reply { - Ok(drtioaux::Packet::SubkernelException { last, length, data }) => { + drtioaux::Packet::SubkernelException { last, length, data } => { remote_data.extend(&data[0..length as usize]); if last { return Ok(remote_data); } }, - Ok(_) => return Err("received unexpected aux packet during subkernel exception request"), - Err(e) => return Err(e) + packet => return Err(Error::UnexpectedPacket(packet)), } } } pub fn subkernel_send_message(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, id: u32, destination: u8, message: &[u8] - ) -> Result<(), &'static str> { + ) -> Result<(), Error> { let linkno = routing_table.0[destination as usize][0] - 1; partition_data(message, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SubkernelMessage { - destination: destination, id: id, status: status, length: len as u16, data: *slice}); + destination: destination, id: id, status: status, length: len as u16, data: *slice})?; match reply { - Ok(drtioaux::Packet::SubkernelMessageAck { .. }) => Ok(()), - Ok(_) => Err("sending message to subkernel failed, unexpected aux packet"), - Err(_) => Err("sending message to subkernel, aux error") + drtioaux::Packet::SubkernelMessageAck { .. } => Ok(()), + packet => Err(Error::UnexpectedPacket(packet)), } }) } diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index 471cc20c9..d08e42986 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -5,7 +5,7 @@ use cslice::CSlice; use io::{Read, Write, Error as IoError}; #[cfg(has_drtio)] -use io::{Cursor, ProtoRead}; +use io::Cursor; use board_misoc::{ident, cache, config}; use {mailbox, rpc_queue, kernel}; use urc::Urc; @@ -16,6 +16,8 @@ use rtio_dma::Manager as DmaManager; use rtio_dma::remote_dma; #[cfg(has_drtio)] use kernel::{subkernel, subkernel::Error as SubkernelError}; +#[cfg(has_drtio)] +use rtio_mgt::drtio; use rtio_mgt::get_async_errors; use cache::Cache; use kern_hwreq; @@ -40,8 +42,14 @@ pub enum Error { #[fail(display = "subkernel io error")] SubkernelIoError, #[cfg(has_drtio)] + #[fail(display = "DDMA error: {}", _0)] + Ddma(#[cause] remote_dma::Error), + #[cfg(has_drtio)] #[fail(display = "subkernel error: {}", _0)] Subkernel(#[cause] SubkernelError), + #[cfg(has_drtio)] + #[fail(display = "drtio aux error: {}", _0)] + DrtioAux(#[cause] drtio::Error), #[fail(display = "{}", _0)] Unexpected(String), } @@ -52,6 +60,16 @@ impl From> for Error { } } +#[cfg(has_drtio)] +impl From for Error { + fn from(value: drtio::Error) -> Error { + match value { + drtio::Error::SchedError(x) => Error::from(x), + x => Error::DrtioAux(x), + } + } +} + impl From for Error { fn from(value: SchedError) -> Error { Error::Protocol(host::Error::Io(IoError::Other(value))) @@ -79,7 +97,22 @@ impl From> for Error { #[cfg(has_drtio)] impl From for Error { fn from(value: SubkernelError) -> Error { - Error::Subkernel(value) + match value { + SubkernelError::SchedError(x) => Error::from(x), + SubkernelError::DrtioError(x) => Error::from(x), + x => Error::Subkernel(x), + } + } +} + +#[cfg(has_drtio)] +impl From for Error { + fn from(value: remote_dma::Error) -> Error { + match value { + remote_dma::Error::SchedError(x) => Error::from(x), + remote_dma::Error::DrtioError(x) => Error::from(x), + x => Error::Ddma(x), + } } } @@ -371,7 +404,7 @@ fn process_host_message(io: &Io, _aux_mutex: &Mutex, _ddma_mutex: &Mutex, _subke host::Request::UploadSubkernel { id: _id, destination: _dest, kernel: _kernel } => { #[cfg(has_drtio)] { - subkernel::add_subkernel(io, _subkernel_mutex, _id, _dest, _kernel); + subkernel::add_subkernel(io, _subkernel_mutex, _id, _dest, _kernel)?; match subkernel::upload(io, _aux_mutex, _subkernel_mutex, _routing_table, _id) { Ok(_) => host_write(stream, host::Reply::LoadCompleted)?, Err(error) => { @@ -434,7 +467,7 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, if let Some(_id) = session.congress.dma_manager.record_start(name) { // replace the record #[cfg(has_drtio)] - remote_dma::erase(io, aux_mutex, ddma_mutex, routing_table, _id); + remote_dma::erase(io, aux_mutex, ddma_mutex, routing_table, _id)?; } kern_acknowledge() } @@ -443,10 +476,10 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, kern_acknowledge() } &kern::DmaRecordStop { duration, enable_ddma } => { - let _id = session.congress.dma_manager.record_stop(duration, enable_ddma, io, ddma_mutex); + let _id = session.congress.dma_manager.record_stop(duration, enable_ddma, io, ddma_mutex)?; #[cfg(has_drtio)] if enable_ddma { - remote_dma::upload_traces(io, aux_mutex, ddma_mutex, routing_table, _id); + remote_dma::upload_traces(io, aux_mutex, ddma_mutex, routing_table, _id)?; } cache::flush_l2_cache(); kern_acknowledge() @@ -454,7 +487,7 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, &kern::DmaEraseRequest { name } => { #[cfg(has_drtio)] if let Some(id) = session.congress.dma_manager.get_id(name) { - remote_dma::erase(io, aux_mutex, ddma_mutex, routing_table, *id); + remote_dma::erase(io, aux_mutex, ddma_mutex, routing_table, *id)?; } session.congress.dma_manager.erase(name); kern_acknowledge() @@ -463,7 +496,7 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, session.congress.dma_manager.with_trace(name, |trace, duration| { #[cfg(has_drtio)] let uses_ddma = match trace { - Some(trace) => remote_dma::has_remote_traces(io, aux_mutex, trace.as_ptr() as u32), + Some(trace) => remote_dma::has_remote_traces(io, aux_mutex, trace.as_ptr() as u32)?, None => false }; #[cfg(not(has_drtio))] @@ -477,7 +510,7 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, } &kern::DmaStartRemoteRequest { id: _id, timestamp: _timestamp } => { #[cfg(has_drtio)] - remote_dma::playback(io, aux_mutex, ddma_mutex, routing_table, _id as u32, _timestamp as u64); + remote_dma::playback(io, aux_mutex, ddma_mutex, routing_table, _id as u32, _timestamp as u64)?; kern_acknowledge() } &kern::DmaAwaitRemoteRequest { id: _id } => { @@ -703,7 +736,7 @@ fn host_kernel_worker(io: &Io, aux_mutex: &Mutex, congress: &mut Congress) -> Result<(), Error> { let mut session = Session::new(congress); #[cfg(has_drtio)] - subkernel::clear_subkernels(&io, &subkernel_mutex); + subkernel::clear_subkernels(&io, &subkernel_mutex)?; loop { if stream.can_recv() { @@ -785,7 +818,7 @@ fn respawn(io: &Io, handle: &mut Option, f: F) } } - *handle = Some(io.spawn(16384, f)) + *handle = Some(io.spawn(24576, f)) } pub fn thread(io: Io, aux_mutex: &Mutex, @@ -857,16 +890,19 @@ pub fn thread(io: Io, aux_mutex: &Mutex, Err(Error::Protocol(host::Error::Io(IoError::UnexpectedEnd))) => info!("connection closed"), Err(Error::Protocol(host::Error::Io( - IoError::Other(SchedError::Interrupted)))) => - info!("kernel interrupted"), + IoError::Other(SchedError::Interrupted)))) => { + info!("kernel interrupted"); + #[cfg(has_drtio)] + drtio::clear_buffers(&io, &aux_mutex); + } Err(err) => { congress.finished_cleanly.set(false); error!("session aborted: {}", err); + #[cfg(has_drtio)] + drtio::clear_buffers(&io, &aux_mutex); } } stream.close().expect("session: close socket"); - #[cfg(has_drtio)] - subkernel::clear_subkernels(&io, &subkernel_mutex); }); } @@ -887,15 +923,23 @@ pub fn thread(io: Io, aux_mutex: &Mutex, Ok(()) => info!("idle kernel finished, standing by"), Err(Error::Protocol(host::Error::Io( - IoError::Other(SchedError::Interrupted)))) => - info!("idle kernel interrupted"), + IoError::Other(SchedError::Interrupted)))) => { + info!("idle kernel interrupted"); + // clear state for regular kernel + #[cfg(has_drtio)] + drtio::clear_buffers(&io, &aux_mutex); + } Err(Error::KernelNotFound) => { info!("no idle kernel found"); while io.relinquish().is_ok() {} } - Err(err) => - error!("idle kernel aborted: {}", err) + Err(err) => { + error!("idle kernel aborted: {}", err); + #[cfg(has_drtio)] + drtio::clear_buffers(&io, &aux_mutex); + } } + }) }