diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 664f32bc..89dacb38 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -87,7 +87,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelAwaitFinishRequest { id: u32, - timeout: u64, + timeout: i64, }, #[cfg(has_drtio)] SubkernelAwaitFinishReply { @@ -104,7 +104,7 @@ pub enum Message { #[cfg(has_drtio)] SubkernelMsgRecvRequest { id: u32, - timeout: u64, + timeout: i64, tags: Vec, }, #[cfg(has_drtio)] diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index 02d9f3be..9c933f8b 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -169,25 +169,34 @@ pub async fn await_finish( routing_table: &RoutingTable, timer: GlobalTimer, id: u32, - timeout: u64, + timeout: i64, ) -> Result { match SUBKERNELS.async_lock().await.get(&id).unwrap().state { SubkernelState::Running | SubkernelState::Finished { .. } => (), _ => return Err(Error::IncorrectState), } - let max_time = timer.get_time() + Milliseconds(timeout); - while timer.get_time() < max_time { - { + if timeout > 0 { + let max_time = timer.get_time() + Milliseconds(timeout); + while timer.get_time() < max_time { match SUBKERNELS.async_lock().await.get(&id).unwrap().state { SubkernelState::Finished { .. } => break, _ => (), }; + task::r#yield().await; + } + if timer.get_time() >= max_time { + error!("Remote subkernel finish await timed out"); + return Err(Error::Timeout); + } + } else { + // no timeout, wait forever + loop { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { .. } => break, + _ => (), + }; + task::r#yield().await; } - task::r#yield().await; - } - if timer.get_time() >= max_time { - error!("Remote subkernel finish await timed out"); - return Err(Error::Timeout); } if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { match subkernel.state { @@ -231,8 +240,9 @@ pub async fn message_handle_incoming( { let subkernel_lock = SUBKERNELS.async_lock().await; let subkernel = subkernel_lock.get(&id); - if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { - // do not add messages for non-existing or deleted subkernels + if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running { + // do not add messages for non-running or deleted subkernels + warn!("received a message for a non-running subkernel #{}", id); return; } } @@ -264,16 +274,19 @@ pub async fn message_handle_incoming( } } -pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result { - match SUBKERNELS.async_lock().await.get(&id).unwrap().state { - SubkernelState::Finished { - status: FinishStatus::CommLost, - } => return Err(Error::CommLost), - SubkernelState::Running | SubkernelState::Finished { .. } => (), - _ => return Err(Error::IncorrectState), +pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result { + let is_subkernel = SUBKERNELS.async_lock().await.get(&id).is_some(); + if is_subkernel { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Running | SubkernelState::Finished { .. } => (), + _ => return Err(Error::IncorrectState), + } } let max_time = timer.get_time() + Milliseconds(timeout); - while timer.get_time() < max_time { + while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) { { let mut message_queue = MESSAGE_QUEUE.async_lock().await; for i in 0..message_queue.len() { @@ -284,14 +297,16 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result< } } } - match SUBKERNELS.async_lock().await.get(&id).unwrap().state { - SubkernelState::Finished { - status: FinishStatus::CommLost, - } => return Err(Error::CommLost), - SubkernelState::Finished { - status: FinishStatus::Exception(_), - } => return Err(Error::SubkernelException), - _ => (), + if is_subkernel { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Finished { + status: FinishStatus::Exception(_), + } => return Err(Error::SubkernelException), + _ => (), + } } task::r#yield().await; } diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 50c9690c..2d074e06 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -1,4 +1,4 @@ -use alloc::{collections::{BTreeMap, VecDeque}, +use alloc::{collections::BTreeMap, format, string::{String, ToString}, vec::Vec}; @@ -23,11 +23,15 @@ enum KernelState { Absent, Loaded, Running, - MsgAwait(Milliseconds, Vec), + MsgAwait { + max_time: Option, + id: u32, + tags: Vec + }, MsgSending, SubkernelAwaitLoad, SubkernelAwaitFinish { - max_time: Milliseconds, + max_time: Option, id: u32, }, DmaUploading, @@ -110,7 +114,7 @@ enum OutMessageState { struct MessageManager { out_message: Option, out_state: OutMessageState, - in_queue: VecDeque, + in_queue: Vec, in_buffer: Option, } @@ -170,7 +174,7 @@ impl MessageManager { MessageManager { out_message: None, out_state: OutMessageState::NoMessage, - in_queue: VecDeque::new(), + in_queue: Vec::new(), in_buffer: None, } } @@ -191,7 +195,7 @@ impl MessageManager { }; if status.is_last() { // when done, remove from working queue - self.in_queue.push_back(self.in_buffer.take().unwrap()); + self.in_queue.push(self.in_buffer.take().unwrap()); } } @@ -265,8 +269,13 @@ impl MessageManager { Ok(()) } - pub fn get_incoming(&mut self) -> Option { - self.in_queue.pop_front() + pub fn get_incoming(&mut self, id: u32) -> Option { + for i in 0..self.in_queue.len() { + if self.in_queue[i].id == id { + return Some(self.in_queue.remove(i)); + } + } + None } } @@ -707,9 +716,13 @@ impl<'a> Manager<'_> { )?; self.session.kernel_state = KernelState::MsgSending; } - kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { - let max_time = timer.get_time() + Milliseconds(timeout); - self.session.kernel_state = KernelState::MsgAwait(max_time, tags); + kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => { + let max_time = if timeout > 0 { + Some(timer.get_time() + Milliseconds(timeout)); + } else { + None + } + self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags }; } kernel::Message::SubkernelLoadRunRequest { id, @@ -731,7 +744,11 @@ impl<'a> Manager<'_> { } kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { - let max_time = timer.get_time() + Milliseconds(timeout); + let max_time = if timeout > 0 { + Some(timer.get_time() + Milliseconds(timeout)); + } else { + None + } self.session.kernel_state = KernelState::SubkernelAwaitFinish { max_time: max_time, id: id, @@ -751,16 +768,18 @@ impl<'a> Manager<'_> { fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> { match &self.session.kernel_state { - KernelState::MsgAwait(timeout, tags) => { - if timer.get_time() > *timeout { - self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { - status: kernel::SubkernelStatus::Timeout, - count: 0, - }); - self.session.kernel_state = KernelState::Running; - return Ok(()); + KernelState::MsgAwait {max_time , id, tags } => { + if let Some(max_time) = *max_time { + if timer.get_time() > max_time { + self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { + status: kernel::SubkernelStatus::Timeout, + count: 0, + }); + self.session.kernel_state = KernelState::Running; + return Ok(()); + } } - if let Some(message) = self.session.messages.get_incoming() { + if let Some(message) = self.session.messages.get_incoming(id) { self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { status: kernel::SubkernelStatus::NoError, count: message.count, @@ -782,25 +801,27 @@ impl<'a> Manager<'_> { } } KernelState::SubkernelAwaitFinish { max_time, id } => { - if timer.get_time() > *max_time { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::Timeout, - }); - self.session.kernel_state = KernelState::Running; - } else { - let mut i = 0; - for status in &self.session.subkernels_finished { - if *status == *id { - self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { - status: kernel::SubkernelStatus::NoError, - }); - self.session.kernel_state = KernelState::Running; - self.session.subkernels_finished.swap_remove(i); - break; - } - i += 1; + if let Some(max_time) = *max_time { + if timer.get_time() > *max_time { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::Timeout, + }); + self.session.kernel_state = KernelState::Running; + return Ok(()); } } + let mut i = 0; + for status in &self.session.subkernels_finished { + if *status == *id { + self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { + status: kernel::SubkernelStatus::NoError, + }); + self.session.kernel_state = KernelState::Running; + self.session.subkernels_finished.swap_remove(i); + break; + } + i += 1; + } Ok(()) } KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {