diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 664f32b..89dacb3 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -87,7 +87,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelAwaitFinishRequest { id: u32, - timeout: u64, + timeout: i64, }, #[cfg(has_drtio)] SubkernelAwaitFinishReply { @@ -104,7 +104,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelMsgRecvRequest { id: u32, - timeout: u64, + timeout: i64, tags: Vec, }, #[cfg(has_drtio)] diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index 90a71e0..511e92b 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -25,7 +25,7 @@ pub extern "C" fn load_run(id: u32, destination: u8, run: bool) { } } -pub extern "C" fn await_finish(id: u32, timeout: u64) { +pub extern "C" fn await_finish(id: u32, timeout: i64) { unsafe { KERNEL_CHANNEL_1TO0 .as_mut() @@ -80,7 +80,7 @@ pub extern "C" fn send_message( } } -pub extern "C" fn await_message(id: u32, timeout: u64, tags: &CSlice, min: u8, max: u8) { +pub extern "C" fn await_message(id: i32, timeout: i64, tags: &CSlice, min: u8, max: u8) { unsafe { KERNEL_CHANNEL_1TO0 .as_mut() diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index f0c42cb..163793b 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -453,10 +453,10 @@ async fn handle_run_kernel( #[cfg(has_drtio)] kernel::Message::SubkernelMsgSend { id, - destination: _, + destination, data, } => { - let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; + let res = subkernel::message_send(aux_mutex, routing_table, timer, id, destination.unwrap(), data).await; match res { Ok(_) => (), Err(e) => { diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index 02d9f3b..1e4f720 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -5,7 +5,7 @@ use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::mutex::Mutex; -use log::error; +use log::{error, warn}; use crate::rtio_mgt::drtio; @@ -169,25 +169,34 @@ pub async fn await_finish( routing_table: &RoutingTable, timer: GlobalTimer, id: u32, - timeout: u64, + timeout: i64, ) -> Result { match SUBKERNELS.async_lock().await.get(&id).unwrap().state { SubkernelState::Running | SubkernelState::Finished { .. } => (), _ => return Err(Error::IncorrectState), } - let max_time = timer.get_time() + Milliseconds(timeout); - while timer.get_time() < max_time { - { + if timeout > 0 { + let max_time = timer.get_time() + Milliseconds(timeout as u64); + while timer.get_time() < max_time { match SUBKERNELS.async_lock().await.get(&id).unwrap().state { SubkernelState::Finished { .. } => break, _ => (), }; + task::r#yield().await; + } + if timer.get_time() >= max_time { + error!("Remote subkernel finish await timed out"); + return Err(Error::Timeout); + } + } else { + // no timeout, wait forever + loop { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { .. } => break, + _ => (), + }; + task::r#yield().await; } - task::r#yield().await; - } - if timer.get_time() >= max_time { - error!("Remote subkernel finish await timed out"); - return Err(Error::Timeout); } if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { match subkernel.state { @@ -231,8 +240,9 @@ pub async fn message_handle_incoming( { let subkernel_lock = SUBKERNELS.async_lock().await; let subkernel = subkernel_lock.get(&id); - if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { - // do not add messages for non-existing or deleted subkernels + if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running { + // do not add messages for non-running or deleted subkernels + warn!("received a message for a non-running subkernel #{}", id); return; } } @@ -264,16 +274,19 @@ pub async fn message_handle_incoming( } } -pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result { - match SUBKERNELS.async_lock().await.get(&id).unwrap().state { - SubkernelState::Finished { - status: FinishStatus::CommLost, - } => return Err(Error::CommLost), - SubkernelState::Running | SubkernelState::Finished { .. } => (), - _ => return Err(Error::IncorrectState), +pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result { + let is_subkernel = SUBKERNELS.async_lock().await.get(&id).is_some(); + if is_subkernel { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Running | SubkernelState::Finished { .. } => (), + _ => return Err(Error::IncorrectState), + } } - let max_time = timer.get_time() + Milliseconds(timeout); - while timer.get_time() < max_time { + let max_time = timer.get_time() + Milliseconds(timeout as u64); + while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) { { let mut message_queue = MESSAGE_QUEUE.async_lock().await; for i in 0..message_queue.len() { @@ -284,14 +297,16 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< } } } - match SUBKERNELS.async_lock().await.get(&id).unwrap().state { - SubkernelState::Finished { - status: FinishStatus::CommLost, - } => return Err(Error::CommLost), - SubkernelState::Finished { - status: FinishStatus::Exception(_), - } => return Err(Error::SubkernelException), - _ => (), + if is_subkernel { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Finished { + status: FinishStatus::Exception(_), + } => return Err(Error::SubkernelException), + _ => (), + } } task::r#yield().await; } @@ -303,9 +318,8 @@ pub async fn message_send<'a>( routing_table: &RoutingTable, timer: GlobalTimer, id: u32, + destination: u8, message: Vec, ) -> Result<(), Error> { - let destination = SUBKERNELS.async_lock().await.get(&id).unwrap().destination; - // rpc data prepared by the kernel core already Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?) } diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index fab1218..5cded72 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -657,13 +657,13 @@ fn process_aux_packet( drtioaux::Packet::SubkernelMessage { source, destination: _destination, - id: _id, + id, status, length, data, } => { forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); - kernel_manager.message_handle_incoming(status, length as usize, &data); + kernel_manager.message_handle_incoming(status, id, length as usize, &data); router.send( drtioaux::Packet::SubkernelMessageAck { destination: source }, _routing_table, diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 50c9690..af86506 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -1,4 +1,4 @@ -use alloc::{collections::{BTreeMap, VecDeque}, +use alloc::{collections::BTreeMap, format, string::{String, ToString}, vec::Vec}; @@ -23,11 +23,15 @@ enum KernelState { Absent, Loaded, Running, - MsgAwait(Milliseconds, Vec), + MsgAwait { + max_time: Option, + id: u32, + tags: Vec + }, MsgSending, SubkernelAwaitLoad, SubkernelAwaitFinish { - max_time: Milliseconds, + max_time: Option, id: u32, }, DmaUploading, @@ -95,6 +99,7 @@ macro_rules! unexpected { /* represents interkernel messages */ struct Message { count: u8, + id: u32, data: Vec, } @@ -110,7 +115,7 @@ enum OutMessageState { struct MessageManager { out_message: Option, out_state: OutMessageState, - in_queue: VecDeque, + in_queue: Vec, in_buffer: Option, } @@ -170,12 +175,12 @@ impl MessageManager { MessageManager { out_message: None, out_state: OutMessageState::NoMessage, - in_queue: VecDeque::new(), + in_queue: Vec::new(), in_buffer: None, } } - pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn handle_incoming(&mut self, status: PayloadStatus, id: u32, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from master if status.is_first() { self.in_buffer = None; @@ -185,13 +190,14 @@ impl MessageManager { None => { self.in_buffer = Some(Message { count: data[0], + id: id, data: data[1..length].to_vec(), }); } }; if status.is_last() { // when done, remove from working queue - self.in_queue.push_back(self.in_buffer.take().unwrap()); + self.in_queue.push(self.in_buffer.take().unwrap()); } } @@ -265,8 +271,13 @@ impl MessageManager { Ok(()) } - pub fn get_incoming(&mut self) -> Option { - self.in_queue.pop_front() + pub fn get_incoming(&mut self, id: u32) -> Option { + for i in 0..self.in_queue.len() { + if self.in_queue[i].id == id { + return Some(self.in_queue.remove(i)); + } + } + None } } @@ -344,13 +355,14 @@ impl<'a> Manager<'_> { pub fn message_handle_incoming( &mut self, status: PayloadStatus, + id: u32, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE], ) { if !self.running() { return; } - self.session.messages.handle_incoming(status, length, slice); + self.session.messages.handle_incoming(status, id, length, slice); } pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { @@ -707,9 +719,14 @@ impl<'a> Manager<'_> { )?; self.session.kernel_state = KernelState::MsgSending; } - kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { - let max_time = timer.get_time() + Milliseconds(timeout); - self.session.kernel_state = KernelState::MsgAwait(max_time, tags); + kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => { + let id = if id == -1 { self.session.id } else { id as u32 }; + let max_time = if timeout > 0 { + Some(timer.get_time() + Milliseconds(timeout as u64)) + } else { + None + }; + self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags }; } kernel::Message::SubkernelLoadRunRequest { id, @@ -731,7 +748,11 @@ impl<'a> Manager<'_> { } kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { - let max_time = timer.get_time() + Milliseconds(timeout); + let max_time = if timeout > 0 { + Some(timer.get_time() + Milliseconds(timeout as u64)) + } else { + None + }; self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id, @@ -751,16 +772,18 @@ impl<'a> Manager<'_> { fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> { match &self.session.kernel_state { - KernelState::MsgAwait(timeout, tags) => { - if timer.get_time() > *timeout { - self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { - status: kernel::SubkernelStatus::Timeout, - count: 0, - }); - self.session.kernel_state = KernelState::Running; - return Ok(()); + KernelState::MsgAwait {max_time , id, tags } => { + if let Some(max_time) = *max_time { + if timer.get_time() > max_time { + self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { + status: kernel::SubkernelStatus::Timeout, + count: 0, + }); + self.session.kernel_state = KernelState::Running; + return Ok(()); + } } - if let Some(message) = self.session.messages.get_incoming() { + if let Some(message) = self.session.messages.get_incoming(*id) { self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { status: kernel::SubkernelStatus::NoError, count: message.count, @@ -782,25 +805,27 @@ impl<'a> Manager<'_> { } } KernelState::SubkernelAwaitFinish { max_time, id } => { - if timer.get_time() > *max_time { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::Timeout, - }); - self.session.kernel_state = KernelState::Running; - } else { - let mut i = 0; - for status in &self.session.subkernels_finished { - if *status == *id { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::NoError, - }); - self.session.kernel_state = KernelState::Running; - self.session.subkernels_finished.swap_remove(i); - break; - } - i += 1; + if let Some(max_time) = *max_time { + if timer.get_time() > max_time { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::Timeout, + }); + self.session.kernel_state = KernelState::Running; + return Ok(()); } } + let mut i = 0; + for status in &self.session.subkernels_finished { + if *status == *id { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::NoError, + }); + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); + break; + } + i += 1; + } Ok(()) } KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {