1
0
Fork 0

subkernel: improve stability,

fix exception on awaiting message
This commit is contained in:
mwojcik 2023-11-02 14:49:53 +08:00
parent b76f634686
commit 6c8346ca5f
3 changed files with 77 additions and 26 deletions

View File

@ -458,6 +458,20 @@ async fn handle_run_kernel(
Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0), Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0),
Err(SubkernelError::IncorrectState) => (kernel::SubkernelStatus::IncorrectState, 0), Err(SubkernelError::IncorrectState) => (kernel::SubkernelStatus::IncorrectState, 0),
Err(SubkernelError::CommLost) => (kernel::SubkernelStatus::CommLost, 0), Err(SubkernelError::CommLost) => (kernel::SubkernelStatus::CommLost, 0),
Err(SubkernelError::SubkernelException) => {
error!("Exception in subkernel");
// just retrieve the exception
let status = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout)
.await
.unwrap();
match stream {
None => (),
Some(stream) => {
write_chunk(stream, &status.exception.unwrap()).await?;
}
}
(kernel::SubkernelStatus::OtherError, 0)
}
Err(_) => (kernel::SubkernelStatus::OtherError, 0), Err(_) => (kernel::SubkernelStatus::OtherError, 0),
}; };
control control

View File

@ -1,7 +1,8 @@
use alloc::{collections::BTreeMap, rc::Rc, vec::Vec}; use alloc::{collections::BTreeMap, rc::Rc, vec::Vec};
use libasync::task; use libasync::task;
use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::mutex::Mutex; use libcortex_a9::mutex::Mutex;
use log::error; use log::error;
@ -28,6 +29,7 @@ pub enum Error {
Timeout, Timeout,
IncorrectState, IncorrectState,
SubkernelNotFound, SubkernelNotFound,
SubkernelException,
CommLost, CommLost,
DrtioError(&'static str), DrtioError(&'static str),
} }
@ -123,6 +125,7 @@ pub async fn subkernel_finished(id: u32, with_exception: bool) {
// called upon receiving DRTIO SubkernelRunDone // called upon receiving DRTIO SubkernelRunDone
// may be None if session ends and is cleared // may be None if session ends and is cleared
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
if subkernel.state == SubkernelState::Running {
subkernel.state = SubkernelState::Finished { subkernel.state = SubkernelState::Finished {
status: match with_exception { status: match with_exception {
true => FinishStatus::Exception, true => FinishStatus::Exception,
@ -130,6 +133,7 @@ pub async fn subkernel_finished(id: u32, with_exception: bool) {
}, },
} }
} }
}
} }
pub async fn destination_changed( pub async fn destination_changed(
@ -220,13 +224,27 @@ static MESSAGE_QUEUE: Mutex<Vec<Message>> = Mutex::new(Vec::new());
// currently under construction message(s) (can be from multiple sources) // currently under construction message(s) (can be from multiple sources)
static CURRENT_MESSAGES: Mutex<BTreeMap<u32, Message>> = Mutex::new(BTreeMap::new()); static CURRENT_MESSAGES: Mutex<BTreeMap<u32, Message>> = Mutex::new(BTreeMap::new());
pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub async fn message_handle_incoming(
id: u32,
status: PayloadStatus,
length: usize,
data: &[u8; MASTER_PAYLOAD_MAX_SIZE],
) {
// called when receiving a message from satellite // called when receiving a message from satellite
if SUBKERNELS.async_lock().await.get(&id).is_none() { {
let subkernel_lock = SUBKERNELS.async_lock().await;
let subkernel = subkernel_lock.get(&id);
if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running {
// do not add messages for non-existing or deleted subkernels // do not add messages for non-existing or deleted subkernels
return; return;
} }
}
let mut current_messages = CURRENT_MESSAGES.async_lock().await; let mut current_messages = CURRENT_MESSAGES.async_lock().await;
if status.is_first() {
current_messages.remove(&id);
}
match current_messages.get_mut(&id) { match current_messages.get_mut(&id) {
Some(message) => message.data.extend(&data[..length]), Some(message) => message.data.extend(&data[..length]),
None => { None => {
@ -240,7 +258,7 @@ pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &
); );
} }
}; };
if last { if status.is_last() {
// when done, remove from working queue // when done, remove from working queue
MESSAGE_QUEUE MESSAGE_QUEUE
.async_lock() .async_lock()
@ -269,6 +287,15 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
} }
} }
} }
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Finished {
status: FinishStatus::Exception,
} => return Err(Error::SubkernelException),
_ => (),
}
task::r#yield().await; task::r#yield().await;
} }
Err(Error::Timeout) Err(Error::Timeout)

