satman: pass exceptions from one subkernel to another

This commit is contained in:
mwojcik 2024-07-03 17:28:00 +08:00 committed by Sébastien Bourdeauducq
parent 02479e4fb3
commit e63ac3435f
4 changed files with 143 additions and 73 deletions

View File

@ -673,26 +673,26 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
&kern::SubkernelAwaitFinishRequest{ id, timeout } => {
let res = subkernel::await_finish(io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table,
id, timeout);
let status = match res {
let response = match res {
Ok(ref res) => {
if res.comm_lost {
kern::SubkernelStatus::CommLost
kern::SubkernelError(kern::SubkernelStatus::CommLost)
} else if let Some(raw_exception) = &res.exception {
let exception = subkernel::read_exception(raw_exception);
if let Ok(exception) = exception {
kern::SubkernelStatus::Exception(exception)
kern::SubkernelError(kern::SubkernelStatus::Exception(exception))
} else {
kern::SubkernelStatus::OtherError
kern::SubkernelError(kern::SubkernelStatus::OtherError)
}
} else {
kern::SubkernelStatus::NoError
kern::SubkernelAwaitFinishReply
}
},
Err(SubkernelError::Timeout) => kern::SubkernelStatus::Timeout,
Err(SubkernelError::IncorrectState) => kern::SubkernelStatus::IncorrectState,
Err(_) => kern::SubkernelStatus::OtherError
Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout),
Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState),
Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError)
};
kern_send(io, &kern::SubkernelAwaitFinishReply { status: status })
kern_send(io, &response)
}
#[cfg(has_drtio)]
&kern::SubkernelMsgSend { id, destination, count, tag, data } => {
@ -707,41 +707,29 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
routing_table, id as u32)?;
if res.comm_lost {
kern_send(io,
&kern::SubkernelMsgRecvReply {
status: kern::SubkernelStatus::CommLost,
count: 0
})?;
&kern::SubkernelError(kern::SubkernelStatus::CommLost))?;
} else if let Some(raw_exception) = &res.exception {
let exception = subkernel::read_exception(raw_exception);
if let Ok(exception) = exception {
kern_send(io,
&kern::SubkernelMsgRecvReply {
status: kern::SubkernelStatus::Exception(exception),
count: 0
})?;
kern_send(io,
&kern::SubkernelError(kern::SubkernelStatus::Exception(exception)))?;
} else {
kern_send(io,
&kern::SubkernelMsgRecvReply {
status: kern::SubkernelStatus::OtherError,
count: 0
})?;
kern_send(io,
&kern::SubkernelError(kern::SubkernelStatus::OtherError))?;
}
} else {
kern_send(io,
&kern::SubkernelMsgRecvReply {
status: kern::SubkernelStatus::OtherError,
count: 0
})?;
kern_send(io,
&kern::SubkernelError(kern::SubkernelStatus::OtherError))?;
}
} else {
let (status, count) = match message_received {
Ok(ref message) => (kern::SubkernelStatus::NoError, message.count),
Err(SubkernelError::Timeout) => (kern::SubkernelStatus::Timeout, 0),
Err(SubkernelError::IncorrectState) => (kern::SubkernelStatus::IncorrectState, 0),
let message = match message_received {
Ok(ref message) => kern::SubkernelMsgRecvReply { count: message.count },
Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout),
Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState),
Err(SubkernelError::SubkernelFinished) => unreachable!(), // taken care of above
Err(_) => (kern::SubkernelStatus::OtherError, 0)
Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError)
};
kern_send(io, &kern::SubkernelMsgRecvReply { status: status, count: count})?;
kern_send(io, &message)?;
if let Ok(message) = message_received {
// receive code almost identical to RPC recv, except we are not reading from a stream
let mut reader = Cursor::new(message.data);

View File

@ -1,23 +1,22 @@
use core::mem;
use alloc::{string::String, format, vec::Vec, collections::btree_map::BTreeMap};
use cslice::AsCSlice;
use cslice::{CSlice, AsCSlice};
use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi};
use board_misoc::{csr, clock, i2c};
use proto_artiq::{
drtioaux_proto::PayloadStatus,
kernel_proto as kern,
kernel_proto as kern,
session_proto::Reply::KernelException as HostKernelException,
rpc_proto as rpc};
use eh::eh_artiq;
use io::Cursor;
use io::{Cursor, ProtoRead};
use kernel::eh_artiq::StackPointerBacktrace;
use ::{cricon_select, RtioMaster};
use cache::Cache;
use dma::{Manager as DmaManager, Error as DmaError};
use routing::{Router, Sliceable, SliceMeta};
use SAT_PAYLOAD_MAX_SIZE;
use MASTER_PAYLOAD_MAX_SIZE;
mod kernel_cpu {
@ -69,6 +68,7 @@ enum KernelState {
SubkernelAwaitFinish { max_time: i64, id: u32 },
DmaUploading { max_time: u64 },
DmaAwait { max_time: u64 },
SubkernelRetrievingException { destination: u8 },
}
#[derive(Debug)]
@ -134,10 +134,13 @@ struct MessageManager {
struct Session {
kernel_state: KernelState,
log_buffer: String,
last_exception: Option<Sliceable>,
source: u8, // which destination requested running the kernel
last_exception: Option<Sliceable>, // exceptions raised locally
external_exception: Vec<u8>, // exceptions from sub-subkernels
// which destination requested running the kernel
source: u8,
messages: MessageManager,
subkernels_finished: Vec<u32> // ids of subkernels finished
// ids of subkernels finished (with exception)
subkernels_finished: Vec<(u32, Option<u8>)>,
}
#[derive(Debug)]
@ -277,6 +280,7 @@ impl Session {
kernel_state: KernelState::Absent,
log_buffer: String::new(),
last_exception: None,
external_exception: Vec::new(),
source: 0,
messages: MessageManager::new(),
subkernels_finished: Vec::new()
@ -428,9 +432,9 @@ impl Manager {
}
}
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta {
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> SliceMeta {
match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice),
Some(exception) => exception.get_slice_master(data_slice),
None => SliceMeta { destination: 0, len: 0, status: PayloadStatus::FirstAndLast }
}
}
@ -517,7 +521,7 @@ impl Manager {
return;
}
match self.process_external_messages() {
match self.process_external_messages(router, routing_table, rank, destination) {
Ok(()) => (),
Err(Error::AwaitingMessage) => return, // kernel still waiting, do not process kernel messages
Err(Error::KernelException(exception)) => {
@ -549,20 +553,42 @@ impl Manager {
}
}
fn process_external_messages(&mut self) -> Result<(), Error> {
fn check_finished_kernels(&mut self, id: u32, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) {
for (i, (status, exception_source)) in self.session.subkernels_finished.iter().enumerate() {
if *status == id {
if exception_source.is_none() {
kern_send(&kern::SubkernelAwaitFinishReply).unwrap();
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
} else {
let destination = exception_source.unwrap();
self.session.external_exception = Vec::new();
self.session.kernel_state = KernelState::SubkernelRetrievingException { destination: destination };
router.route(drtioaux::Packet::SubkernelExceptionRequest {
source: self_destination, destination: destination
}, &routing_table, rank, self_destination);
}
break;
}
}
}
fn process_external_messages(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) -> Result<(), Error> {
match &self.session.kernel_state {
KernelState::MsgAwait { id, max_time, tags } => {
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?;
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?;
self.session.kernel_state = KernelState::Running;
return Ok(())
}
if let Some(message) = self.session.messages.get_incoming(*id) {
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?;
kern_send(&kern::SubkernelMsgRecvReply { count: message.count })?;
let tags = tags.clone();
self.session.kernel_state = KernelState::Running;
pass_message_to_kernel(&message, &tags)
} else {
let id = *id;
self.check_finished_kernels(id, router, routing_table, rank, self_destination);
Err(Error::AwaitingMessage)
}
},
@ -576,19 +602,11 @@ impl Manager {
},
KernelState::SubkernelAwaitFinish { max_time, id } => {
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::Timeout })?;
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?;
self.session.kernel_state = KernelState::Running;
} else {
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?;
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
}
let id = *id;
self.check_finished_kernels(id, router, routing_table, rank, self_destination);
}
Ok(())
}
@ -606,6 +624,9 @@ impl Manager {
}
Ok(())
}
KernelState::SubkernelRetrievingException { destination: _ } => {
Err(Error::AwaitingMessage)
}
_ => Ok(())
}
}
@ -628,16 +649,30 @@ impl Manager {
}
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
if with_exception {
unsafe { kernel_cpu::stop() }
self.session.kernel_state = KernelState::Absent;
unsafe { self.cache.unborrow() }
self.last_finished = Some(SubkernelFinished {
source: self.session.source, id: self.current_id,
with_exception: true, exception_source: exception_source
})
let exception_src = if with_exception { Some(exception_source) } else { None };
self.session.subkernels_finished.push((id, exception_src));
}
pub fn received_exception(&mut self, exception_data: &[u8], last: bool, router: &mut Router, routing_table: &RoutingTable,
rank: u8, self_destination: u8) {
if let KernelState::SubkernelRetrievingException { destination } = self.session.kernel_state {
self.session.external_exception.extend_from_slice(exception_data);
if last {
if let Ok(exception) = read_exception(&self.session.external_exception) {
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Exception(exception))).unwrap();
} else {
kern_send(
&kern::SubkernelError(kern::SubkernelStatus::OtherError)).unwrap();
}
self.session.kernel_state = KernelState::Running;
} else {
/* fetch another slice */
router.route(drtioaux::Packet::SubkernelExceptionRequest {
source: self_destination, destination: destination
}, routing_table, rank, self_destination);
}
} else {
self.session.subkernels_finished.push(id);
warn!("Received unsolicited exception data");
}
}
@ -655,6 +690,7 @@ impl Manager {
(_, KernelState::DmaAwait { .. }) |
(_, KernelState::MsgSending) |
(_, KernelState::SubkernelAwaitLoad) |
(_, KernelState::SubkernelRetrievingException { .. }) |
(_, KernelState::SubkernelAwaitFinish { .. }) => {
// We're standing by; ignore the message.
return Ok(None)
@ -822,6 +858,48 @@ impl Drop for Manager {
}
}
fn read_exception_string<'a>(reader: &mut Cursor<&[u8]>) -> Result<CSlice<'a, u8>, Error> {
let len = reader.read_u32()? as usize;
if len == usize::MAX {
let data = reader.read_u32()?;
Ok(unsafe { CSlice::new(data as *const u8, len) })
} else {
let pos = reader.position();
let slice = unsafe {
let ptr = reader.get_ref().as_ptr().offset(pos as isize);
CSlice::new(ptr, len)
};
reader.set_position(pos + len);
Ok(slice)
}
}
fn read_exception(buffer: &[u8]) -> Result<eh_artiq::Exception, Error>
{
let mut reader = Cursor::new(buffer);
let mut byte = reader.read_u8()?;
// to sync
while byte != 0x5a {
byte = reader.read_u8()?;
}
// skip sync bytes, 0x09 indicates exception
while byte != 0x09 {
byte = reader.read_u8()?;
}
let _len = reader.read_u32()?;
// ignore the remaining exceptions, stack traces etc. - unwinding from another device would be unwise anyway
Ok(eh_artiq::Exception {
id: reader.read_u32()?,
message: read_exception_string(&mut reader)?,
param: [reader.read_u64()? as i64, reader.read_u64()? as i64, reader.read_u64()? as i64],
file: read_exception_string(&mut reader)?,
line: reader.read_u32()?,
column: reader.read_u32()?,
function: read_exception_string(&mut reader)?
})
}
fn kern_recv<R, F>(f: F) -> Result<R, Error>
where F: FnOnce(&kern::Message) -> Result<R, Error> {
if mailbox::receive() == 0 {

View File

@ -455,15 +455,21 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
kernelmgr.remote_subkernel_finished(id, with_exception, exception_src);
Ok(())
}
drtioaux::Packet::SubkernelExceptionRequest { destination: _destination } => {
drtioaux::Packet::SubkernelExceptionRequest { source, destination: _destination } => {
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let meta = kernelmgr.exception_get_slice(&mut data_slice);
drtioaux::send(0, &drtioaux::Packet::SubkernelException {
router.send(drtioaux::Packet::SubkernelException {
destination: source,
last: meta.status.is_last(),
length: meta.len,
data: data_slice,
})
}, _routing_table, *rank, *self_destination)
}
drtioaux::Packet::SubkernelException { destination: _destination, last, length, data } => {
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);
kernelmgr.received_exception(&data[..length as usize], last, router, _routing_table, *rank, *self_destination);
Ok(())
}
drtioaux::Packet::SubkernelMessage { source, destination: _destination, id, status, length, data } => {
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);

View File

@ -4,7 +4,6 @@ use board_artiq::{drtioaux, drtio_routing};
use board_misoc::csr;
use core::cmp::min;
use proto_artiq::drtioaux_proto::PayloadStatus;
use SAT_PAYLOAD_MAX_SIZE;
use MASTER_PAYLOAD_MAX_SIZE;
/* represents data that has to be sent with the aux protocol */
@ -57,7 +56,6 @@ impl Sliceable {
self.data.extend(data);
}
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
}