From 7f36c9e9c1b1310dda9edaf39cdb4b523c85cba4 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Wed, 3 Jul 2024 17:28:00 +0800 Subject: [PATCH] satman: pass exceptions from one subkernel to another --- artiq/firmware/runtime/session.rs | 56 +++++------- artiq/firmware/satman/kernel.rs | 144 +++++++++++++++++++++++------- artiq/firmware/satman/main.rs | 14 ++- artiq/firmware/satman/routing.rs | 2 - 4 files changed, 143 insertions(+), 73 deletions(-) diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index 4f27c7b74..71fbb2ce3 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -673,26 +673,26 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, &kern::SubkernelAwaitFinishRequest{ id, timeout } => { let res = subkernel::await_finish(io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table, id, timeout); - let status = match res { + let response = match res { Ok(ref res) => { if res.comm_lost { - kern::SubkernelStatus::CommLost + kern::SubkernelError(kern::SubkernelStatus::CommLost) } else if let Some(raw_exception) = &res.exception { let exception = subkernel::read_exception(raw_exception); if let Ok(exception) = exception { - kern::SubkernelStatus::Exception(exception) + kern::SubkernelError(kern::SubkernelStatus::Exception(exception)) } else { - kern::SubkernelStatus::OtherError + kern::SubkernelError(kern::SubkernelStatus::OtherError) } } else { - kern::SubkernelStatus::NoError + kern::SubkernelAwaitFinishReply } }, - Err(SubkernelError::Timeout) => kern::SubkernelStatus::Timeout, - Err(SubkernelError::IncorrectState) => kern::SubkernelStatus::IncorrectState, - Err(_) => kern::SubkernelStatus::OtherError + Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout), + Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState), + Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError) }; - kern_send(io, &kern::SubkernelAwaitFinishReply { status: status }) + kern_send(io, &response) } #[cfg(has_drtio)] &kern::SubkernelMsgSend { id, destination, count, tag, data } => { @@ -707,41 +707,29 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex, routing_table, id as u32)?; if res.comm_lost { kern_send(io, - &kern::SubkernelMsgRecvReply { - status: kern::SubkernelStatus::CommLost, - count: 0 - })?; + &kern::SubkernelError(kern::SubkernelStatus::CommLost))?; } else if let Some(raw_exception) = &res.exception { let exception = subkernel::read_exception(raw_exception); if let Ok(exception) = exception { - kern_send(io, - &kern::SubkernelMsgRecvReply { - status: kern::SubkernelStatus::Exception(exception), - count: 0 - })?; + kern_send(io, + &kern::SubkernelError(kern::SubkernelStatus::Exception(exception)))?; } else { - kern_send(io, - &kern::SubkernelMsgRecvReply { - status: kern::SubkernelStatus::OtherError, - count: 0 - })?; + kern_send(io, + &kern::SubkernelError(kern::SubkernelStatus::OtherError))?; } } else { - kern_send(io, - &kern::SubkernelMsgRecvReply { - status: kern::SubkernelStatus::OtherError, - count: 0 - })?; + kern_send(io, + &kern::SubkernelError(kern::SubkernelStatus::OtherError))?; } } else { - let (status, count) = match message_received { - Ok(ref message) => (kern::SubkernelStatus::NoError, message.count), - Err(SubkernelError::Timeout) => (kern::SubkernelStatus::Timeout, 0), - Err(SubkernelError::IncorrectState) => (kern::SubkernelStatus::IncorrectState, 0), + let message = match message_received { + Ok(ref message) => kern::SubkernelMsgRecvReply { count: message.count }, + Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout), + Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState), Err(SubkernelError::SubkernelFinished) => unreachable!(), // taken care of above - Err(_) => (kern::SubkernelStatus::OtherError, 0) + Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError) }; - kern_send(io, &kern::SubkernelMsgRecvReply { status: status, count: count})?; + kern_send(io, &message)?; if let Ok(message) = message_received { // receive code almost identical to RPC recv, except we are not reading from a stream let mut reader = Cursor::new(message.data); diff --git a/artiq/firmware/satman/kernel.rs b/artiq/firmware/satman/kernel.rs index b00861abb..6f8f5b7c7 100644 --- a/artiq/firmware/satman/kernel.rs +++ b/artiq/firmware/satman/kernel.rs @@ -1,23 +1,22 @@ use core::mem; use alloc::{string::String, format, vec::Vec, collections::btree_map::BTreeMap}; -use cslice::AsCSlice; +use cslice::{CSlice, AsCSlice}; use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi}; use board_misoc::{csr, clock, i2c}; use proto_artiq::{ drtioaux_proto::PayloadStatus, - kernel_proto as kern, + kernel_proto as kern, session_proto::Reply::KernelException as HostKernelException, rpc_proto as rpc}; use eh::eh_artiq; -use io::Cursor; +use io::{Cursor, ProtoRead}; use kernel::eh_artiq::StackPointerBacktrace; use ::{cricon_select, RtioMaster}; use cache::Cache; use dma::{Manager as DmaManager, Error as DmaError}; use routing::{Router, Sliceable, SliceMeta}; -use SAT_PAYLOAD_MAX_SIZE; use MASTER_PAYLOAD_MAX_SIZE; mod kernel_cpu { @@ -69,6 +68,7 @@ enum KernelState { SubkernelAwaitFinish { max_time: i64, id: u32 }, DmaUploading { max_time: u64 }, DmaAwait { max_time: u64 }, + SubkernelRetrievingException { destination: u8 }, } #[derive(Debug)] @@ -134,10 +134,13 @@ struct MessageManager { struct Session { kernel_state: KernelState, log_buffer: String, - last_exception: Option, - source: u8, // which destination requested running the kernel + last_exception: Option, // exceptions raised locally + external_exception: Vec, // exceptions from sub-subkernels + // which destination requested running the kernel + source: u8, messages: MessageManager, - subkernels_finished: Vec // ids of subkernels finished + // ids of subkernels finished (with exception) + subkernels_finished: Vec<(u32, Option)>, } #[derive(Debug)] @@ -277,6 +280,7 @@ impl Session { kernel_state: KernelState::Absent, log_buffer: String::new(), last_exception: None, + external_exception: Vec::new(), source: 0, messages: MessageManager::new(), subkernels_finished: Vec::new() @@ -428,9 +432,9 @@ impl Manager { } } - pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { + pub fn exception_get_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> SliceMeta { match self.session.last_exception.as_mut() { - Some(exception) => exception.get_slice_sat(data_slice), + Some(exception) => exception.get_slice_master(data_slice), None => SliceMeta { destination: 0, len: 0, status: PayloadStatus::FirstAndLast } } } @@ -517,7 +521,7 @@ impl Manager { return; } - match self.process_external_messages() { + match self.process_external_messages(router, routing_table, rank, destination) { Ok(()) => (), Err(Error::AwaitingMessage) => return, // kernel still waiting, do not process kernel messages Err(Error::KernelException(exception)) => { @@ -549,20 +553,42 @@ impl Manager { } } - fn process_external_messages(&mut self) -> Result<(), Error> { + fn check_finished_kernels(&mut self, id: u32, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) { + for (i, (status, exception_source)) in self.session.subkernels_finished.iter().enumerate() { + if *status == id { + if exception_source.is_none() { + kern_send(&kern::SubkernelAwaitFinishReply).unwrap(); + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); + } else { + let destination = exception_source.unwrap(); + self.session.external_exception = Vec::new(); + self.session.kernel_state = KernelState::SubkernelRetrievingException { destination: destination }; + router.route(drtioaux::Packet::SubkernelExceptionRequest { + source: self_destination, destination: destination + }, &routing_table, rank, self_destination); + } + break; + } + } + } + + fn process_external_messages(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) -> Result<(), Error> { match &self.session.kernel_state { KernelState::MsgAwait { id, max_time, tags } => { if *max_time > 0 && clock::get_ms() > *max_time as u64 { - kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?; + kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?; self.session.kernel_state = KernelState::Running; return Ok(()) } if let Some(message) = self.session.messages.get_incoming(*id) { - kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?; + kern_send(&kern::SubkernelMsgRecvReply { count: message.count })?; let tags = tags.clone(); self.session.kernel_state = KernelState::Running; pass_message_to_kernel(&message, &tags) } else { + let id = *id; + self.check_finished_kernels(id, router, routing_table, rank, self_destination); Err(Error::AwaitingMessage) } }, @@ -576,19 +602,11 @@ impl Manager { }, KernelState::SubkernelAwaitFinish { max_time, id } => { if *max_time > 0 && clock::get_ms() > *max_time as u64 { - kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::Timeout })?; + kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?; self.session.kernel_state = KernelState::Running; } else { - let mut i = 0; - for status in &self.session.subkernels_finished { - if *status == *id { - kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?; - self.session.kernel_state = KernelState::Running; - self.session.subkernels_finished.swap_remove(i); - break; - } - i += 1; - } + let id = *id; + self.check_finished_kernels(id, router, routing_table, rank, self_destination); } Ok(()) } @@ -606,6 +624,9 @@ impl Manager { } Ok(()) } + KernelState::SubkernelRetrievingException { destination: _ } => { + Err(Error::AwaitingMessage) + } _ => Ok(()) } } @@ -628,16 +649,30 @@ impl Manager { } pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) { - if with_exception { - unsafe { kernel_cpu::stop() } - self.session.kernel_state = KernelState::Absent; - unsafe { self.cache.unborrow() } - self.last_finished = Some(SubkernelFinished { - source: self.session.source, id: self.current_id, - with_exception: true, exception_source: exception_source - }) + let exception_src = if with_exception { Some(exception_source) } else { None }; + self.session.subkernels_finished.push((id, exception_src)); + } + + pub fn received_exception(&mut self, exception_data: &[u8], last: bool, router: &mut Router, routing_table: &RoutingTable, + rank: u8, self_destination: u8) { + if let KernelState::SubkernelRetrievingException { destination } = self.session.kernel_state { + self.session.external_exception.extend_from_slice(exception_data); + if last { + if let Ok(exception) = read_exception(&self.session.external_exception) { + kern_send(&kern::SubkernelError(kern::SubkernelStatus::Exception(exception))).unwrap(); + } else { + kern_send( + &kern::SubkernelError(kern::SubkernelStatus::OtherError)).unwrap(); + } + self.session.kernel_state = KernelState::Running; + } else { + /* fetch another slice */ + router.route(drtioaux::Packet::SubkernelExceptionRequest { + source: self_destination, destination: destination + }, routing_table, rank, self_destination); + } } else { - self.session.subkernels_finished.push(id); + warn!("Received unsolicited exception data"); } } @@ -655,6 +690,7 @@ impl Manager { (_, KernelState::DmaAwait { .. }) | (_, KernelState::MsgSending) | (_, KernelState::SubkernelAwaitLoad) | + (_, KernelState::SubkernelRetrievingException { .. }) | (_, KernelState::SubkernelAwaitFinish { .. }) => { // We're standing by; ignore the message. return Ok(None) @@ -822,6 +858,48 @@ impl Drop for Manager { } } +fn read_exception_string<'a>(reader: &mut Cursor<&[u8]>) -> Result, Error> { + let len = reader.read_u32()? as usize; + if len == usize::MAX { + let data = reader.read_u32()?; + Ok(unsafe { CSlice::new(data as *const u8, len) }) + } else { + let pos = reader.position(); + let slice = unsafe { + let ptr = reader.get_ref().as_ptr().offset(pos as isize); + CSlice::new(ptr, len) + }; + reader.set_position(pos + len); + Ok(slice) + } +} + +fn read_exception(buffer: &[u8]) -> Result +{ + let mut reader = Cursor::new(buffer); + + let mut byte = reader.read_u8()?; + // to sync + while byte != 0x5a { + byte = reader.read_u8()?; + } + // skip sync bytes, 0x09 indicates exception + while byte != 0x09 { + byte = reader.read_u8()?; + } + let _len = reader.read_u32()?; + // ignore the remaining exceptions, stack traces etc. - unwinding from another device would be unwise anyway + Ok(eh_artiq::Exception { + id: reader.read_u32()?, + message: read_exception_string(&mut reader)?, + param: [reader.read_u64()? as i64, reader.read_u64()? as i64, reader.read_u64()? as i64], + file: read_exception_string(&mut reader)?, + line: reader.read_u32()?, + column: reader.read_u32()?, + function: read_exception_string(&mut reader)? + }) +} + fn kern_recv(f: F) -> Result where F: FnOnce(&kern::Message) -> Result { if mailbox::receive() == 0 { diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index aac2ad298..d402a440a 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -455,15 +455,21 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg kernelmgr.remote_subkernel_finished(id, with_exception, exception_src); Ok(()) } - drtioaux::Packet::SubkernelExceptionRequest { destination: _destination } => { + drtioaux::Packet::SubkernelExceptionRequest { source, destination: _destination } => { forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet); - let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let meta = kernelmgr.exception_get_slice(&mut data_slice); - drtioaux::send(0, &drtioaux::Packet::SubkernelException { + router.send(drtioaux::Packet::SubkernelException { + destination: source, last: meta.status.is_last(), length: meta.len, data: data_slice, - }) + }, _routing_table, *rank, *self_destination) + } + drtioaux::Packet::SubkernelException { destination: _destination, last, length, data } => { + forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet); + kernelmgr.received_exception(&data[..length as usize], last, router, _routing_table, *rank, *self_destination); + Ok(()) } drtioaux::Packet::SubkernelMessage { source, destination: _destination, id, status, length, data } => { forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet); diff --git a/artiq/firmware/satman/routing.rs b/artiq/firmware/satman/routing.rs index cb17d6822..f8e0e6d24 100644 --- a/artiq/firmware/satman/routing.rs +++ b/artiq/firmware/satman/routing.rs @@ -4,7 +4,6 @@ use board_artiq::{drtioaux, drtio_routing}; use board_misoc::csr; use core::cmp::min; use proto_artiq::drtioaux_proto::PayloadStatus; -use SAT_PAYLOAD_MAX_SIZE; use MASTER_PAYLOAD_MAX_SIZE; /* represents data that has to be sent with the aux protocol */ @@ -57,7 +56,6 @@ impl Sliceable { self.data.extend(data); } - get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); }