subkernel: support no-timeout, message passing

This commit is contained in:
mwojcik 2024-01-30 14:04:11 +08:00
parent 30e6bf4a3a
commit f0f81dbf8a
6 changed files with 118 additions and 79 deletions

View File

@ -87,7 +87,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelAwaitFinishRequest { SubkernelAwaitFinishRequest {
id: u32, id: u32,
timeout: u64, timeout: i64,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelAwaitFinishReply { SubkernelAwaitFinishReply {
@ -104,7 +104,7 @@ pub enum Message {
#[cfg(has_drtio)] #[cfg(has_drtio)]
SubkernelMsgRecvRequest { SubkernelMsgRecvRequest {
id: u32, id: u32,
timeout: u64, timeout: i64,
tags: Vec<u8>, tags: Vec<u8>,
}, },
#[cfg(has_drtio)] #[cfg(has_drtio)]

View File

@ -25,7 +25,7 @@ pub extern "C" fn load_run(id: u32, destination: u8, run: bool) {
} }
} }
pub extern "C" fn await_finish(id: u32, timeout: u64) { pub extern "C" fn await_finish(id: u32, timeout: i64) {
unsafe { unsafe {
KERNEL_CHANNEL_1TO0 KERNEL_CHANNEL_1TO0
.as_mut() .as_mut()
@ -80,7 +80,7 @@ pub extern "C" fn send_message(
} }
} }
pub extern "C" fn await_message(id: u32, timeout: u64, tags: &CSlice<u8>, min: u8, max: u8) { pub extern "C" fn await_message(id: i32, timeout: i64, tags: &CSlice<u8>, min: u8, max: u8) {
unsafe { unsafe {
KERNEL_CHANNEL_1TO0 KERNEL_CHANNEL_1TO0
.as_mut() .as_mut()

View File

@ -453,10 +453,10 @@ async fn handle_run_kernel(
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend { kernel::Message::SubkernelMsgSend {
id, id,
destination: _, destination,
data, data,
} => { } => {
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; let res = subkernel::message_send(aux_mutex, routing_table, timer, id, destination.unwrap(), data).await;
match res { match res {
Ok(_) => (), Ok(_) => (),
Err(e) => { Err(e) => {

View File

@ -5,7 +5,7 @@ use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::mutex::Mutex; use libcortex_a9::mutex::Mutex;
use log::error; use log::{error, warn};
use crate::rtio_mgt::drtio; use crate::rtio_mgt::drtio;
@ -169,25 +169,34 @@ pub async fn await_finish(
routing_table: &RoutingTable, routing_table: &RoutingTable,
timer: GlobalTimer, timer: GlobalTimer,
id: u32, id: u32,
timeout: u64, timeout: i64,
) -> Result<SubkernelFinished, Error> { ) -> Result<SubkernelFinished, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state { match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Running | SubkernelState::Finished { .. } => (), SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState), _ => return Err(Error::IncorrectState),
} }
let max_time = timer.get_time() + Milliseconds(timeout); if timeout > 0 {
while timer.get_time() < max_time { let max_time = timer.get_time() + Milliseconds(timeout as u64);
{ while timer.get_time() < max_time {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state { match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break, SubkernelState::Finished { .. } => break,
_ => (), _ => (),
}; };
task::r#yield().await;
}
if timer.get_time() >= max_time {
error!("Remote subkernel finish await timed out");
return Err(Error::Timeout);
}
} else {
// no timeout, wait forever
loop {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break,
_ => (),
};
task::r#yield().await;
} }
task::r#yield().await;
}
if timer.get_time() >= max_time {
error!("Remote subkernel finish await timed out");
return Err(Error::Timeout);
} }
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
match subkernel.state { match subkernel.state {
@ -231,8 +240,9 @@ pub async fn message_handle_incoming(
{ {
let subkernel_lock = SUBKERNELS.async_lock().await; let subkernel_lock = SUBKERNELS.async_lock().await;
let subkernel = subkernel_lock.get(&id); let subkernel = subkernel_lock.get(&id);
if subkernel.is_none() || subkernel.unwrap().state != SubkernelState::Running { if subkernel.is_some() && subkernel.unwrap().state != SubkernelState::Running {
// do not add messages for non-existing or deleted subkernels // do not add messages for non-running or deleted subkernels
warn!("received a message for a non-running subkernel #{}", id);
return; return;
} }
} }
@ -264,16 +274,19 @@ pub async fn message_handle_incoming(
} }
} }
pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<Message, Error> { pub async fn message_await(id: u32, timeout: i64, timer: GlobalTimer) -> Result<Message, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state { let is_subkernel = SUBKERNELS.async_lock().await.get(&id).is_some();
SubkernelState::Finished { if is_subkernel {
status: FinishStatus::CommLost, match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
} => return Err(Error::CommLost), SubkernelState::Finished {
SubkernelState::Running | SubkernelState::Finished { .. } => (), status: FinishStatus::CommLost,
_ => return Err(Error::IncorrectState), } => return Err(Error::CommLost),
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
}
} }
let max_time = timer.get_time() + Milliseconds(timeout); let max_time = timer.get_time() + Milliseconds(timeout as u64);
while timer.get_time() < max_time { while timeout < 0 || (timeout > 0 && timer.get_time() < max_time) {
{ {
let mut message_queue = MESSAGE_QUEUE.async_lock().await; let mut message_queue = MESSAGE_QUEUE.async_lock().await;
for i in 0..message_queue.len() { for i in 0..message_queue.len() {
@ -284,14 +297,16 @@ pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<
} }
} }
} }
match SUBKERNELS.async_lock().await.get(&id).unwrap().state { if is_subkernel {
SubkernelState::Finished { match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
status: FinishStatus::CommLost, SubkernelState::Finished {
} => return Err(Error::CommLost), status: FinishStatus::CommLost,
SubkernelState::Finished { } => return Err(Error::CommLost),
status: FinishStatus::Exception(_), SubkernelState::Finished {
} => return Err(Error::SubkernelException), status: FinishStatus::Exception(_),
_ => (), } => return Err(Error::SubkernelException),
_ => (),
}
} }
task::r#yield().await; task::r#yield().await;
} }
@ -303,9 +318,8 @@ pub async fn message_send<'a>(
routing_table: &RoutingTable, routing_table: &RoutingTable,
timer: GlobalTimer, timer: GlobalTimer,
id: u32, id: u32,
destination: u8,
message: Vec<u8>, message: Vec<u8>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let destination = SUBKERNELS.async_lock().await.get(&id).unwrap().destination;
// rpc data prepared by the kernel core already
Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?) Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?)
} }

View File

@ -657,13 +657,13 @@ fn process_aux_packet(
drtioaux::Packet::SubkernelMessage { drtioaux::Packet::SubkernelMessage {
source, source,
destination: _destination, destination: _destination,
id: _id, id,
status, status,
length, length,
data, data,
} => { } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *rank, _repeaters, &packet, timer);
kernel_manager.message_handle_incoming(status, length as usize, &data); kernel_manager.message_handle_incoming(status, id, length as usize, &data);
router.send( router.send(
drtioaux::Packet::SubkernelMessageAck { destination: source }, drtioaux::Packet::SubkernelMessageAck { destination: source },
_routing_table, _routing_table,

View File

@ -1,4 +1,4 @@
use alloc::{collections::{BTreeMap, VecDeque}, use alloc::{collections::BTreeMap,
format, format,
string::{String, ToString}, string::{String, ToString},
vec::Vec}; vec::Vec};
@ -23,11 +23,15 @@ enum KernelState {
Absent, Absent,
Loaded, Loaded,
Running, Running,
MsgAwait(Milliseconds, Vec<u8>), MsgAwait {
max_time: Option<Milliseconds>,
id: u32,
tags: Vec<u8>
},
MsgSending, MsgSending,
SubkernelAwaitLoad, SubkernelAwaitLoad,
SubkernelAwaitFinish { SubkernelAwaitFinish {
max_time: Milliseconds, max_time: Option<Milliseconds>,
id: u32, id: u32,
}, },
DmaUploading, DmaUploading,
@ -95,6 +99,7 @@ macro_rules! unexpected {
/* represents interkernel messages */ /* represents interkernel messages */
struct Message { struct Message {
count: u8, count: u8,
id: u32,
data: Vec<u8>, data: Vec<u8>,
} }
@ -110,7 +115,7 @@ enum OutMessageState {
struct MessageManager { struct MessageManager {
out_message: Option<Sliceable>, out_message: Option<Sliceable>,
out_state: OutMessageState, out_state: OutMessageState,
in_queue: VecDeque<Message>, in_queue: Vec<Message>,
in_buffer: Option<Message>, in_buffer: Option<Message>,
} }
@ -170,12 +175,12 @@ impl MessageManager {
MessageManager { MessageManager {
out_message: None, out_message: None,
out_state: OutMessageState::NoMessage, out_state: OutMessageState::NoMessage,
in_queue: VecDeque::new(), in_queue: Vec::new(),
in_buffer: None, in_buffer: None,
} }
} }
pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub fn handle_incoming(&mut self, status: PayloadStatus, id: u32, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from master // called when receiving a message from master
if status.is_first() { if status.is_first() {
self.in_buffer = None; self.in_buffer = None;
@ -185,13 +190,14 @@ impl MessageManager {
None => { None => {
self.in_buffer = Some(Message { self.in_buffer = Some(Message {
count: data[0], count: data[0],
id: id,
data: data[1..length].to_vec(), data: data[1..length].to_vec(),
}); });
} }
}; };
if status.is_last() { if status.is_last() {
// when done, remove from working queue // when done, remove from working queue
self.in_queue.push_back(self.in_buffer.take().unwrap()); self.in_queue.push(self.in_buffer.take().unwrap());
} }
} }
@ -265,8 +271,13 @@ impl MessageManager {
Ok(()) Ok(())
} }
pub fn get_incoming(&mut self) -> Option<Message> { pub fn get_incoming(&mut self, id: u32) -> Option<Message> {
self.in_queue.pop_front() for i in 0..self.in_queue.len() {
if self.in_queue[i].id == id {
return Some(self.in_queue.remove(i));
}
}
None
} }
} }
@ -344,13 +355,14 @@ impl<'a> Manager<'_> {
pub fn message_handle_incoming( pub fn message_handle_incoming(
&mut self, &mut self,
status: PayloadStatus, status: PayloadStatus,
id: u32,
length: usize, length: usize,
slice: &[u8; MASTER_PAYLOAD_MAX_SIZE], slice: &[u8; MASTER_PAYLOAD_MAX_SIZE],
) { ) {
if !self.running() { if !self.running() {
return; return;
} }
self.session.messages.handle_incoming(status, length, slice); self.session.messages.handle_incoming(status, id, length, slice);
} }
pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> { pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
@ -707,9 +719,14 @@ impl<'a> Manager<'_> {
)?; )?;
self.session.kernel_state = KernelState::MsgSending; self.session.kernel_state = KernelState::MsgSending;
} }
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => {
let max_time = timer.get_time() + Milliseconds(timeout); let id = if id == -1 { self.session.id } else { id as u32 };
self.session.kernel_state = KernelState::MsgAwait(max_time, tags); let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout as u64))
} else {
None
};
self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, id:id, tags: tags };
} }
kernel::Message::SubkernelLoadRunRequest { kernel::Message::SubkernelLoadRunRequest {
id, id,
@ -731,7 +748,11 @@ impl<'a> Manager<'_> {
} }
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let max_time = timer.get_time() + Milliseconds(timeout); let max_time = if timeout > 0 {
Some(timer.get_time() + Milliseconds(timeout as u64))
} else {
None
};
self.session.kernel_state = KernelState::SubkernelAwaitFinish { self.session.kernel_state = KernelState::SubkernelAwaitFinish {
max_time: max_time, max_time: max_time,
id: id, id: id,
@ -751,16 +772,18 @@ impl<'a> Manager<'_> {
fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> { fn process_external_messages(&mut self, timer: &GlobalTimer) -> Result<(), Error> {
match &self.session.kernel_state { match &self.session.kernel_state {
KernelState::MsgAwait(timeout, tags) => { KernelState::MsgAwait {max_time , id, tags } => {
if timer.get_time() > *timeout { if let Some(max_time) = *max_time {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { if timer.get_time() > max_time {
status: kernel::SubkernelStatus::Timeout, self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
count: 0, status: kernel::SubkernelStatus::Timeout,
}); count: 0,
self.session.kernel_state = KernelState::Running; });
return Ok(()); self.session.kernel_state = KernelState::Running;
return Ok(());
}
} }
if let Some(message) = self.session.messages.get_incoming() { if let Some(message) = self.session.messages.get_incoming(*id) {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::NoError, status: kernel::SubkernelStatus::NoError,
count: message.count, count: message.count,
@ -782,25 +805,27 @@ impl<'a> Manager<'_> {
} }
} }
KernelState::SubkernelAwaitFinish { max_time, id } => { KernelState::SubkernelAwaitFinish { max_time, id } => {
if timer.get_time() > *max_time { if let Some(max_time) = *max_time {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply { if timer.get_time() > max_time {
status: kernel::SubkernelStatus::Timeout, self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
}); status: kernel::SubkernelStatus::Timeout,
self.session.kernel_state = KernelState::Running; });
} else { self.session.kernel_state = KernelState::Running;
let mut i = 0; return Ok(());
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
} }
} }
let mut i = 0;
for status in &self.session.subkernels_finished {
if *status == *id {
self.control.tx.send(kernel::Message::SubkernelAwaitFinishReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.session.subkernels_finished.swap_remove(i);
break;
}
i += 1;
}
Ok(()) Ok(())
} }
KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => { KernelState::DmaAwait { max_time } | KernelState::DmaPendingAwait { max_time, .. } => {