forked from M-Labs/artiq
satman: pass exceptions from one subkernel to another
This commit is contained in:
parent
5f1e33198e
commit
7f36c9e9c1
@ -673,26 +673,26 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
|
||||
&kern::SubkernelAwaitFinishRequest{ id, timeout } => {
|
||||
let res = subkernel::await_finish(io, aux_mutex, ddma_mutex, subkernel_mutex, routing_table,
|
||||
id, timeout);
|
||||
let status = match res {
|
||||
let response = match res {
|
||||
Ok(ref res) => {
|
||||
if res.comm_lost {
|
||||
kern::SubkernelStatus::CommLost
|
||||
kern::SubkernelError(kern::SubkernelStatus::CommLost)
|
||||
} else if let Some(raw_exception) = &res.exception {
|
||||
let exception = subkernel::read_exception(raw_exception);
|
||||
if let Ok(exception) = exception {
|
||||
kern::SubkernelStatus::Exception(exception)
|
||||
kern::SubkernelError(kern::SubkernelStatus::Exception(exception))
|
||||
} else {
|
||||
kern::SubkernelStatus::OtherError
|
||||
kern::SubkernelError(kern::SubkernelStatus::OtherError)
|
||||
}
|
||||
} else {
|
||||
kern::SubkernelStatus::NoError
|
||||
kern::SubkernelAwaitFinishReply
|
||||
}
|
||||
},
|
||||
Err(SubkernelError::Timeout) => kern::SubkernelStatus::Timeout,
|
||||
Err(SubkernelError::IncorrectState) => kern::SubkernelStatus::IncorrectState,
|
||||
Err(_) => kern::SubkernelStatus::OtherError
|
||||
Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout),
|
||||
Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState),
|
||||
Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError)
|
||||
};
|
||||
kern_send(io, &kern::SubkernelAwaitFinishReply { status: status })
|
||||
kern_send(io, &response)
|
||||
}
|
||||
#[cfg(has_drtio)]
|
||||
&kern::SubkernelMsgSend { id, destination, count, tag, data } => {
|
||||
@ -707,41 +707,29 @@ fn process_kern_message(io: &Io, aux_mutex: &Mutex,
|
||||
routing_table, id as u32)?;
|
||||
if res.comm_lost {
|
||||
kern_send(io,
|
||||
&kern::SubkernelMsgRecvReply {
|
||||
status: kern::SubkernelStatus::CommLost,
|
||||
count: 0
|
||||
})?;
|
||||
&kern::SubkernelError(kern::SubkernelStatus::CommLost))?;
|
||||
} else if let Some(raw_exception) = &res.exception {
|
||||
let exception = subkernel::read_exception(raw_exception);
|
||||
if let Ok(exception) = exception {
|
||||
kern_send(io,
|
||||
&kern::SubkernelMsgRecvReply {
|
||||
status: kern::SubkernelStatus::Exception(exception),
|
||||
count: 0
|
||||
})?;
|
||||
kern_send(io,
|
||||
&kern::SubkernelError(kern::SubkernelStatus::Exception(exception)))?;
|
||||
} else {
|
||||
kern_send(io,
|
||||
&kern::SubkernelMsgRecvReply {
|
||||
status: kern::SubkernelStatus::OtherError,
|
||||
count: 0
|
||||
})?;
|
||||
kern_send(io,
|
||||
&kern::SubkernelError(kern::SubkernelStatus::OtherError))?;
|
||||
}
|
||||
} else {
|
||||
kern_send(io,
|
||||
&kern::SubkernelMsgRecvReply {
|
||||
status: kern::SubkernelStatus::OtherError,
|
||||
count: 0
|
||||
})?;
|
||||
kern_send(io,
|
||||
&kern::SubkernelError(kern::SubkernelStatus::OtherError))?;
|
||||
}
|
||||
} else {
|
||||
let (status, count) = match message_received {
|
||||
Ok(ref message) => (kern::SubkernelStatus::NoError, message.count),
|
||||
Err(SubkernelError::Timeout) => (kern::SubkernelStatus::Timeout, 0),
|
||||
Err(SubkernelError::IncorrectState) => (kern::SubkernelStatus::IncorrectState, 0),
|
||||
let message = match message_received {
|
||||
Ok(ref message) => kern::SubkernelMsgRecvReply { count: message.count },
|
||||
Err(SubkernelError::Timeout) => kern::SubkernelError(kern::SubkernelStatus::Timeout),
|
||||
Err(SubkernelError::IncorrectState) => kern::SubkernelError(kern::SubkernelStatus::IncorrectState),
|
||||
Err(SubkernelError::SubkernelFinished) => unreachable!(), // taken care of above
|
||||
Err(_) => (kern::SubkernelStatus::OtherError, 0)
|
||||
Err(_) => kern::SubkernelError(kern::SubkernelStatus::OtherError)
|
||||
};
|
||||
kern_send(io, &kern::SubkernelMsgRecvReply { status: status, count: count})?;
|
||||
kern_send(io, &message)?;
|
||||
if let Ok(message) = message_received {
|
||||
// receive code almost identical to RPC recv, except we are not reading from a stream
|
||||
let mut reader = Cursor::new(message.data);
|
||||
|
@ -1,23 +1,22 @@
|
||||
use core::mem;
|
||||
use alloc::{string::String, format, vec::Vec, collections::btree_map::BTreeMap};
|
||||
use cslice::AsCSlice;
|
||||
use cslice::{CSlice, AsCSlice};
|
||||
|
||||
use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi};
|
||||
use board_misoc::{csr, clock, i2c};
|
||||
use proto_artiq::{
|
||||
drtioaux_proto::PayloadStatus,
|
||||
kernel_proto as kern,
|
||||
kernel_proto as kern,
|
||||
session_proto::Reply::KernelException as HostKernelException,
|
||||
rpc_proto as rpc};
|
||||
use eh::eh_artiq;
|
||||
use io::Cursor;
|
||||
use io::{Cursor, ProtoRead};
|
||||
use kernel::eh_artiq::StackPointerBacktrace;
|
||||
|
||||
use ::{cricon_select, RtioMaster};
|
||||
use cache::Cache;
|
||||
use dma::{Manager as DmaManager, Error as DmaError};
|
||||
use routing::{Router, Sliceable, SliceMeta};
|
||||
use SAT_PAYLOAD_MAX_SIZE;
|
||||
use MASTER_PAYLOAD_MAX_SIZE;
|
||||
|
||||
mod kernel_cpu {
|
||||
@ -69,6 +68,7 @@ enum KernelState {
|
||||
SubkernelAwaitFinish { max_time: i64, id: u32 },
|
||||
DmaUploading { max_time: u64 },
|
||||
DmaAwait { max_time: u64 },
|
||||
SubkernelRetrievingException { destination: u8 },
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -134,10 +134,13 @@ struct MessageManager {
|
||||
struct Session {
|
||||
kernel_state: KernelState,
|
||||
log_buffer: String,
|
||||
last_exception: Option<Sliceable>,
|
||||
source: u8, // which destination requested running the kernel
|
||||
last_exception: Option<Sliceable>, // exceptions raised locally
|
||||
external_exception: Vec<u8>, // exceptions from sub-subkernels
|
||||
// which destination requested running the kernel
|
||||
source: u8,
|
||||
messages: MessageManager,
|
||||
subkernels_finished: Vec<u32> // ids of subkernels finished
|
||||
// ids of subkernels finished (with exception)
|
||||
subkernels_finished: Vec<(u32, Option<u8>)>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
@ -277,6 +280,7 @@ impl Session {
|
||||
kernel_state: KernelState::Absent,
|
||||
log_buffer: String::new(),
|
||||
last_exception: None,
|
||||
external_exception: Vec::new(),
|
||||
source: 0,
|
||||
messages: MessageManager::new(),
|
||||
subkernels_finished: Vec::new()
|
||||
@ -428,9 +432,9 @@ impl Manager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta {
|
||||
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> SliceMeta {
|
||||
match self.session.last_exception.as_mut() {
|
||||
Some(exception) => exception.get_slice_sat(data_slice),
|
||||
Some(exception) => exception.get_slice_master(data_slice),
|
||||
None => SliceMeta { destination: 0, len: 0, status: PayloadStatus::FirstAndLast }
|
||||
}
|
||||
}
|
||||
@ -517,7 +521,7 @@ impl Manager {
|
||||
return;
|
||||
}
|
||||
|
||||
match self.process_external_messages() {
|
||||
match self.process_external_messages(router, routing_table, rank, destination) {
|
||||
Ok(()) => (),
|
||||
Err(Error::AwaitingMessage) => return, // kernel still waiting, do not process kernel messages
|
||||
Err(Error::KernelException(exception)) => {
|
||||
@ -549,20 +553,42 @@ impl Manager {
|
||||
}
|
||||
}
|
||||
|
||||
fn process_external_messages(&mut self) -> Result<(), Error> {
|
||||
fn check_finished_kernels(&mut self, id: u32, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) {
|
||||
for (i, (status, exception_source)) in self.session.subkernels_finished.iter().enumerate() {
|
||||
if *status == id {
|
||||
if exception_source.is_none() {
|
||||
kern_send(&kern::SubkernelAwaitFinishReply).unwrap();
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
self.session.subkernels_finished.swap_remove(i);
|
||||
} else {
|
||||
let destination = exception_source.unwrap();
|
||||
self.session.external_exception = Vec::new();
|
||||
self.session.kernel_state = KernelState::SubkernelRetrievingException { destination: destination };
|
||||
router.route(drtioaux::Packet::SubkernelExceptionRequest {
|
||||
source: self_destination, destination: destination
|
||||
}, &routing_table, rank, self_destination);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn process_external_messages(&mut self, router: &mut Router, routing_table: &RoutingTable, rank: u8, self_destination: u8) -> Result<(), Error> {
|
||||
match &self.session.kernel_state {
|
||||
KernelState::MsgAwait { id, max_time, tags } => {
|
||||
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
|
||||
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?;
|
||||
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?;
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
return Ok(())
|
||||
}
|
||||
if let Some(message) = self.session.messages.get_incoming(*id) {
|
||||
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?;
|
||||
kern_send(&kern::SubkernelMsgRecvReply { count: message.count })?;
|
||||
let tags = tags.clone();
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
pass_message_to_kernel(&message, &tags)
|
||||
} else {
|
||||
let id = *id;
|
||||
self.check_finished_kernels(id, router, routing_table, rank, self_destination);
|
||||
Err(Error::AwaitingMessage)
|
||||
}
|
||||
},
|
||||
@ -576,19 +602,11 @@ impl Manager {
|
||||
},
|
||||
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
||||
if *max_time > 0 && clock::get_ms() > *max_time as u64 {
|
||||
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::Timeout })?;
|
||||
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Timeout))?;
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
} else {
|
||||
let mut i = 0;
|
||||
for status in &self.session.subkernels_finished {
|
||||
if *status == *id {
|
||||
kern_send(&kern::SubkernelAwaitFinishReply { status: kern::SubkernelStatus::NoError })?;
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
self.session.subkernels_finished.swap_remove(i);
|
||||
break;
|
||||
}
|
||||
i += 1;
|
||||
}
|
||||
let id = *id;
|
||||
self.check_finished_kernels(id, router, routing_table, rank, self_destination);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@ -606,6 +624,9 @@ impl Manager {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
KernelState::SubkernelRetrievingException { destination: _ } => {
|
||||
Err(Error::AwaitingMessage)
|
||||
}
|
||||
_ => Ok(())
|
||||
}
|
||||
}
|
||||
@ -628,16 +649,30 @@ impl Manager {
|
||||
}
|
||||
|
||||
pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) {
|
||||
if with_exception {
|
||||
unsafe { kernel_cpu::stop() }
|
||||
self.session.kernel_state = KernelState::Absent;
|
||||
unsafe { self.cache.unborrow() }
|
||||
self.last_finished = Some(SubkernelFinished {
|
||||
source: self.session.source, id: self.current_id,
|
||||
with_exception: true, exception_source: exception_source
|
||||
})
|
||||
let exception_src = if with_exception { Some(exception_source) } else { None };
|
||||
self.session.subkernels_finished.push((id, exception_src));
|
||||
}
|
||||
|
||||
pub fn received_exception(&mut self, exception_data: &[u8], last: bool, router: &mut Router, routing_table: &RoutingTable,
|
||||
rank: u8, self_destination: u8) {
|
||||
if let KernelState::SubkernelRetrievingException { destination } = self.session.kernel_state {
|
||||
self.session.external_exception.extend_from_slice(exception_data);
|
||||
if last {
|
||||
if let Ok(exception) = read_exception(&self.session.external_exception) {
|
||||
kern_send(&kern::SubkernelError(kern::SubkernelStatus::Exception(exception))).unwrap();
|
||||
} else {
|
||||
kern_send(
|
||||
&kern::SubkernelError(kern::SubkernelStatus::OtherError)).unwrap();
|
||||
}
|
||||
self.session.kernel_state = KernelState::Running;
|
||||
} else {
|
||||
/* fetch another slice */
|
||||
router.route(drtioaux::Packet::SubkernelExceptionRequest {
|
||||
source: self_destination, destination: destination
|
||||
}, routing_table, rank, self_destination);
|
||||
}
|
||||
} else {
|
||||
self.session.subkernels_finished.push(id);
|
||||
warn!("Received unsolicited exception data");
|
||||
}
|
||||
}
|
||||
|
||||
@ -655,6 +690,7 @@ impl Manager {
|
||||
(_, KernelState::DmaAwait { .. }) |
|
||||
(_, KernelState::MsgSending) |
|
||||
(_, KernelState::SubkernelAwaitLoad) |
|
||||
(_, KernelState::SubkernelRetrievingException { .. }) |
|
||||
(_, KernelState::SubkernelAwaitFinish { .. }) => {
|
||||
// We're standing by; ignore the message.
|
||||
return Ok(None)
|
||||
@ -822,6 +858,48 @@ impl Drop for Manager {
|
||||
}
|
||||
}
|
||||
|
||||
fn read_exception_string<'a>(reader: &mut Cursor<&[u8]>) -> Result<CSlice<'a, u8>, Error> {
|
||||
let len = reader.read_u32()? as usize;
|
||||
if len == usize::MAX {
|
||||
let data = reader.read_u32()?;
|
||||
Ok(unsafe { CSlice::new(data as *const u8, len) })
|
||||
} else {
|
||||
let pos = reader.position();
|
||||
let slice = unsafe {
|
||||
let ptr = reader.get_ref().as_ptr().offset(pos as isize);
|
||||
CSlice::new(ptr, len)
|
||||
};
|
||||
reader.set_position(pos + len);
|
||||
Ok(slice)
|
||||
}
|
||||
}
|
||||
|
||||
fn read_exception(buffer: &[u8]) -> Result<eh_artiq::Exception, Error>
|
||||
{
|
||||
let mut reader = Cursor::new(buffer);
|
||||
|
||||
let mut byte = reader.read_u8()?;
|
||||
// to sync
|
||||
while byte != 0x5a {
|
||||
byte = reader.read_u8()?;
|
||||
}
|
||||
// skip sync bytes, 0x09 indicates exception
|
||||
while byte != 0x09 {
|
||||
byte = reader.read_u8()?;
|
||||
}
|
||||
let _len = reader.read_u32()?;
|
||||
// ignore the remaining exceptions, stack traces etc. - unwinding from another device would be unwise anyway
|
||||
Ok(eh_artiq::Exception {
|
||||
id: reader.read_u32()?,
|
||||
message: read_exception_string(&mut reader)?,
|
||||
param: [reader.read_u64()? as i64, reader.read_u64()? as i64, reader.read_u64()? as i64],
|
||||
file: read_exception_string(&mut reader)?,
|
||||
line: reader.read_u32()?,
|
||||
column: reader.read_u32()?,
|
||||
function: read_exception_string(&mut reader)?
|
||||
})
|
||||
}
|
||||
|
||||
fn kern_recv<R, F>(f: F) -> Result<R, Error>
|
||||
where F: FnOnce(&kern::Message) -> Result<R, Error> {
|
||||
if mailbox::receive() == 0 {
|
||||
|
@ -455,15 +455,21 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
|
||||
kernelmgr.remote_subkernel_finished(id, with_exception, exception_src);
|
||||
Ok(())
|
||||
}
|
||||
drtioaux::Packet::SubkernelExceptionRequest { destination: _destination } => {
|
||||
drtioaux::Packet::SubkernelExceptionRequest { source, destination: _destination } => {
|
||||
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);
|
||||
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
|
||||
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
|
||||
let meta = kernelmgr.exception_get_slice(&mut data_slice);
|
||||
drtioaux::send(0, &drtioaux::Packet::SubkernelException {
|
||||
router.send(drtioaux::Packet::SubkernelException {
|
||||
destination: source,
|
||||
last: meta.status.is_last(),
|
||||
length: meta.len,
|
||||
data: data_slice,
|
||||
})
|
||||
}, _routing_table, *rank, *self_destination)
|
||||
}
|
||||
drtioaux::Packet::SubkernelException { destination: _destination, last, length, data } => {
|
||||
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);
|
||||
kernelmgr.received_exception(&data[..length as usize], last, router, _routing_table, *rank, *self_destination);
|
||||
Ok(())
|
||||
}
|
||||
drtioaux::Packet::SubkernelMessage { source, destination: _destination, id, status, length, data } => {
|
||||
forward!(router, _routing_table, _destination, *rank, *self_destination, _repeaters, &packet);
|
||||
|
@ -4,7 +4,6 @@ use board_artiq::{drtioaux, drtio_routing};
|
||||
use board_misoc::csr;
|
||||
use core::cmp::min;
|
||||
use proto_artiq::drtioaux_proto::PayloadStatus;
|
||||
use SAT_PAYLOAD_MAX_SIZE;
|
||||
use MASTER_PAYLOAD_MAX_SIZE;
|
||||
|
||||
/* represents data that has to be sent with the aux protocol */
|
||||
@ -57,7 +56,6 @@ impl Sliceable {
|
||||
self.data.extend(data);
|
||||
}
|
||||
|
||||
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
|
||||
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user