diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index 154a5e0..b18fd66 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -5,7 +5,7 @@ use io::proto::{ProtoRead, ProtoWrite}; // used by satellite -> master analyzer, subkernel exceptions pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; // used by DDMA, subkernel program data (need to provide extra ID and destination) -pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; +pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*source*/1 - /*destination*/1 - /*ID*/4; #[derive(Debug)] pub enum Error { @@ -89,6 +89,8 @@ pub enum Packet { RoutingSetRank { rank: u8, }, + RoutingRetrievePackets, + RoutingNoPackets, RoutingAck, MonitorRequest { @@ -197,6 +199,7 @@ pub enum Packet { }, DmaAddTraceRequest { + source: u8, destination: u8, id: u32, status: PayloadStatus, @@ -204,24 +207,30 @@ pub enum Packet { trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, DmaAddTraceReply { + destination: u8, succeeded: bool, }, DmaRemoveTraceRequest { + source: u8, destination: u8, id: u32, }, DmaRemoveTraceReply { + destination: u8, succeeded: bool, }, DmaPlaybackRequest { + source: u8, destination: u8, id: u32, timestamp: u64, }, DmaPlaybackReply { + destination: u8, succeeded: bool, }, DmaPlaybackStatus { + source: u8, destination: u8, id: u32, error: u8, @@ -240,22 +249,20 @@ pub enum Packet { succeeded: bool, }, SubkernelLoadRunRequest { + source: u8, destination: u8, id: u32, run: bool, }, SubkernelLoadRunReply { - succeeded: bool, - }, - SubkernelStopRequest { destination: u8, - }, - SubkernelStopReply { succeeded: bool, }, SubkernelFinished { + destination: u8, id: u32, with_exception: bool, + exception_src: u8, }, SubkernelExceptionRequest { destination: u8, @@ -266,6 +273,7 @@ pub enum Packet { data: [u8; SAT_PAYLOAD_MAX_SIZE], }, SubkernelMessage { + source: u8, destination: u8, id: u32, status: PayloadStatus, @@ -315,6 +323,8 @@ impl Packet { rank: reader.read_u8()?, }, 0x32 => Packet::RoutingAck, + 0x33 => Packet::RoutingRetrievePackets, + 0x34 => Packet::RoutingNoPackets, 0x40 => Packet::MonitorRequest { destination: reader.read_u8()?, @@ -429,39 +439,47 @@ impl Packet { } 0xb0 => { + let source = reader.read_u8()?; let destination = reader.read_u8()?; let id = reader.read_u32()?; - let status = PayloadStatus::from(reader.read_u8()?); + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { + source: source, destination: destination, id: id, - status: status, + status: PayloadStatus::from(status), length: length as u16, trace: trace, } } 0xb1 => Packet::DmaAddTraceReply { + destination: reader.read_u8()?, succeeded: reader.read_bool()?, }, 0xb2 => Packet::DmaRemoveTraceRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, }, 0xb3 => Packet::DmaRemoveTraceReply { + destination: reader.read_u8()?, succeeded: reader.read_bool()?, }, 0xb4 => Packet::DmaPlaybackRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, timestamp: reader.read_u64()?, }, 0xb5 => Packet::DmaPlaybackReply { + destination: reader.read_u8()?, succeeded: reader.read_bool()?, }, 0xb6 => Packet::DmaPlaybackStatus { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, error: reader.read_u8()?, @@ -488,22 +506,20 @@ impl Packet { succeeded: reader.read_bool()?, }, 0xc4 => Packet::SubkernelLoadRunRequest { + source: reader.read_u8()?, destination: reader.read_u8()?, id: reader.read_u32()?, run: reader.read_bool()?, }, 0xc5 => Packet::SubkernelLoadRunReply { - succeeded: reader.read_bool()?, - }, - 0xc6 => Packet::SubkernelStopRequest { destination: reader.read_u8()?, - }, - 0xc7 => Packet::SubkernelStopReply { succeeded: reader.read_bool()?, }, 0xc8 => Packet::SubkernelFinished { + destination: reader.read_u8()?, id: reader.read_u32()?, with_exception: reader.read_bool()?, + exception_src: reader.read_u8()?, }, 0xc9 => Packet::SubkernelExceptionRequest { destination: reader.read_u8()?, @@ -520,16 +536,18 @@ impl Packet { } } 0xcb => { + let source = reader.read_u8()?; let destination = reader.read_u8()?; let id = reader.read_u32()?; - let status = PayloadStatus::from(reader.read_u8()?); + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelMessage { + source: source, destination: destination, id: id, - status: status, + status: PayloadStatus::from(status), length: length as u16, data: data, } @@ -580,6 +598,8 @@ impl Packet { writer.write_u8(rank)?; } Packet::RoutingAck => writer.write_u8(0x32)?, + Packet::RoutingRetrievePackets => writer.write_u8(0x33)?, + Packet::RoutingNoPackets => writer.write_u8(0x34)?, Packet::MonitorRequest { destination, @@ -751,6 +771,7 @@ impl Packet { } Packet::DmaAddTraceRequest { + source, destination, id, status, @@ -758,6 +779,7 @@ impl Packet { length, } => { writer.write_u8(0xb0)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(status as u8)?; @@ -766,34 +788,45 @@ impl Packet { writer.write_u16(length)?; writer.write_all(&trace[0..length as usize])?; } - Packet::DmaAddTraceReply { succeeded } => { + Packet::DmaAddTraceReply { destination, succeeded } => { writer.write_u8(0xb1)?; + writer.write_u8(destination)?; writer.write_bool(succeeded)?; } - Packet::DmaRemoveTraceRequest { destination, id } => { + Packet::DmaRemoveTraceRequest { + source, + destination, + id, + } => { writer.write_u8(0xb2)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; } - Packet::DmaRemoveTraceReply { succeeded } => { + Packet::DmaRemoveTraceReply { destination, succeeded } => { writer.write_u8(0xb3)?; + writer.write_u8(destination)?; writer.write_bool(succeeded)?; } Packet::DmaPlaybackRequest { + source, destination, id, timestamp, } => { writer.write_u8(0xb4)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u64(timestamp)?; } - Packet::DmaPlaybackReply { succeeded } => { + Packet::DmaPlaybackReply { destination, succeeded } => { writer.write_u8(0xb5)?; + writer.write_u8(destination)?; writer.write_bool(succeeded)?; } Packet::DmaPlaybackStatus { + source, destination, id, error, @@ -801,6 +834,7 @@ impl Packet { timestamp, } => { writer.write_u8(0xb6)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(error)?; @@ -826,28 +860,34 @@ impl Packet { writer.write_u8(0xc1)?; writer.write_bool(succeeded)?; } - Packet::SubkernelLoadRunRequest { destination, id, run } => { + Packet::SubkernelLoadRunRequest { + source, + destination, + id, + run, + } => { writer.write_u8(0xc4)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_bool(run)?; } - Packet::SubkernelLoadRunReply { succeeded } => { + Packet::SubkernelLoadRunReply { destination, succeeded } => { writer.write_u8(0xc5)?; - writer.write_bool(succeeded)?; - } - Packet::SubkernelStopRequest { destination } => { - writer.write_u8(0xc6)?; writer.write_u8(destination)?; - } - Packet::SubkernelStopReply { succeeded } => { - writer.write_u8(0xc7)?; writer.write_bool(succeeded)?; } - Packet::SubkernelFinished { id, with_exception } => { + Packet::SubkernelFinished { + destination, + id, + with_exception, + exception_src, + } => { writer.write_u8(0xc8)?; + writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_bool(with_exception)?; + writer.write_u8(exception_src)?; } Packet::SubkernelExceptionRequest { destination } => { writer.write_u8(0xc9)?; @@ -860,6 +900,7 @@ impl Packet { writer.write_all(&data[0..length as usize])?; } Packet::SubkernelMessage { + source, destination, id, status, @@ -867,6 +908,7 @@ impl Packet { length, } => { writer.write_u8(0xcb)?; + writer.write_u8(source)?; writer.write_u8(destination)?; writer.write_u32(id)?; writer.write_u8(status as u8)?; diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 7e8b859..664f32b 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -77,6 +77,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelLoadRunRequest { id: u32, + destination: u8, run: bool, }, #[cfg(has_drtio)] @@ -95,6 +96,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelMsgSend { id: u32, + destination: Option, data: Vec, }, #[cfg(has_drtio)] diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index 17754c7..90a71e0 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -5,12 +5,16 @@ use cslice::CSlice; use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; use crate::{artiq_raise, rpc::send_args}; -pub extern "C" fn load_run(id: u32, run: bool) { +pub extern "C" fn load_run(id: u32, destination: u8, run: bool) { unsafe { KERNEL_CHANNEL_1TO0 .as_mut() .unwrap() - .send(Message::SubkernelLoadRunRequest { id: id, run: run }); + .send(Message::SubkernelLoadRunRequest { + id: id, + destination: destination, + run: run, + }); } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { Message::SubkernelLoadRunReply { succeeded: true } => (), @@ -51,7 +55,14 @@ pub extern "C" fn await_finish(id: u32, timeout: u64) { } } -pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *const *const ()) { +pub extern "C" fn send_message( + id: u32, + is_return: bool, + destination: u8, + count: u8, + tag: &CSlice, + data: *const *const (), +) { let mut buffer = Vec::::new(); send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed"); // overwrite service tag, include how many tags are in the message @@ -59,6 +70,7 @@ pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *cons unsafe { KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend { id: id, + destination: if is_return { None } else { Some(destination) }, data: buffer[3..].to_vec(), }); } diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 13e5250..f0c42cb 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -401,7 +401,11 @@ async fn handle_run_kernel( control.borrow_mut().tx.async_send(reply).await; } #[cfg(has_drtio)] - kernel::Message::SubkernelLoadRunRequest { id, run } => { + kernel::Message::SubkernelLoadRunRequest { + id, + destination: _, + run, + } => { let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await { Ok(()) => true, Err(e) => { @@ -447,7 +451,11 @@ async fn handle_run_kernel( .await; } #[cfg(has_drtio)] - kernel::Message::SubkernelMsgSend { id, data } => { + kernel::Message::SubkernelMsgSend { + id, + destination: _, + data, + } => { let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; match res { Ok(_) => (), diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index b2f3aa3..adc7ca4 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -43,39 +43,97 @@ pub mod drtio { unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } } - async fn process_async_packets(aux_mutex: &Mutex, linkno: u8, packet: Packet) -> Option { - // returns None if an async packet has been consumed - match packet { - Packet::DmaPlaybackStatus { - id, - destination, - error, - channel, - timestamp, - } => { - remote_dma::playback_done(id, destination, error, channel, timestamp).await; - None + async fn link_has_async_ready(linkno: u8) -> bool { + let linkno = linkno as usize; + let async_ready; + unsafe { + async_ready = (csr::DRTIO[linkno].async_messages_ready_read)() == 1; + (csr::DRTIO[linkno].async_messages_ready_write)(1); + } + async_ready + } + + async fn process_async_packets(aux_mutex: &Mutex, linkno: u8, routing_table: &drtio_routing::RoutingTable, timer: GlobalTimer) { + if link_has_async_ready(linkno).await { + loop { + let reply = aux_transact(aux_mutex, linkno, &Packet::RoutingRetrievePackets, timer).await; + if let Ok(packet) = reply { + match packet { + Packet::DmaPlaybackStatus { + id, + source, + destination: 0, + error, + channel, + timestamp, + } => { + remote_dma::playback_done(id, source, error, channel, timestamp).await; + } + Packet::SubkernelFinished { + id, + destination: 0, + with_exception, + exception_src, + } => { + subkernel::subkernel_finished(id, with_exception, exception_src).await; + } + Packet::SubkernelMessage { + id, + source, + destination: 0, + status, + length, + data, + } => { + subkernel::message_handle_incoming(id, status, length as usize, &data).await; + // acknowledge receiving part of the message + let _lock = aux_mutex.async_lock().await; + drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: source }) + .await + .unwrap(); + let mut countdown = timer.countdown(); + // give the satellites some time to process + delay(&mut countdown, Milliseconds(10)).await; + } + // routable packets + Packet::DmaAddTraceRequest { destination, .. } + | Packet::DmaAddTraceReply { destination, .. } + | Packet::DmaRemoveTraceRequest { destination, .. } + | Packet::DmaRemoveTraceReply { destination, .. } + | Packet::DmaPlaybackRequest { destination, .. } + | Packet::DmaPlaybackReply { destination, .. } + | Packet::SubkernelLoadRunRequest { destination, .. } + | Packet::SubkernelLoadRunReply { destination, .. } + | Packet::SubkernelMessage { destination, .. } + | Packet::SubkernelMessageAck { destination, .. } + | Packet::DmaPlaybackStatus { destination, .. } + | Packet::SubkernelFinished { destination, .. } => { + let dest_link = routing_table.0[destination as usize][0] - 1; + if dest_link == linkno { + warn!( + "[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}", + linkno, packet + ); + } else if destination == 0 { + warn!("[LINK#{}] Received invalid routable packet: {:?}", linkno, packet) + } else { + drtioaux_async::send(dest_link, &packet).await.unwrap(); + } + } + + Packet::RoutingNoPackets => break, + + other => warn!("[LINK#{}] Received an unroutable packet: {:?}", linkno, other), + } + } else { + warn!( + "[LINK#{}] Error handling async packets ({})", + linkno, + reply.unwrap_err() + ); + return; + } } - Packet::SubkernelFinished { id, with_exception } => { - subkernel::subkernel_finished(id, with_exception).await; - None - } - Packet::SubkernelMessage { - id, - destination: from, - status, - length, - data, - } => { - subkernel::message_handle_incoming(id, status, length as usize, &data).await; - // acknowledge receiving part of the message - let _lock = aux_mutex.async_lock().await; - drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) - .await - .unwrap(); - None - } - other => Some(other), } } @@ -210,11 +268,7 @@ pub mod drtio { async fn process_unsolicited_aux(aux_mutex: &Rc>, linkno: u8) { let _lock = aux_mutex.async_lock().await; match drtioaux_async::recv(linkno).await { - Ok(Some(packet)) => { - if let Some(packet) = process_async_packets(aux_mutex, linkno, packet).await { - warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); - } - } + Ok(Some(packet)) => warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet), Ok(None) => (), Err(_) => warn!("[LINK#{}] aux packet error", linkno), } @@ -283,67 +337,53 @@ pub mod drtio { let linkno = hop - 1; if destination_up(up_destinations, destination).await { if up_links[linkno as usize] { - loop { - let reply = aux_transact( - aux_mutex, - linkno, - &Packet::DestinationStatusRequest { - destination: destination, - }, - timer, - ) - .await; - match reply { - Ok(Packet::DestinationDownReply) => { - destination_set_up(routing_table, up_destinations, destination, false).await; - remote_dma::destination_changed( - aux_mutex, - routing_table, - timer, - destination, - false, - ) + let reply = aux_transact( + aux_mutex, + linkno, + &Packet::DestinationStatusRequest { + destination: destination, + }, + timer, + ) + .await; + match reply { + Ok(Packet::DestinationDownReply) => { + destination_set_up(routing_table, up_destinations, destination, false).await; + remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false) + .await; + subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false) .await; - subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false) - .await; - } - Ok(Packet::DestinationOkReply) => (), - Ok(Packet::DestinationSequenceErrorReply { channel }) => { - error!( - "[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; - } - Ok(Packet::DestinationCollisionReply { channel }) => { - error!( - "[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; - } - Ok(Packet::DestinationBusyReply { channel }) => { - error!( - "[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", - destination, - channel, - resolve_channel_name(channel as u32) - ); - unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; - } - Ok(packet) => match process_async_packets(aux_mutex, linkno, packet).await { - Some(packet) => { - error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet) - } - None => continue, - }, - Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), } - break; + Ok(Packet::DestinationOkReply) => (), + Ok(Packet::DestinationSequenceErrorReply { channel }) => { + error!( + "[DEST#{}] RTIO sequence error involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_SEQUENCE_ERROR }; + } + Ok(Packet::DestinationCollisionReply { channel }) => { + error!( + "[DEST#{}] RTIO collision involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_COLLISION }; + } + Ok(Packet::DestinationBusyReply { channel }) => { + error!( + "[DEST#{}] RTIO busy error involving channel 0x{:04x}:{}", + destination, + channel, + resolve_channel_name(channel as u32) + ); + unsafe { SEEN_ASYNC_ERRORS |= ASYNC_ERROR_BUSY }; + } + Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), + Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), } } else { destination_set_up(routing_table, up_destinations, destination, false).await; @@ -393,6 +433,7 @@ pub mod drtio { if up_links[linkno as usize] { /* link was previously up */ if link_rx_up(linkno).await { + process_async_packets(aux_mutex, linkno, routing_table, timer).await; process_unsolicited_aux(aux_mutex, linkno).await; process_local_errors(linkno).await; } else { @@ -504,14 +545,21 @@ pub mod drtio { trace, |slice, status, len| Packet::DmaAddTraceRequest { id: id, + source: 0, destination: destination, status: status, length: len as u16, trace: *slice, }, |reply| match reply { - Packet::DmaAddTraceReply { succeeded: true } => Ok(()), - Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"), + Packet::DmaAddTraceReply { + destination: 0, + succeeded: true, + } => Ok(()), + Packet::DmaAddTraceReply { + destination: 0, + succeeded: false, + } => Err("error adding trace on satellite"), _ => Err("adding DMA trace failed, unexpected aux packet"), }, ) @@ -531,14 +579,21 @@ pub mod drtio { linkno, &Packet::DmaRemoveTraceRequest { id: id, + source: 0, destination: destination, }, timer, ) .await; match reply { - Ok(Packet::DmaRemoveTraceReply { succeeded: true }) => Ok(()), - Ok(Packet::DmaRemoveTraceReply { succeeded: false }) => Err("satellite DMA erase error"), + Ok(Packet::DmaRemoveTraceReply { + destination: 0, + succeeded: true, + }) => Ok(()), + Ok(Packet::DmaRemoveTraceReply { + destination: 0, + succeeded: false, + }) => Err("satellite DMA erase error"), Ok(_) => Err("adding trace failed, unexpected aux packet"), Err(_) => Err("erasing trace failed, aux error"), } @@ -558,6 +613,7 @@ pub mod drtio { linkno, &Packet::DmaPlaybackRequest { id: id, + source: 0, destination: destination, timestamp: timestamp, }, @@ -565,8 +621,14 @@ pub mod drtio { ) .await; match reply { - Ok(Packet::DmaPlaybackReply { succeeded: true }) => Ok(()), - Ok(Packet::DmaPlaybackReply { succeeded: false }) => Err("error on DMA playback request"), + Ok(Packet::DmaPlaybackReply { + destination: 0, + succeeded: true, + }) => Ok(()), + Ok(Packet::DmaPlaybackReply { + destination: 0, + succeeded: false, + }) => Err("error on DMA playback request"), Ok(_) => Err("received unexpected aux packet during DMA playback"), Err(_) => Err("aux error on DMA playback"), } @@ -689,6 +751,7 @@ pub mod drtio { linkno, &Packet::SubkernelLoadRunRequest { id: id, + source: 0, destination: destination, run: run, }, @@ -696,8 +759,14 @@ pub mod drtio { ) .await?; match reply { - Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()), - Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"), + Packet::SubkernelLoadRunReply { + destination: 0, + succeeded: true, + } => return Ok(()), + Packet::SubkernelLoadRunReply { + destination: 0, + succeeded: false, + } => return Err("error on subkernel run request"), _ => return Err("received unexpected aux packet during subkernel run"), } } @@ -747,6 +816,7 @@ pub mod drtio { timer, message, |slice, status, len| Packet::SubkernelMessage { + source: 0, destination: destination, id: id, status: status, diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index d129865..02d9f3b 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -13,7 +13,7 @@ use crate::rtio_mgt::drtio; pub enum FinishStatus { Ok, CommLost, - Exception, + Exception(u8), // exception source } #[derive(Debug, PartialEq, Clone, Copy)] @@ -121,14 +121,14 @@ pub async fn clear_subkernels() { CURRENT_MESSAGES.async_lock().await.clear(); } -pub async fn subkernel_finished(id: u32, with_exception: bool) { +pub async fn subkernel_finished(id: u32, with_exception: bool, exception_src: u8) { // called upon receiving DRTIO SubkernelRunDone // may be None if session ends and is cleared if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if subkernel.state == SubkernelState::Running { subkernel.state = SubkernelState::Finished { status: match with_exception { - true => FinishStatus::Exception, + true => FinishStatus::Exception(exception_src), false => FinishStatus::Ok, }, } @@ -196,11 +196,8 @@ pub async fn await_finish( Ok(SubkernelFinished { id: id, status: status, - exception: if status == FinishStatus::Exception { - Some( - drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination) - .await?, - ) + exception: if let FinishStatus::Exception(dest) = status { + Some(drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, dest).await?) } else { None }, @@ -292,7 +289,7 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< status: FinishStatus::CommLost, } => return Err(Error::CommLost), SubkernelState::Finished { - status: FinishStatus::Exception, + status: FinishStatus::Exception(_), } => return Err(Error::SubkernelException), _ => (), }