Compare commits

...

1 Commits

Author SHA1 Message Date
mwojcik 30adcdfc9e subkernel: support no-timeout, message passing 2024-01-30 14:04:11 +08:00
3 changed files with 103 additions and 67 deletions

View File

@ -87,7 +87,7 @@ pub enum Message {
#[cfg(has_drtio)]
SubkernelAwaitFinishRequest {
id: u32,
timeout: u64,
timeout: i64,
},
#[cfg(has_drtio)]
SubkernelAwaitFinishReply {
@ -104,7 +104,7 @@ pub enum Message {
#[cfg(has_drtio)]
SubkernelMsgRecvRequest {
id: u32,
timeout: u64,
timeout: i64,
tags: Vec<u8>,
},
#[cfg(has_drtio)]

View File

@ -169,25 +169,34 @@ pub async fn await_finish(
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
timeout: u64,
timeout: i64,
) -> Result<SubkernelFinished, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
}
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
{
if timeout > 0 {
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break,
_ => (),
};
task::r#yield().await;
}
if timer.get_time() >= max_time {
error!("Remote subkernel finish await timed out");
return Err(Error::Timeout);
}
} else {
// no timeout, wait forever
loop {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break,
_ => (),
};
task::r#yield().await;
}
task::r#yield().await;
}
if timer.get_time() >= max_time {
error!("Remote subkernel finish await timed out");
return Err(Error::Timeout);
}
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
match subkernel.state {
@ -231,8 +240,9 @@ pub async fn message_handle_incoming(
{
let subkernel_lock = SUBKERNELS.async_lock().await;
let subkernel = subkernel_lock.get(&id);
if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running {
// do not add messages for non-existing or deleted subkernels
if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running {
// do not add messages for non-running or deleted subkernels
warn!("received a message for a non-running subkernel #{}", id);
return;
}
}
@ -264,16 +274,19 @@ pub async fn message_handle_incoming(
}
}
pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<Message, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result<Message, Error> {
let is_subkernel = SUBKERNELS.async_lock().await.get(&id).is_some();
if is_subkernel {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
}
}
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) {
{
let mut message_queue = MESSAGE_QUEUE.async_lock().await;
for i in 0..message_queue.len() {
@ -284,14 +297,16 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
}
}
}
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Finished {
status: FinishStatus::Exception(_),
} => return Err(Error::SubkernelException),
_ => (),
if is_subkernel {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Finished {
status: FinishStatus::Exception(_),
} => return Err(Error::SubkernelException),
_ => (),
}
}
task::r#yield().await;
}

View File

@ -1,4 +1,4 @@
use alloc::{collections::{BTreeMap, VecDeque},
use alloc::{collections::BTreeMap,
format,
string::{String, ToString},
vec::Vec};
@ -23,11 +23,15 @@ enum KernelState {
Absent,
Loaded,
Running,
MsgAwait(Milliseconds, Vec<u8>),
MsgAwait {
max_time: Option<Milliseconds>,
id: u32,
tags: Vec<u8>
},
MsgSending,
SubkernelAwaitLoad,
SubkernelAwaitFinish {
max_time: Milliseconds,
max_time: Option<Milliseconds>,
id: u32,
},
DmaUploading,
@ -110,7 +114,7 @@ enum OutMessageState {
struct MessageManager {
out_message: Option<Sliceable>,
out_state: OutMessageState,
in_queue: VecDeque<Message>,
in_queue: Vec<Message>,
in_buffer: Option<Message>,
}
@ -170,7 +174,7 @@ impl MessageManager {
MessageManager {
out_message: None,
out_state: OutMessageState::NoMessage,
in_queue: VecDeque::new(),
in_queue: Vec::new(),
in_buffer: None,
}
}
@ -191,7 +195,7 @@ impl MessageManager {
};
if status.is_last() {
// when done, remove from working queue
self.in_queue.push_back(self.in_buffer.take().unwrap());
self.in_queue.push(self.in_buffer.take().unwrap());
}
}
@ -265,8 +269,13 @@ impl MessageManager {
Ok(())
}
pub fn get_incoming(&mut self) -> Option<Message> {
self.in_queue.pop_front()
pub fn get_incoming(&mut self, id: u32) -> Option<Message> {
for i in 0..self.in_queue.len() {
if self.in_queue[i].id == id {
return Some(self.in_queue.remove(i));
}
}
None
}
}
@ -707,9 +716,13 @@ impl<'a> Manager<'_> {
)?;
self.session.kernel_state = KernelState::MsgSending;
}
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => {
let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout));
} else {
None
}
self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags };
}
kernel::Message::SubkernelLoadRunRequest {
id,
@ -731,7 +744,11 @@ impl<'a> Manager<'_> {
}
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let max_time = timer.get_time() + Milliseconds(timeout);
let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout));
} else {
None
}
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
max_time: max_time,
id: id,
@ -751,16 +768,18 @@ impl<'a> Manager<'_> {
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
match &self.session.kernel_state {
KernelState::MsgAwait(timeout, tags) => {
if timer.get_time() > *timeout {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::Timeout,
count: 0,
});
self.session.kernel_state = KernelState::Running;
return Ok(());
KernelState::MsgAwait {max_time , id, tags } => {
if let Some(max_time) = *max_time {
if timer.get_time() > max_time {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::Timeout,
count: 0,
});
self.session.kernel_state = KernelState::Running;
return Ok(());
}
}
if let Some(message) = self.session.messages.get_incoming() {
if let Some(message) = self.session.messages.get_incoming(id) {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::NoError,
count: message.count,
@ -782,25 +801,27 @@ impl<'a> Manager<'_> {
}
}
KernelState::SubkernelAwaitFinish { max_time, id } => {
if timer.get_time() > *max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::Timeout,
});
self.session.kernel_state = KernelState::Running;
} else {
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
if let Some(max_time) = *max_time {
if timer.get_time() > *max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::Timeout,
});
self.session.kernel_state = KernelState::Running;
return Ok(());
}
}
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
}
Ok(())
}
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {