From 6c8346ca5ff43fb5372a8bde548685e963d080cf Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 2 Nov 2023 14:49:53 +0800 Subject: [PATCH] subkernel: improve stability, fix exception on awaiting message --- src/runtime/src/comms.rs | 14 +++++++++++ src/runtime/src/subkernel.rs | 49 ++++++++++++++++++++++++++++-------- src/satman/src/subkernel.rs | 40 ++++++++++++++++++----------- 3 files changed, 77 insertions(+), 26 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index d02d974c..26a09e69 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -458,6 +458,20 @@ async fn handle_run_kernel( Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0), Err(SubkernelError::IncorrectState) => (kernel::SubkernelStatus::IncorrectState, 0), Err(SubkernelError::CommLost) => (kernel::SubkernelStatus::CommLost, 0), + Err(SubkernelError::SubkernelException) => { + error!("Exception in subkernel"); + // just retrieve the exception + let status = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout) + .await + .unwrap(); + match stream { + None => (), + Some(stream) => { + write_chunk(stream, &status.exception.unwrap()).await?; + } + } + (kernel::SubkernelStatus::OtherError, 0) + } Err(_) => (kernel::SubkernelStatus::OtherError, 0), }; control diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index 383716a5..d129865f 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -1,7 +1,8 @@ use alloc::{collections::BTreeMap, rc::Rc, vec::Vec}; use libasync::task; -use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; +use libboard_artiq::{drtio_routing::RoutingTable, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::mutex::Mutex; use log::error; @@ -28,6 +29,7 @@ pub enum Error { Timeout, IncorrectState, SubkernelNotFound, + SubkernelException, CommLost, DrtioError(&'static str), } @@ -123,11 +125,13 @@ pub async fn subkernel_finished(id: u32, with_exception: bool) { // called upon receiving DRTIO SubkernelRunDone // may be None if session ends and is cleared if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { - subkernel.state = SubkernelState::Finished { - status: match with_exception { - true => FinishStatus::Exception, - false => FinishStatus::Ok, - }, + if subkernel.state == SubkernelState::Running { + subkernel.state = SubkernelState::Finished { + status: match with_exception { + true => FinishStatus::Exception, + false => FinishStatus::Ok, + }, + } } } } @@ -220,13 +224,27 @@ static MESSAGE_QUEUE: Mutex> = Mutex::new(Vec::new()); // currently under construction message(s) (can be from multiple sources) static CURRENT_MESSAGES: Mutex> = Mutex::new(BTreeMap::new()); -pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { +pub async fn message_handle_incoming( + id: u32, + status: PayloadStatus, + length: usize, + data: &[u8; MASTER_PAYLOAD_MAX_SIZE], +) { // called when receiving a message from satellite - if SUBKERNELS.async_lock().await.get(&id).is_none() { - // do not add messages for non-existing or deleted subkernels - return; + { + let subkernel_lock = SUBKERNELS.async_lock().await; + let subkernel = subkernel_lock.get(&id); + if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { + // do not add messages for non-existing or deleted subkernels + return; + } } let mut current_messages = CURRENT_MESSAGES.async_lock().await; + + if status.is_first() { + current_messages.remove(&id); + } + match current_messages.get_mut(&id) { Some(message) => message.data.extend(&data[..length]), None => { @@ -240,7 +258,7 @@ pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: & ); } }; - if last { + if status.is_last() { // when done, remove from working queue MESSAGE_QUEUE .async_lock() @@ -269,6 +287,15 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< } } } + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Finished { + status: FinishStatus::Exception, + } => return Err(Error::SubkernelException), + _ => (), + } task::r#yield().await; } Err(Error::Timeout) diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 4b38b67e..ce32bd49 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -8,7 +8,7 @@ use core_io::{Error as IoError, Write}; use cslice::AsCSlice; use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; -use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, +use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, pl::csr}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; @@ -133,24 +133,23 @@ pub struct SubkernelFinished { pub struct SliceMeta { pub len: u16, - pub last: bool, + pub status: PayloadStatus, } macro_rules! get_slice_fn { ($name:tt, $size:expr) => { pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - if self.data.len() == 0 { - return SliceMeta { len: 0, last: true }; - } + let first = self.it == 0; let len = min($size, self.data.len() - self.it); let last = self.it + len == self.data.len(); + let status = PayloadStatus::from_status(first, last); data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); self.it += len; SliceMeta { len: len as u16, - last: last, + status: status, } } }; @@ -175,8 +174,11 @@ impl MessageManager { } } - pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from master + if status.is_first() { + self.in_buffer = None; + } match self.in_buffer.as_mut() { Some(message) => message.data.extend(&data[..length]), None => { @@ -186,7 +188,7 @@ impl MessageManager { }); } }; - if last { + if status.is_last() { // when done, remove from working queue self.in_queue.push_back(self.in_buffer.take().unwrap()); } @@ -218,7 +220,7 @@ impl MessageManager { return None; } let meta = self.out_message.as_mut()?.get_slice_master(data_slice); - if meta.last { + if meta.status.is_last() { // clear the message slot self.out_message = None; // notify kernel with a flag that message is sent @@ -265,10 +267,10 @@ impl<'a> Manager<'_> { } } - pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, data: &[u8], data_len: usize) -> Result<(), Error> { let kernel = match self.kernels.get_mut(&id) { Some(kernel) => { - if kernel.complete { + if kernel.complete || status.is_first() { // replace entry self.kernels.remove(&id); self.kernels.insert( @@ -296,7 +298,7 @@ impl<'a> Manager<'_> { }; kernel.library.extend(&data[0..data_len]); - kernel.complete = last; + kernel.complete = status.is_last(); Ok(()) } @@ -325,11 +327,16 @@ impl<'a> Manager<'_> { Ok(()) } - pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn message_handle_incoming( + &mut self, + status: PayloadStatus, + length: usize, + slice: &[u8; MASTER_PAYLOAD_MAX_SIZE], + ) { if !self.running() { return; } - self.session.messages.handle_incoming(last, length, slice); + self.session.messages.handle_incoming(status, length, slice); } pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { @@ -378,7 +385,10 @@ impl<'a> Manager<'_> { pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { match self.session.last_exception.as_mut() { Some(exception) => exception.get_slice_sat(data_slice), - None => SliceMeta { len: 0, last: true }, + None => SliceMeta { + len: 0, + status: PayloadStatus::FirstAndLast, + }, } }