forked from M-Labs/artiq-zynq
Compare commits
1 Commits
d7d553be41
...
30adcdfc9e
Author | SHA1 | Date |
---|---|---|
mwojcik | 30adcdfc9e |
|
@ -87,7 +87,7 @@ pub enum Message {
|
||||||
#[cfg(has_drtio)]
|
#[cfg(has_drtio)]
|
||||||
SubkernelAwaitFinishRequest {
|
SubkernelAwaitFinishRequest {
|
||||||
id: u32,
|
id: u32,
|
||||||
timeout: u64,
|
timeout: i64,
|
||||||
},
|
},
|
||||||
#[cfg(has_drtio)]
|
#[cfg(has_drtio)]
|
||||||
SubkernelAwaitFinishReply {
|
SubkernelAwaitFinishReply {
|
||||||
|
@ -104,7 +104,7 @@ pub enum Message {
|
||||||
#[cfg(has_drtio)]
|
#[cfg(has_drtio)]
|
||||||
SubkernelMsgRecvRequest {
|
SubkernelMsgRecvRequest {
|
||||||
id: u32,
|
id: u32,
|
||||||
timeout: u64,
|
timeout: i64,
|
||||||
tags: Vec<u8>,
|
tags: Vec<u8>,
|
||||||
},
|
},
|
||||||
#[cfg(has_drtio)]
|
#[cfg(has_drtio)]
|
||||||
|
|
|
@ -169,25 +169,34 @@ pub async fn await_finish(
|
||||||
routing_table: &RoutingTable,
|
routing_table: &RoutingTable,
|
||||||
timer: GlobalTimer,
|
timer: GlobalTimer,
|
||||||
id: u32,
|
id: u32,
|
||||||
timeout: u64,
|
timeout: i64,
|
||||||
) -> Result<SubkernelFinished, Error> {
|
) -> Result<SubkernelFinished, Error> {
|
||||||
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
||||||
SubkernelState::Running | SubkernelState::Finished { .. } => (),
|
SubkernelState::Running | SubkernelState::Finished { .. } => (),
|
||||||
_ => return Err(Error::IncorrectState),
|
_ => return Err(Error::IncorrectState),
|
||||||
}
|
}
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
if timeout > 0 {
|
||||||
while timer.get_time() < max_time {
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
{
|
while timer.get_time() < max_time {
|
||||||
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
||||||
SubkernelState::Finished { .. } => break,
|
SubkernelState::Finished { .. } => break,
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
task::r#yield().await;
|
||||||
|
}
|
||||||
|
if timer.get_time() >= max_time {
|
||||||
|
error!("Remote subkernel finish await timed out");
|
||||||
|
return Err(Error::Timeout);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no timeout, wait forever
|
||||||
|
loop {
|
||||||
|
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
||||||
|
SubkernelState::Finished { .. } => break,
|
||||||
|
_ => (),
|
||||||
|
};
|
||||||
|
task::r#yield().await;
|
||||||
}
|
}
|
||||||
task::r#yield().await;
|
|
||||||
}
|
|
||||||
if timer.get_time() >= max_time {
|
|
||||||
error!("Remote subkernel finish await timed out");
|
|
||||||
return Err(Error::Timeout);
|
|
||||||
}
|
}
|
||||||
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
|
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
|
||||||
match subkernel.state {
|
match subkernel.state {
|
||||||
|
@ -231,8 +240,9 @@ pub async fn message_handle_incoming(
|
||||||
{
|
{
|
||||||
let subkernel_lock = SUBKERNELS.async_lock().await;
|
let subkernel_lock = SUBKERNELS.async_lock().await;
|
||||||
let subkernel = subkernel_lock.get(&id);
|
let subkernel = subkernel_lock.get(&id);
|
||||||
if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running {
|
if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running {
|
||||||
// do not add messages for non-existing or deleted subkernels
|
// do not add messages for non-running or deleted subkernels
|
||||||
|
warn!("received a message for a non-running subkernel #{}", id);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -264,16 +274,19 @@ pub async fn message_handle_incoming(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<Message, Error> {
|
pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result<Message, Error> {
|
||||||
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
let is_subkernel = SUBKERNELS.async_lock().await.get(&id).is_some();
|
||||||
SubkernelState::Finished {
|
if is_subkernel {
|
||||||
status: FinishStatus::CommLost,
|
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
||||||
} => return Err(Error::CommLost),
|
SubkernelState::Finished {
|
||||||
SubkernelState::Running | SubkernelState::Finished { .. } => (),
|
status: FinishStatus::CommLost,
|
||||||
_ => return Err(Error::IncorrectState),
|
} => return Err(Error::CommLost),
|
||||||
|
SubkernelState::Running | SubkernelState::Finished { .. } => (),
|
||||||
|
_ => return Err(Error::IncorrectState),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = timer.get_time() + Milliseconds(timeout);
|
||||||
while timer.get_time() < max_time {
|
while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) {
|
||||||
{
|
{
|
||||||
let mut message_queue = MESSAGE_QUEUE.async_lock().await;
|
let mut message_queue = MESSAGE_QUEUE.async_lock().await;
|
||||||
for i in 0..message_queue.len() {
|
for i in 0..message_queue.len() {
|
||||||
|
@ -284,14 +297,16 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
if is_subkernel {
|
||||||
SubkernelState::Finished {
|
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
|
||||||
status: FinishStatus::CommLost,
|
SubkernelState::Finished {
|
||||||
} => return Err(Error::CommLost),
|
status: FinishStatus::CommLost,
|
||||||
SubkernelState::Finished {
|
} => return Err(Error::CommLost),
|
||||||
status: FinishStatus::Exception(_),
|
SubkernelState::Finished {
|
||||||
} => return Err(Error::SubkernelException),
|
status: FinishStatus::Exception(_),
|
||||||
_ => (),
|
} => return Err(Error::SubkernelException),
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
task::r#yield().await;
|
task::r#yield().await;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use alloc::{collections::{BTreeMap, VecDeque},
|
use alloc::{collections::BTreeMap,
|
||||||
format,
|
format,
|
||||||
string::{String, ToString},
|
string::{String, ToString},
|
||||||
vec::Vec};
|
vec::Vec};
|
||||||
|
@ -23,11 +23,15 @@ enum KernelState {
|
||||||
Absent,
|
Absent,
|
||||||
Loaded,
|
Loaded,
|
||||||
Running,
|
Running,
|
||||||
MsgAwait(Milliseconds, Vec<u8>),
|
MsgAwait {
|
||||||
|
max_time: Option<Milliseconds>,
|
||||||
|
id: u32,
|
||||||
|
tags: Vec<u8>
|
||||||
|
},
|
||||||
MsgSending,
|
MsgSending,
|
||||||
SubkernelAwaitLoad,
|
SubkernelAwaitLoad,
|
||||||
SubkernelAwaitFinish {
|
SubkernelAwaitFinish {
|
||||||
max_time: Milliseconds,
|
max_time: Option<Milliseconds>,
|
||||||
id: u32,
|
id: u32,
|
||||||
},
|
},
|
||||||
DmaUploading,
|
DmaUploading,
|
||||||
|
@ -110,7 +114,7 @@ enum OutMessageState {
|
||||||
struct MessageManager {
|
struct MessageManager {
|
||||||
out_message: Option<Sliceable>,
|
out_message: Option<Sliceable>,
|
||||||
out_state: OutMessageState,
|
out_state: OutMessageState,
|
||||||
in_queue: VecDeque<Message>,
|
in_queue: Vec<Message>,
|
||||||
in_buffer: Option<Message>,
|
in_buffer: Option<Message>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,7 +174,7 @@ impl MessageManager {
|
||||||
MessageManager {
|
MessageManager {
|
||||||
out_message: None,
|
out_message: None,
|
||||||
out_state: OutMessageState::NoMessage,
|
out_state: OutMessageState::NoMessage,
|
||||||
in_queue: VecDeque::new(),
|
in_queue: Vec::new(),
|
||||||
in_buffer: None,
|
in_buffer: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -191,7 +195,7 @@ impl MessageManager {
|
||||||
};
|
};
|
||||||
if status.is_last() {
|
if status.is_last() {
|
||||||
// when done, remove from working queue
|
// when done, remove from working queue
|
||||||
self.in_queue.push_back(self.in_buffer.take().unwrap());
|
self.in_queue.push(self.in_buffer.take().unwrap());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,8 +269,13 @@ impl MessageManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_incoming(&mut self) -> Option<Message> {
|
pub fn get_incoming(&mut self, id: u32) -> Option<Message> {
|
||||||
self.in_queue.pop_front()
|
for i in 0..self.in_queue.len() {
|
||||||
|
if self.in_queue[i].id == id {
|
||||||
|
return Some(self.in_queue.remove(i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -707,9 +716,13 @@ impl<'a> Manager<'_> {
|
||||||
)?;
|
)?;
|
||||||
self.session.kernel_state = KernelState::MsgSending;
|
self.session.kernel_state = KernelState::MsgSending;
|
||||||
}
|
}
|
||||||
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => {
|
kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = if timeout > 0 {
|
||||||
self.session.kernel_state = KernelState::MsgAwait(max_time, tags);
|
Some(timer.get_time() + Milliseconds(timeout));
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags };
|
||||||
}
|
}
|
||||||
kernel::Message::SubkernelLoadRunRequest {
|
kernel::Message::SubkernelLoadRunRequest {
|
||||||
id,
|
id,
|
||||||
|
@ -731,7 +744,11 @@ impl<'a> Manager<'_> {
|
||||||
}
|
}
|
||||||
|
|
||||||
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
|
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
|
||||||
let max_time = timer.get_time() + Milliseconds(timeout);
|
let max_time = if timeout > 0 {
|
||||||
|
Some(timer.get_time() + Milliseconds(timeout));
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
|
self.session.kernel_state = KernelState::SubkernelAwaitFinish {
|
||||||
max_time: max_time,
|
max_time: max_time,
|
||||||
id: id,
|
id: id,
|
||||||
|
@ -751,16 +768,18 @@ impl<'a> Manager<'_> {
|
||||||
|
|
||||||
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
|
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
|
||||||
match &self.session.kernel_state {
|
match &self.session.kernel_state {
|
||||||
KernelState::MsgAwait(timeout, tags) => {
|
KernelState::MsgAwait {max_time , id, tags } => {
|
||||||
if timer.get_time() > *timeout {
|
if let Some(max_time) = *max_time {
|
||||||
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
|
if timer.get_time() > max_time {
|
||||||
status: kernel::SubkernelStatus::Timeout,
|
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
|
||||||
count: 0,
|
status: kernel::SubkernelStatus::Timeout,
|
||||||
});
|
count: 0,
|
||||||
self.session.kernel_state = KernelState::Running;
|
});
|
||||||
return Ok(());
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if let Some(message) = self.session.messages.get_incoming() {
|
if let Some(message) = self.session.messages.get_incoming(id) {
|
||||||
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
|
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
|
||||||
status: kernel::SubkernelStatus::NoError,
|
status: kernel::SubkernelStatus::NoError,
|
||||||
count: message.count,
|
count: message.count,
|
||||||
|
@ -782,25 +801,27 @@ impl<'a> Manager<'_> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
KernelState::SubkernelAwaitFinish { max_time, id } => {
|
||||||
if timer.get_time() > *max_time {
|
if let Some(max_time) = *max_time {
|
||||||
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
|
if timer.get_time() > *max_time {
|
||||||
status: kernel::SubkernelStatus::Timeout,
|
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
|
||||||
});
|
status: kernel::SubkernelStatus::Timeout,
|
||||||
self.session.kernel_state = KernelState::Running;
|
});
|
||||||
} else {
|
self.session.kernel_state = KernelState::Running;
|
||||||
let mut i = 0;
|
return Ok(());
|
||||||
for status in &self.session.subkernels_finished {
|
|
||||||
if *status == *id {
|
|
||||||
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
|
|
||||||
status: kernel::SubkernelStatus::NoError,
|
|
||||||
});
|
|
||||||
self.session.kernel_state = KernelState::Running;
|
|
||||||
self.session.subkernels_finished.swap_remove(i);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
i += 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
let mut i = 0;
|
||||||
|
for status in &self.session.subkernels_finished {
|
||||||
|
if *status == *id {
|
||||||
|
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
|
||||||
|
status: kernel::SubkernelStatus::NoError,
|
||||||
|
});
|
||||||
|
self.session.kernel_state = KernelState::Running;
|
||||||
|
self.session.subkernels_finished.swap_remove(i);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {
|
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {
|
||||||
|
|
Loading…
Reference in New Issue