diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index c8b4221..154a5e0 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -19,6 +19,46 @@ impl From for Error { } } +#[derive(PartialEq, Clone, Copy, Debug)] +#[repr(u8)] +pub enum PayloadStatus { + Middle = 0, + First = 1, + Last = 2, + FirstAndLast = 3, +} + +impl From for PayloadStatus { + fn from(value: u8) -> PayloadStatus { + match value { + 0 => PayloadStatus::Middle, + 1 => PayloadStatus::First, + 2 => PayloadStatus::Last, + 3 => PayloadStatus::FirstAndLast, + _ => unreachable!(), + } + } +} + +impl PayloadStatus { + pub fn is_first(self) -> bool { + self == PayloadStatus::First || self == PayloadStatus::FirstAndLast + } + + pub fn is_last(self) -> bool { + self == PayloadStatus::Last || self == PayloadStatus::FirstAndLast + } + + pub fn from_status(first: bool, last: bool) -> PayloadStatus { + match (first, last) { + (true, true) => PayloadStatus::FirstAndLast, + (true, false) => PayloadStatus::First, + (false, true) => PayloadStatus::Last, + (false, false) => PayloadStatus::Middle, + } + } +} + #[derive(PartialEq, Debug)] pub enum Packet { EchoRequest, @@ -159,7 +199,7 @@ pub enum Packet { DmaAddTraceRequest { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -192,7 +232,7 @@ pub enum Packet { SubkernelAddDataRequest { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -228,7 +268,7 @@ pub enum Packet { SubkernelMessage { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -391,14 +431,14 @@ impl Packet { 0xb0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { destination: destination, id: id, - last: last, + status: status, length: length as u16, trace: trace, } @@ -432,14 +472,14 @@ impl Packet { 0xc0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelAddDataRequest { destination: destination, id: id, - last: last, + status: status, length: length as u16, data: data, } @@ -482,14 +522,14 @@ impl Packet { 0xcb => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelMessage { destination: destination, id: id, - last: last, + status: status, length: length as u16, data: data, } @@ -713,14 +753,14 @@ impl Packet { Packet::DmaAddTraceRequest { destination, id, - last, + status, trace, length, } => { writer.write_u8(0xb0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; // trace may be broken down to fit within drtio aux memory limit // will be reconstructed by satellite writer.write_u16(length)?; @@ -771,14 +811,14 @@ impl Packet { Packet::SubkernelAddDataRequest { destination, id, - last, + status, data, length, } => { writer.write_u8(0xc0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; } @@ -822,14 +862,14 @@ impl Packet { Packet::SubkernelMessage { destination, id, - last, + status, data, length, } => { writer.write_u8(0xcb)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; } diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index d02d974..26a09e6 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -458,6 +458,20 @@ async fn handle_run_kernel( Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0), Err(SubkernelError::IncorrectState) => (kernel::SubkernelStatus::IncorrectState, 0), Err(SubkernelError::CommLost) => (kernel::SubkernelStatus::CommLost, 0), + Err(SubkernelError::SubkernelException) => { + error!("Exception in subkernel"); + // just retrieve the exception + let status = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout) + .await + .unwrap(); + match stream { + None => (), + Some(stream) => { + write_chunk(stream, &status.exception.unwrap()).await?; + } + } + (kernel::SubkernelStatus::OtherError, 0) + } Err(_) => (kernel::SubkernelStatus::OtherError, 0), }; control diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index f7d2671..b2f3aa3 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -13,8 +13,10 @@ pub mod drtio { use ksupport::{resolve_channel_name, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; use libasync::{delay, task}; - use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, - drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; + use libboard_artiq::{drtioaux::Error, + drtioaux_async, + drtioaux_async::Packet, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; use libboard_zynq::time::Milliseconds; use log::{error, info, warn}; @@ -61,11 +63,11 @@ pub mod drtio { Packet::SubkernelMessage { id, destination: from, - last, + status, length, data, } => { - subkernel::message_handle_incoming(id, last, length as usize, &data).await; + subkernel::message_handle_incoming(id, status, length as usize, &data).await; // acknowledge receiving part of the message let _lock = aux_mutex.async_lock().await; drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) @@ -463,7 +465,7 @@ pub mod drtio { reply_handler_f: HandlerF, ) -> Result<(), &'static str> where - PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Packet, + PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], PayloadStatus, usize) -> Packet, HandlerF: Fn(&Packet) -> Result<(), &'static str>, { let mut i = 0; @@ -474,10 +476,12 @@ pub mod drtio { } else { data.len() - i } as usize; + let first = i == 0; let last = i + len == data.len(); slice[..len].clone_from_slice(&data[i..i + len]); i += len; - let packet = packet_f(&slice, last, len); + let status = PayloadStatus::from_status(first, last); + let packet = packet_f(&slice, status, len); let reply = aux_transact(aux_mutex, linkno, &packet, timer).await?; reply_handler_f(&reply)?; } @@ -498,10 +502,10 @@ pub mod drtio { aux_mutex, timer, trace, - |slice, last, len| Packet::DmaAddTraceRequest { + |slice, status, len| Packet::DmaAddTraceRequest { id: id, destination: destination, - last: last, + status: status, length: len as u16, trace: *slice, }, @@ -655,10 +659,10 @@ pub mod drtio { aux_mutex, timer, data, - |slice, last, len| Packet::SubkernelAddDataRequest { + |slice, status, len| Packet::SubkernelAddDataRequest { id: id, destination: destination, - last: last, + status: status, length: len as u16, data: *slice, }, @@ -742,10 +746,10 @@ pub mod drtio { aux_mutex, timer, message, - |slice, last, len| Packet::SubkernelMessage { + |slice, status, len| Packet::SubkernelMessage { destination: destination, id: id, - last: last, + status: status, length: len as u16, data: *slice, }, diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index 383716a..d129865 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -1,7 +1,8 @@ use alloc::{collections::BTreeMap, rc::Rc, vec::Vec}; use libasync::task; -use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; +use libboard_artiq::{drtio_routing::RoutingTable, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::mutex::Mutex; use log::error; @@ -28,6 +29,7 @@ pub enum Error { Timeout, IncorrectState, SubkernelNotFound, + SubkernelException, CommLost, DrtioError(&'static str), } @@ -123,11 +125,13 @@ pub async fn subkernel_finished(id: u32, with_exception: bool) { // called upon receiving DRTIO SubkernelRunDone // may be None if session ends and is cleared if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { - subkernel.state = SubkernelState::Finished { - status: match with_exception { - true => FinishStatus::Exception, - false => FinishStatus::Ok, - }, + if subkernel.state == SubkernelState::Running { + subkernel.state = SubkernelState::Finished { + status: match with_exception { + true => FinishStatus::Exception, + false => FinishStatus::Ok, + }, + } } } } @@ -220,13 +224,27 @@ static MESSAGE_QUEUE: Mutex> = Mutex::new(Vec::new()); // currently under construction message(s) (can be from multiple sources) static CURRENT_MESSAGES: Mutex> = Mutex::new(BTreeMap::new()); -pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { +pub async fn message_handle_incoming( + id: u32, + status: PayloadStatus, + length: usize, + data: &[u8; MASTER_PAYLOAD_MAX_SIZE], +) { // called when receiving a message from satellite - if SUBKERNELS.async_lock().await.get(&id).is_none() { - // do not add messages for non-existing or deleted subkernels - return; + { + let subkernel_lock = SUBKERNELS.async_lock().await; + let subkernel = subkernel_lock.get(&id); + if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { + // do not add messages for non-existing or deleted subkernels + return; + } } let mut current_messages = CURRENT_MESSAGES.async_lock().await; + + if status.is_first() { + current_messages.remove(&id); + } + match current_messages.get_mut(&id) { Some(message) => message.data.extend(&data[..length]), None => { @@ -240,7 +258,7 @@ pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: & ); } }; - if last { + if status.is_last() { // when done, remove from working queue MESSAGE_QUEUE .async_lock() @@ -269,6 +287,15 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< } } } + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Finished { + status: FinishStatus::Exception, + } => return Err(Error::SubkernelException), + _ => (), + } task::r#yield().await; } Err(Error::Timeout) diff --git a/src/satman/src/dma.rs b/src/satman/src/dma.rs index 5955e3c..dc901db 100644 --- a/src/satman/src/dma.rs +++ b/src/satman/src/dma.rs @@ -1,6 +1,6 @@ use alloc::{collections::btree_map::BTreeMap, vec::Vec}; -use libboard_artiq::pl::csr; +use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr}; use libcortex_a9::cache::dcci_slice; const ALIGNMENT: usize = 64; @@ -50,10 +50,10 @@ impl Manager { } } - pub fn add(&mut self, id: u32, last: bool, trace: &[u8], trace_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> { let entry = match self.entries.get_mut(&id) { Some(entry) => { - if entry.complete { + if entry.complete || status.is_first() { // replace entry self.entries.remove(&id); self.entries.insert( @@ -83,7 +83,7 @@ impl Manager { }; entry.trace.extend(&trace[0..trace_len]); - if last { + if status.is_last() { entry.trace.push(0); let data_len = entry.trace.len(); diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index b501cba..80d53cd 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -162,7 +162,7 @@ fn process_aux_packet( &drtioaux::Packet::SubkernelMessage { destination: destination, id: kernel_manager.get_current_id().unwrap(), - last: meta.last, + status: meta.status, length: meta.len as u16, data: data_slice, }, @@ -494,12 +494,12 @@ fn process_aux_packet( drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, - last, + status, length, trace, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = dma_manager.add(id, last, &trace, length as usize).is_ok(); + let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) } drtioaux::Packet::DmaRemoveTraceRequest { @@ -527,12 +527,12 @@ fn process_aux_packet( drtioaux::Packet::SubkernelAddDataRequest { destination: _destination, id, - last, + status, length, data, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = kernel_manager.add(id, last, &data, length as usize).is_ok(); + let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) } drtioaux::Packet::SubkernelLoadRunRequest { @@ -562,7 +562,7 @@ fn process_aux_packet( drtioaux::send( 0, &drtioaux::Packet::SubkernelException { - last: meta.last, + last: meta.status.is_last(), length: meta.len, data: data_slice, }, @@ -571,12 +571,12 @@ fn process_aux_packet( drtioaux::Packet::SubkernelMessage { destination, id: _id, - last, + status, length, data, } => { forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer); - kernel_manager.message_handle_incoming(last, length as usize, &data); + kernel_manager.message_handle_incoming(status, length as usize, &data); drtioaux::send( 0, &drtioaux::Packet::SubkernelMessageAck { @@ -596,7 +596,7 @@ fn process_aux_packet( &drtioaux::Packet::SubkernelMessage { destination: *_rank, id: kernel_manager.get_current_id().unwrap(), - last: meta.last, + status: meta.status, length: meta.len as u16, data: data_slice, }, diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 4b38b67..ce32bd4 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -8,7 +8,7 @@ use core_io::{Error as IoError, Write}; use cslice::AsCSlice; use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; -use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, +use libboard_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, pl::csr}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; @@ -133,24 +133,23 @@ pub struct SubkernelFinished { pub struct SliceMeta { pub len: u16, - pub last: bool, + pub status: PayloadStatus, } macro_rules! get_slice_fn { ($name:tt, $size:expr) => { pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - if self.data.len() == 0 { - return SliceMeta { len: 0, last: true }; - } + let first = self.it == 0; let len = min($size, self.data.len() - self.it); let last = self.it + len == self.data.len(); + let status = PayloadStatus::from_status(first, last); data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); self.it += len; SliceMeta { len: len as u16, - last: last, + status: status, } } }; @@ -175,8 +174,11 @@ impl MessageManager { } } - pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from master + if status.is_first() { + self.in_buffer = None; + } match self.in_buffer.as_mut() { Some(message) => message.data.extend(&data[..length]), None => { @@ -186,7 +188,7 @@ impl MessageManager { }); } }; - if last { + if status.is_last() { // when done, remove from working queue self.in_queue.push_back(self.in_buffer.take().unwrap()); } @@ -218,7 +220,7 @@ impl MessageManager { return None; } let meta = self.out_message.as_mut()?.get_slice_master(data_slice); - if meta.last { + if meta.status.is_last() { // clear the message slot self.out_message = None; // notify kernel with a flag that message is sent @@ -265,10 +267,10 @@ impl<'a> Manager<'_> { } } - pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, data: &[u8], data_len: usize) -> Result<(), Error> { let kernel = match self.kernels.get_mut(&id) { Some(kernel) => { - if kernel.complete { + if kernel.complete || status.is_first() { // replace entry self.kernels.remove(&id); self.kernels.insert( @@ -296,7 +298,7 @@ impl<'a> Manager<'_> { }; kernel.library.extend(&data[0..data_len]); - kernel.complete = last; + kernel.complete = status.is_last(); Ok(()) } @@ -325,11 +327,16 @@ impl<'a> Manager<'_> { Ok(()) } - pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn message_handle_incoming( + &mut self, + status: PayloadStatus, + length: usize, + slice: &[u8; MASTER_PAYLOAD_MAX_SIZE], + ) { if !self.running() { return; } - self.session.messages.handle_incoming(last, length, slice); + self.session.messages.handle_incoming(status, length, slice); } pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { @@ -378,7 +385,10 @@ impl<'a> Manager<'_> { pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { match self.session.last_exception.as_mut() { Some(exception) => exception.get_slice_sat(data_slice), - None => SliceMeta { len: 0, last: true }, + None => SliceMeta { + len: 0, + status: PayloadStatus::FirstAndLast, + }, } }