View File

@ -8,7 +8,7 @@ use core_io::{Error as IoError, Write};
use cslice::AsCSlice; use cslice::AsCSlice;
use io::{Cursor, ProtoWrite}; use io::{Cursor, ProtoWrite};
use ksupport::{eh_artiq, kernel, rpc}; use ksupport::{eh_artiq, kernel, rpc};
use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
pl::csr}; pl::csr};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::sync_channel::Receiver; use libcortex_a9::sync_channel::Receiver;
@ -133,24 +133,23 @@ pub struct SubkernelFinished {
pub struct SliceMeta { pub struct SliceMeta {
pub len: u16, pub len: u16,
pub last: bool, pub status: PayloadStatus,
} }
macro_rules! get_slice_fn { macro_rules! get_slice_fn {
($name:tt, $size:expr) => { ($name:tt, $size:expr) => {
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
if self.data.len() == 0 { let first = self.it == 0;
return SliceMeta { len: 0, last: true };
}
let len = min($size, self.data.len() - self.it); let len = min($size, self.data.len() - self.it);
let last = self.it + len == self.data.len(); let last = self.it + len == self.data.len();
let status = PayloadStatus::from_status(first, last);
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
self.it += len; self.it += len;
SliceMeta { SliceMeta {
len: len as u16, len: len as u16,
last: last, status: status,
} }
} }
}; };
@ -175,8 +174,11 @@ impl MessageManager {
} }
} }
pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from master // called when receiving a message from master
if status.is_first() {
self.in_buffer = None;
}
match self.in_buffer.as_mut() { match self.in_buffer.as_mut() {
Some(message) => message.data.extend(&data[..length]), Some(message) => message.data.extend(&data[..length]),
None => { None => {
@ -186,7 +188,7 @@ impl MessageManager {
}); });
} }
}; };
if last { if status.is_last() {
// when done, remove from working queue // when done, remove from working queue
self.in_queue.push_back(self.in_buffer.take().unwrap()); self.in_queue.push_back(self.in_buffer.take().unwrap());
} }
@ -218,7 +220,7 @@ impl MessageManager {
return None; return None;
} }
let meta = self.out_message.as_mut()?.get_slice_master(data_slice); let meta = self.out_message.as_mut()?.get_slice_master(data_slice);
if meta.last { if meta.status.is_last() {
// clear the message slot // clear the message slot
self.out_message = None; self.out_message = None;
// notify kernel with a flag that message is sent // notify kernel with a flag that message is sent
@ -265,10 +267,10 @@ impl<'a> Manager<'_> {
} }
} }
pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> { pub fn add(&mut self, id: u32, status: PayloadStatus, data: &[u8], data_len: usize) -> Result<(), Error> {
let kernel = match self.kernels.get_mut(&id) { let kernel = match self.kernels.get_mut(&id) {
Some(kernel) => { Some(kernel) => {
if kernel.complete { if kernel.complete || status.is_first() {
// replace entry // replace entry
self.kernels.remove(&id); self.kernels.remove(&id);
self.kernels.insert( self.kernels.insert(
@ -296,7 +298,7 @@ impl<'a> Manager<'_> {
}; };
kernel.library.extend(&data[0..data_len]); kernel.library.extend(&data[0..data_len]);
kernel.complete = last; kernel.complete = status.is_last();
Ok(()) Ok(())
} }
@ -325,11 +327,16 @@ impl<'a> Manager<'_> {
Ok(()) Ok(())
} }
pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub fn message_handle_incoming(
&mut self,
status: PayloadStatus,
length: usize,
slice: &[u8; MASTER_PAYLOAD_MAX_SIZE],
) {
if !self.running() { if !self.running() {
return; return;
} }
self.session.messages.handle_incoming(last, length, slice); self.session.messages.handle_incoming(status, length, slice);
} }
pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> { pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
@ -378,7 +385,10 @@ impl<'a> Manager<'_> {
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta {
match self.session.last_exception.as_mut() { match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice), Some(exception) => exception.get_slice_sat(data_slice),
None => SliceMeta { len: 0, last: true }, None => SliceMeta {
len: 0,
status: PayloadStatus::FirstAndLast,
},
} }
} }