diff --git a/flake.lock b/flake.lock index ff735d5..a2c79f0 100644 --- a/flake.lock +++ b/flake.lock @@ -11,11 +11,11 @@ "src-pythonparser": "src-pythonparser" }, "locked": { - "lastModified": 1720514025, - "narHash": "sha256-eU9Lux5olHTxRCwSiApKSPEdNhrlZ2OJ9ZqSofqp9Uk=", + "lastModified": 1722417433, + "narHash": "sha256-QEbcVdL1sUQEbMCvCUvPM8DKqwOth3gJpdiLTf4hPN8=", "ref": "refs/heads/master", - "rev": "7a50afd9a96be8e6510d5c952808f39f0b74202a", - "revCount": 8887, + "rev": "0623480c82c28d57e14dc4f363374758a52284d3", + "revCount": 8952, "type": "git", "url": "https://github.com/m-labs/artiq.git" }, @@ -37,11 +37,11 @@ ] }, "locked": { - "lastModified": 1717637438, - "narHash": "sha256-BXFidNm3Em8iChPGu1L0s2bY+f2yQ0VVid4MuOoTehw=", + "lastModified": 1720768567, + "narHash": "sha256-3VoK7o5MtHtbHLrc6Pv+eQWFtaz5Gd/YWyV5TD3c5Ss=", "owner": "m-labs", "repo": "artiq-comtools", - "rev": "78d27026efe76a13f7b4698a554f55811369ec4d", + "rev": "f93570d8f2ed5a3cfb3e1c16ab00f2540551e994", "type": "github" }, "original": { @@ -102,16 +102,16 @@ }, "nixpkgs": { "locked": { - "lastModified": 1720386169, - "narHash": "sha256-NGKVY4PjzwAa4upkGtAMz1npHGoRzWotlSnVlqI40mo=", + "lastModified": 1721924956, + "narHash": "sha256-Sb1jlyRO+N8jBXEX9Pg9Z1Qb8Bw9QyOgLDNMEpmjZ2M=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "194846768975b7ad2c4988bdb82572c00222c0d7", + "rev": "5ad6a14c6bf098e98800b091668718c336effc95", "type": "github" }, "original": { "owner": "NixOS", - "ref": "nixos-24.05", + "ref": "nixos-unstable", "repo": "nixpkgs", "type": "github" } @@ -131,11 +131,11 @@ ] }, "locked": { - "lastModified": 1720491570, - "narHash": "sha256-PHS2BcQ9kxBpu9GKlDg3uAlrX/ahQOoAiVmwGl6BjD4=", + "lastModified": 1722046723, + "narHash": "sha256-G7/gHz8ORRvHd1/RIURrdcswKRPe9K0FsIYR4v5jSWo=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "b970af40fdc4bd80fd764796c5f97c15e2b564eb", + "rev": "56baac5e6b2743d4730e664ea64f6d8a2aad0fbb", "type": "github" }, "original": { @@ -168,11 +168,11 @@ "src-migen": { "flake": false, "locked": { - "lastModified": 1720332047, - "narHash": "sha256-FdYVEHVtXHrzPhBqpXOTo9uHQAtuCsDPmAPY8JrfHOY=", + "lastModified": 1721561053, + "narHash": "sha256-z3LRhNmKZrjr6rFD0yxtccSa/SWvFIYmb+G/D5d2Jd8=", "owner": "m-labs", "repo": "migen", - "rev": "60739a161e64630ce7ba62d1a5bac1252b66c3b9", + "rev": "9279e8623f8433bc4f23ac51e5e2331bfe544417", "type": "github" }, "original": { @@ -239,11 +239,11 @@ ] }, "locked": { - "lastModified": 1717654016, - "narHash": "sha256-y/c0EZNDNlxb/yLy/D23X9PLoiQ8I9mXAA0zsVOy2t8=", + "lastModified": 1720537402, + "narHash": "sha256-ybvaQ48SVBqYVqgYmGUdefGZkni7PJ90qYQPHnFOwDs=", "ref": "refs/heads/master", - "rev": "0efbbe39fe643c03f15e29c208bff3247a987766", - "revCount": 647, + "rev": "b2b3e5c933cbc4b7cb14adde480d7561a3ae71ee", + "revCount": 648, "type": "git", "url": "https://git.m-labs.hk/m-labs/zynq-rs" }, diff --git a/flake.nix b/flake.nix index 5660e31..ff055e9 100644 --- a/flake.nix +++ b/flake.nix @@ -18,11 +18,11 @@ fastnumbers = pkgs.python3Packages.buildPythonPackage rec { pname = "fastnumbers"; - version = "2.2.1"; + version = "5.1.0"; src = pkgs.python3Packages.fetchPypi { inherit pname version; - sha256 = "0j15i54p7nri6hkzn1wal9pxri4pgql01wgjccig6ar0v5jjbvsy"; + sha256 = "sha256-4JLTP4uVwxcaL7NOV57+DFSwKQ3X+W/6onYkN2AdkKc="; }; }; diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index cc649dd..6232260 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -267,12 +267,14 @@ pub enum Packet { exception_src: u8, }, SubkernelExceptionRequest { + source: u8, destination: u8, }, SubkernelException { + destination: u8, last: bool, length: u16, - data: [u8; SAT_PAYLOAD_MAX_SIZE], + data: [u8; MASTER_PAYLOAD_MAX_SIZE], }, SubkernelMessage { source: u8, @@ -524,14 +526,17 @@ impl Packet { exception_src: reader.read_u8()?, }, 0xc9 => Packet::SubkernelExceptionRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, }, 0xca => { + let destination = reader.read_u8()?; let last = reader.read_bool()?; let length = reader.read_u16()?; - let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelException { + destination: destination, last: last, length: length, data: data, @@ -896,12 +901,19 @@ impl Packet { writer.write_bool(with_exception)?; writer.write_u8(exception_src)?; } - Packet::SubkernelExceptionRequest { destination } => { + Packet::SubkernelExceptionRequest { source, destination } => { writer.write_u8(0xc9)?; + writer.write_u8(source)?; writer.write_u8(destination)?; } - Packet::SubkernelException { last, length, data } => { + Packet::SubkernelException { + destination, + last, + length, + data, + } => { writer.write_u8(0xca)?; + writer.write_u8(destination)?; writer.write_bool(last)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; @@ -943,6 +955,8 @@ impl Packet { Packet::SubkernelLoadRunReply { destination, .. } => Some(*destination), Packet::SubkernelMessage { destination, .. } => Some(*destination), Packet::SubkernelMessageAck { destination } => Some(*destination), + Packet::SubkernelExceptionRequest { destination, .. } => Some(*destination), + Packet::SubkernelException { destination, .. } => Some(*destination), Packet::DmaPlaybackStatus { destination, .. } => Some(*destination), Packet::SubkernelFinished { destination, .. } => Some(*destination), _ => None, diff --git a/src/libksupport/src/eh_artiq.rs b/src/libksupport/src/eh_artiq.rs index 6f159ac..b42b880 100644 --- a/src/libksupport/src/eh_artiq.rs +++ b/src/libksupport/src/eh_artiq.rs @@ -14,8 +14,10 @@ use core::mem; +use core_io::Error as ReadError; use cslice::CSlice; use dwarf::eh::{self, EHAction, EHContext}; +use io::{Cursor, ProtoRead}; use libc::{c_int, c_void, uintptr_t}; use log::{error, trace}; use unwind as uw; @@ -295,6 +297,60 @@ pub unsafe extern "C" fn raise(exception: *const Exception) -> ! { unreachable!(); } +fn read_exception_string<'a>(reader: &mut Cursor<&[u8]>) -> Result, ReadError> { + let len = reader.read_u32()? as usize; + if len == usize::MAX { + let data = reader.read_u32()?; + Ok(unsafe { CSlice::new(data as *const u8, len) }) + } else { + let pos = reader.position(); + let slice = unsafe { + let ptr = reader.get_ref().as_ptr().offset(pos as isize); + CSlice::new(ptr, len) + }; + reader.set_position(pos + len); + Ok(slice) + } +} + +fn read_exception(raw_exception: &[u8]) -> Result { + let mut reader = Cursor::new(raw_exception); + + let mut byte = reader.read_u8()?; + // to sync + while byte != 0x5a { + byte = reader.read_u8()?; + } + // skip sync bytes, 0x09 indicates exception + while byte != 0x09 { + byte = reader.read_u8()?; + } + let _len = reader.read_u32()?; + // ignore the remaining exceptions, stack traces etc. - unwinding from another device would be unwise anyway + Ok(Exception { + id: reader.read_u32()?, + message: read_exception_string(&mut reader)?, + param: [ + reader.read_u64()? as i64, + reader.read_u64()? as i64, + reader.read_u64()? as i64, + ], + file: read_exception_string(&mut reader)?, + line: reader.read_u32()?, + column: reader.read_u32()?, + function: read_exception_string(&mut reader)?, + }) +} + +pub fn raise_raw(raw_exception: &[u8]) -> ! { + use crate::artiq_raise; + if let Ok(exception) = read_exception(raw_exception) { + unsafe { raise(&exception) }; + } else { + artiq_raise!("SubkernelError", "Error passing exception"); + } +} + pub unsafe extern "C" fn resume() -> ! { trace!("resume"); assert!(EXCEPTION_BUFFER.exception_count != 0); diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index b235cdd..8a8d48d 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -23,6 +23,7 @@ pub enum SubkernelStatus { Timeout, IncorrectState, CommLost, + Exception(Vec), OtherError, } @@ -90,9 +91,7 @@ pub enum Message { timeout: i64, }, #[cfg(has_drtio)] - SubkernelAwaitFinishReply { - status: SubkernelStatus, - }, + SubkernelAwaitFinishReply, #[cfg(has_drtio)] SubkernelMsgSend { id: u32, @@ -109,9 +108,10 @@ pub enum Message { }, #[cfg(has_drtio)] SubkernelMsgRecvReply { - status: SubkernelStatus, count: u8, }, + #[cfg(has_drtio)] + SubkernelError(SubkernelStatus), } static CHANNEL_0TO1: Mutex>> = Mutex::new(None); diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index 511e92b..66adb46 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -3,7 +3,7 @@ use alloc::vec::Vec; use cslice::CSlice; use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; -use crate::{artiq_raise, rpc::send_args}; +use crate::{artiq_raise, eh_artiq, rpc::send_args}; pub extern "C" fn load_run(id: u32, destination: u8, run: bool) { unsafe { @@ -36,21 +36,18 @@ pub extern "C" fn await_finish(id: u32, timeout: i64) { }); } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { - Message::SubkernelAwaitFinishReply { - status: SubkernelStatus::NoError, - } => (), - Message::SubkernelAwaitFinishReply { - status: SubkernelStatus::IncorrectState, - } => artiq_raise!("SubkernelError", "Subkernel not running"), - Message::SubkernelAwaitFinishReply { - status: SubkernelStatus::Timeout, - } => artiq_raise!("SubkernelError", "Subkernel timed out"), - Message::SubkernelAwaitFinishReply { - status: SubkernelStatus::CommLost, - } => artiq_raise!("SubkernelError", "Lost communication with satellite"), - Message::SubkernelAwaitFinishReply { - status: SubkernelStatus::OtherError, - } => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"), + Message::SubkernelAwaitFinishReply => (), + Message::SubkernelError(SubkernelStatus::IncorrectState) => { + artiq_raise!("SubkernelError", "Subkernel not running") + } + Message::SubkernelError(SubkernelStatus::Timeout) => artiq_raise!("SubkernelError", "Subkernel timed out"), + Message::SubkernelError(SubkernelStatus::CommLost) => { + artiq_raise!("SubkernelError", "Lost communication with satellite") + } + Message::SubkernelError(SubkernelStatus::OtherError) => { + artiq_raise!("SubkernelError", "An error occurred during subkernel operation") + } + Message::SubkernelError(SubkernelStatus::Exception(raw_exception)) => eh_artiq::raise_raw(&raw_exception), _ => panic!("expected SubkernelAwaitFinishReply after SubkernelAwaitFinishRequest"), } } @@ -92,30 +89,22 @@ pub extern "C" fn await_message(id: i32, timeout: i64, tags: &CSlice, min: u }); } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { - Message::SubkernelMsgRecvReply { - status: SubkernelStatus::NoError, - count, - } => { + Message::SubkernelMsgRecvReply { count } => { if min > count || count > max { artiq_raise!("SubkernelError", "Received more or less arguments than required") } } - Message::SubkernelMsgRecvReply { - status: SubkernelStatus::IncorrectState, - .. - } => artiq_raise!("SubkernelError", "Subkernel not running"), - Message::SubkernelMsgRecvReply { - status: SubkernelStatus::Timeout, - .. - } => artiq_raise!("SubkernelError", "Subkernel timed out"), - Message::SubkernelMsgRecvReply { - status: SubkernelStatus::CommLost, - .. - } => artiq_raise!("SubkernelError", "Lost communication with satellite"), - Message::SubkernelMsgRecvReply { - status: SubkernelStatus::OtherError, - .. - } => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"), + Message::SubkernelError(SubkernelStatus::IncorrectState) => { + artiq_raise!("SubkernelError", "Subkernel not running") + } + Message::SubkernelError(SubkernelStatus::Timeout) => artiq_raise!("SubkernelError", "Subkernel timed out"), + Message::SubkernelError(SubkernelStatus::CommLost) => { + artiq_raise!("SubkernelError", "Lost communication with satellite") + } + Message::SubkernelError(SubkernelStatus::OtherError) => { + artiq_raise!("SubkernelError", "An error occurred during subkernel operation") + } + Message::SubkernelError(SubkernelStatus::Exception(raw_exception)) => eh_artiq::raise_raw(&raw_exception), _ => panic!("expected SubkernelMsgRecvReply after SubkernelMsgRecvRequest"), } // RpcRecvRequest should be called after this to receive message data diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 3ee3201..e6abfb4 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -422,33 +422,23 @@ async fn handle_run_kernel( #[cfg(has_drtio)] kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { let res = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout).await; - let status = match res { - Ok(ref res) => { + let response = match res { + Ok(res) => { if res.status == subkernel::FinishStatus::CommLost { - kernel::SubkernelStatus::CommLost - } else if let Some(exception) = &res.exception { - error!("Exception in subkernel"); - match stream { - None => (), - Some(stream) => { - write_chunk(stream, exception).await?; - } - } - // will not be called after exception is served - kernel::SubkernelStatus::OtherError + kernel::Message::SubkernelError(kernel::SubkernelStatus::CommLost) + } else if let Some(exception) = res.exception { + kernel::Message::SubkernelError(kernel::SubkernelStatus::Exception(exception)) } else { - kernel::SubkernelStatus::NoError + kernel::Message::SubkernelAwaitFinishReply } } - Err(SubkernelError::Timeout) => kernel::SubkernelStatus::Timeout, - Err(SubkernelError::IncorrectState) => kernel::SubkernelStatus::IncorrectState, - Err(_) => kernel::SubkernelStatus::OtherError, + Err(SubkernelError::Timeout) => kernel::Message::SubkernelError(kernel::SubkernelStatus::Timeout), + Err(SubkernelError::IncorrectState) => { + kernel::Message::SubkernelError(kernel::SubkernelStatus::IncorrectState) + } + Err(_) => kernel::Message::SubkernelError(kernel::SubkernelStatus::OtherError), }; - control - .borrow_mut() - .tx - .async_send(kernel::Message::SubkernelAwaitFinishReply { status: status }) - .await; + control.borrow_mut().tx.async_send(response).await; } #[cfg(has_drtio)] kernel::Message::SubkernelMsgSend { id, destination, data } => { @@ -469,35 +459,23 @@ async fn handle_run_kernel( #[cfg(has_drtio)] kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => { let message_received = subkernel::message_await(id as u32, timeout, timer).await; - let (status, count) = match message_received { - Ok(ref message) => (kernel::SubkernelStatus::NoError, message.count), - Err(SubkernelError::Timeout) => (kernel::SubkernelStatus::Timeout, 0), - Err(SubkernelError::IncorrectState) => (kernel::SubkernelStatus::IncorrectState, 0), - Err(SubkernelError::CommLost) => (kernel::SubkernelStatus::CommLost, 0), + let response = match message_received { + Ok(ref message) => kernel::Message::SubkernelMsgRecvReply { count: message.count }, + Err(SubkernelError::Timeout) => kernel::Message::SubkernelError(kernel::SubkernelStatus::Timeout), + Err(SubkernelError::IncorrectState) => { + kernel::Message::SubkernelError(kernel::SubkernelStatus::IncorrectState) + } + Err(SubkernelError::CommLost) => kernel::Message::SubkernelError(kernel::SubkernelStatus::CommLost), Err(SubkernelError::SubkernelException) => { - error!("Exception in subkernel"); // just retrieve the exception let status = subkernel::await_finish(aux_mutex, routing_table, timer, id as u32, timeout) .await .unwrap(); - match stream { - None => (), - Some(stream) => { - write_chunk(stream, &status.exception.unwrap()).await?; - } - } - (kernel::SubkernelStatus::OtherError, 0) + kernel::Message::SubkernelError(kernel::SubkernelStatus::Exception(status.exception.unwrap())) } - Err(_) => (kernel::SubkernelStatus::OtherError, 0), + Err(_) => kernel::Message::SubkernelError(kernel::SubkernelStatus::OtherError), }; - control - .borrow_mut() - .tx - .async_send(kernel::Message::SubkernelMsgRecvReply { - status: status, - count: count, - }) - .await; + control.borrow_mut().tx.async_send(response).await; if let Ok(message) = message_received { // receive code almost identical to RPC recv, except we are not reading from a stream let mut reader = Cursor::new(message.data); @@ -529,7 +507,7 @@ async fn handle_run_kernel( .async_send(kernel::Message::RpcRecvReply(Ok(0))) .await; i += 1; - if i < count { + if i < message.count { current_tags = remaining_tags; } else { break; diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 8644e90..53a3b8b 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -129,6 +129,8 @@ pub mod drtio { | Packet::SubkernelLoadRunReply { destination, .. } | Packet::SubkernelMessage { destination, .. } | Packet::SubkernelMessageAck { destination, .. } + | Packet::SubkernelException { destination, .. } + | Packet::SubkernelExceptionRequest { destination, .. } | Packet::DmaPlaybackStatus { destination, .. } | Packet::SubkernelFinished { destination, .. } => { if destination == 0 { @@ -183,10 +185,7 @@ pub mod drtio { async fn drain_buffer(linkno: u8, draining_time: Milliseconds, timer: GlobalTimer) { let max_time = timer.get_time() + draining_time; - loop { - if timer.get_time() > max_time { - return; - } + while timer.get_time() < max_time { let _ = drtioaux_async::recv(linkno).await; } } @@ -835,13 +834,19 @@ pub mod drtio { linkno, routing_table, &Packet::SubkernelExceptionRequest { + source: 0, destination: destination, }, timer, ) .await?; match reply { - Packet::SubkernelException { last, length, data } => { + Packet::SubkernelException { + destination: 0, + last, + length, + data, + } => { remote_data.extend(&data[0..length as usize]); if last { return Ok(remote_data); diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 422b260..aebd9d6 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -895,6 +895,7 @@ fn process_aux_packet( Ok(()) } drtioaux::Packet::SubkernelExceptionRequest { + source, destination: _destination, } => { forward!( @@ -907,17 +908,46 @@ fn process_aux_packet( &packet, timer ); - let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let meta = kernel_manager.exception_get_slice(&mut data_slice); - drtioaux::send( - 0, - &drtioaux::Packet::SubkernelException { + router.send( + drtioaux::Packet::SubkernelException { + destination: source, last: meta.status.is_last(), length: meta.len, data: data_slice, }, + _routing_table, + *rank, + *self_destination, ) } + drtioaux::Packet::SubkernelException { + destination: _destination, + last, + length, + data, + } => { + forward!( + router, + _routing_table, + _destination, + *rank, + *self_destination, + _repeaters, + &packet, + timer + ); + kernel_manager.received_exception( + &data[..length as usize], + last, + router, + _routing_table, + *rank, + *self_destination, + ); + Ok(()) + } drtioaux::Packet::SubkernelMessage { source, destination: _destination, diff --git a/src/satman/src/routing.rs b/src/satman/src/routing.rs index 3276444..87d5f09 100644 --- a/src/satman/src/routing.rs +++ b/src/satman/src/routing.rs @@ -4,7 +4,7 @@ use core::cmp::min; #[cfg(has_drtio_routing)] use libboard_artiq::pl::csr; use libboard_artiq::{drtio_routing, drtioaux, - drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}}; + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; pub struct SliceMeta { pub destination: u8, @@ -57,7 +57,6 @@ impl Sliceable { self.data.extend(data); } - get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); } diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index a7b5634..83d0c38 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -11,7 +11,7 @@ use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; use libboard_artiq::{drtio_routing::RoutingTable, drtioaux, - drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}, pl::csr}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; @@ -47,6 +47,9 @@ enum KernelState { DmaAwait { max_time: Milliseconds, }, + SubkernelRetrievingException { + destination: u8, + }, } #[derive(Debug)] @@ -123,10 +126,11 @@ struct MessageManager { struct Session { id: u32, kernel_state: KernelState, - last_exception: Option, + last_exception: Option, // exceptions raised locally + external_exception: Option>, // exceptions from sub-subkernels messages: MessageManager, source: u8, // which destination requested running the kernel - subkernels_finished: Vec, + subkernels_finished: Vec<(u32, Option)>, } impl Session { @@ -135,6 +139,7 @@ impl Session { id: id, kernel_state: KernelState::Absent, last_exception: None, + external_exception: None, messages: MessageManager::new(), source: 0, subkernels_finished: Vec::new(), @@ -410,9 +415,9 @@ impl<'a> Manager<'_> { } } - pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { + pub fn exception_get_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> SliceMeta { match self.session.last_exception.as_mut() { - Some(exception) => exception.get_slice_sat(data_slice), + Some(exception) => exception.get_slice_master(data_slice), None => SliceMeta { destination: 0, len: 0, @@ -540,7 +545,7 @@ impl<'a> Manager<'_> { return; } - match self.process_external_messages(timer) { + match self.process_external_messages(router, routing_table, rank, destination, timer) { Ok(()) => (), Err(Error::AwaitingMessage) => return, // kernel still waiting, do not process kernel messages Err(Error::KernelException(exception)) => { @@ -596,6 +601,41 @@ impl<'a> Manager<'_> { } } + fn check_finished_kernels( + &mut self, + id: u32, + router: &mut Router, + routing_table: &RoutingTable, + rank: u8, + self_destination: u8, + ) { + for (i, (status, exception_source)) in self.session.subkernels_finished.iter().enumerate() { + if *status == id { + if exception_source.is_none() { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply); + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); + } else { + let destination = exception_source.unwrap(); + self.session.external_exception = Some(Vec::new()); + self.session.kernel_state = KernelState::SubkernelRetrievingException { + destination: destination, + }; + router.route( + drtioaux::Packet::SubkernelExceptionRequest { + source: self_destination, + destination: destination, + }, + &routing_table, + rank, + self_destination, + ); + } + break; + } + } + } + pub fn subkernel_load_run_reply(&mut self, succeeded: bool) { if self.session.kernel_state == KernelState::SubkernelAwaitLoad { self.control @@ -608,16 +648,46 @@ impl<'a> Manager<'_> { } pub fn remote_subkernel_finished(&mut self, id: u32, with_exception: bool, exception_source: u8) { - if with_exception { - self.kernel_stop(); - self.last_finished = Some(SubkernelFinished { - source: self.session.source, - id: self.session.id, - with_exception: true, - exception_source: exception_source, - }) + let exception_src = if with_exception { Some(exception_source) } else { None }; + self.session.subkernels_finished.push((id, exception_src)); + } + + pub fn received_exception( + &mut self, + exception_data: &[u8], + last: bool, + router: &mut Router, + routing_table: &RoutingTable, + rank: u8, + self_destination: u8, + ) { + if let KernelState::SubkernelRetrievingException { destination } = self.session.kernel_state { + self.session + .external_exception + .as_mut() + .unwrap() + .extend_from_slice(exception_data); + if last { + self.control + .tx + .send(kernel::Message::SubkernelError(kernel::SubkernelStatus::Exception( + self.session.external_exception.take().unwrap(), + ))); + self.session.kernel_state = KernelState::Running; + } else { + /* fetch another slice */ + router.route( + drtioaux::Packet::SubkernelExceptionRequest { + source: self_destination, + destination: destination, + }, + routing_table, + rank, + self_destination, + ); + } } else { - self.session.subkernels_finished.push(id); + warn!("Received unsolicited exception data"); } } @@ -780,28 +850,35 @@ impl<'a> Manager<'_> { Ok(false) } - fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> { + fn process_external_messages( + &mut self, + router: &mut Router, + routing_table: &RoutingTable, + rank: u8, + self_destination: u8, + timer: &GlobalTimer, + ) -> Result<(), Error> { match &self.session.kernel_state { KernelState::MsgAwait { max_time, id, tags } => { if let Some(max_time) = *max_time { if timer.get_time() > max_time { - self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { - status: kernel::SubkernelStatus::Timeout, - count: 0, - }); + self.control + .tx + .send(kernel::Message::SubkernelError(kernel::SubkernelStatus::Timeout)); self.session.kernel_state = KernelState::Running; return Ok(()); } } if let Some(message) = self.session.messages.get_incoming(*id) { - self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { - status: kernel::SubkernelStatus::NoError, - count: message.count, - }); + self.control + .tx + .send(kernel::Message::SubkernelMsgRecvReply { count: message.count }); let tags = tags.clone(); self.session.kernel_state = KernelState::Running; self.pass_message_to_kernel(&message, tags, timer) } else { + let id = *id; + self.check_finished_kernels(id, router, routing_table, rank, self_destination); Err(Error::AwaitingMessage) } } @@ -817,27 +894,18 @@ impl<'a> Manager<'_> { KernelState::SubkernelAwaitFinish { max_time, id } => { if let Some(max_time) = *max_time { if timer.get_time() > max_time { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::Timeout, - }); + self.control + .tx + .send(kernel::Message::SubkernelError(kernel::SubkernelStatus::Timeout)); self.session.kernel_state = KernelState::Running; return Ok(()); } } - let mut i = 0; - for status in &self.session.subkernels_finished { - if *status == *id { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::NoError, - }); - self.session.kernel_state = KernelState::Running; - self.session.subkernels_finished.swap_remove(i); - break; - } - i += 1; - } + let id = *id; + self.check_finished_kernels(id, router, routing_table, rank, self_destination); Ok(()) } + KernelState::SubkernelRetrievingException { .. } => Err(Error::AwaitingMessage), KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => { if timer.get_time() > *max_time { self.control.tx.send(kernel::Message::DmaAwaitRemoteReply {