forked from M-Labs/artiq
1
0
Fork 0

satman: support free subkernel message passing

This commit is contained in:
mwojcik 2024-01-26 15:48:00 +08:00 committed by Sébastien Bourdeauducq
parent 171c7a6e11
commit 7d3bcc7cac
2 changed files with 37 additions and 23 deletions

View File

@ -1,5 +1,5 @@
use core::{mem, option::NoneError}; use core::{mem, option::NoneError};
use alloc::{string::String, format, vec::Vec, collections::{btree_map::BTreeMap, vec_deque::VecDeque}}; use alloc::{string::String, format, vec::Vec, collections::btree_map::BTreeMap};
use cslice::AsCSlice; use cslice::AsCSlice;
use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi}; use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi};
@ -63,7 +63,7 @@ enum KernelState {
Absent, Absent,
Loaded, Loaded,
Running, Running,
MsgAwait { max_time: u64, tags: Vec<u8> }, MsgAwait { id: u32, max_time: u64, tags: Vec<u8> },
MsgSending, MsgSending,
SubkernelAwaitLoad, SubkernelAwaitLoad,
SubkernelAwaitFinish { max_time: u64, id: u32 }, SubkernelAwaitFinish { max_time: u64, id: u32 },
@ -115,6 +115,7 @@ macro_rules! unexpected {
/* represents interkernel messages */ /* represents interkernel messages */
struct Message { struct Message {
id: u32,
count: u8, count: u8,
data: Vec<u8> data: Vec<u8>
} }
@ -131,7 +132,7 @@ enum OutMessageState {
struct MessageManager { struct MessageManager {
out_message: Option<Sliceable>, out_message: Option<Sliceable>,
out_state: OutMessageState, out_state: OutMessageState,
in_queue: VecDeque<Message>, in_queue: Vec<Message>,
in_buffer: Option<Message>, in_buffer: Option<Message>,
} }
@ -171,12 +172,12 @@ impl MessageManager {
MessageManager { MessageManager {
out_message: None, out_message: None,
out_state: OutMessageState::NoMessage, out_state: OutMessageState::NoMessage,
in_queue: VecDeque::new(), in_queue: Vec::new(),
in_buffer: None in_buffer: None
} }
} }
pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, id: u32, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from master // called when receiving a message from master
if status.is_first() { if status.is_first() {
// clear the buffer for first message // clear the buffer for first message
@ -186,6 +187,7 @@ impl MessageManager {
Some(message) => message.data.extend(&data[..length]), Some(message) => message.data.extend(&data[..length]),
None => { None => {
self.in_buffer = Some(Message { self.in_buffer = Some(Message {
id: id,
count: data[0], count: data[0],
data: data[1..length].to_vec() data: data[1..length].to_vec()
}); });
@ -193,7 +195,7 @@ impl MessageManager {
}; };
if status.is_last() { if status.is_last() {
// when done, remove from working queue // when done, remove from working queue
self.in_queue.push_back(self.in_buffer.take().unwrap()); self.in_queue.push(self.in_buffer.take().unwrap());
} }
} }
@ -257,8 +259,13 @@ impl MessageManager {
Ok(()) Ok(())
} }
pub fn get_incoming(&mut self) -> Option<Message> { pub fn get_incoming(&mut self, id: u32) -> Option<Message> {
self.in_queue.pop_front() for i in 0..self.in_queue.len() {
if self.in_queue[i].id == id {
return Some(self.in_queue.remove(i));
}
}
None
} }
} }
@ -363,11 +370,11 @@ impl Manager {
kern_acknowledge() kern_acknowledge()
} }
pub fn message_handle_incoming(&mut self, status: PayloadStatus, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { pub fn message_handle_incoming(&mut self, status: PayloadStatus, length: usize, id: u32, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
if !self.is_running() { if !self.is_running() {
return; return;
} }
self.session.messages.handle_incoming(status, length, slice); self.session.messages.handle_incoming(status, length, id, slice);
} }
pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> { pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
@ -539,13 +546,13 @@ impl Manager {
fn process_external_messages(&mut self) -> Result<(), Error> { fn process_external_messages(&mut self) -> Result<(), Error> {
match &self.session.kernel_state { match &self.session.kernel_state {
KernelState::MsgAwait { max_time, tags } => { KernelState::MsgAwait { id, max_time, tags } => {
if clock::get_ms() > *max_time { if clock::get_ms() > *max_time {
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?; kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?;
self.session.kernel_state = KernelState::Running; self.session.kernel_state = KernelState::Running;
return Ok(()) return Ok(())
} }
if let Some(message) = self.session.messages.get_incoming() { if let Some(message) = self.session.messages.get_incoming(*id) {
kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?; kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?;
let tags = tags.clone(); let tags = tags.clone();
self.session.kernel_state = KernelState::Running; self.session.kernel_state = KernelState::Running;
@ -752,22 +759,29 @@ impl Manager {
Ok(()) Ok(())
} }
&kern::SubkernelMsgSend { id: _, destination: msg_dest, count, tag, data } => { &kern::SubkernelMsgSend { id, destination: msg_dest, count, tag, data } => {
let dest = match msg_dest { let message_destination;
Some(dest) => dest, let message_id;
None => self.session.source if let Some(dest) = msg_dest {
}; message_destination = dest;
self.session.messages.accept_outgoing(self.current_id, destination, message_id = id;
dest, count, tag, data, } else {
// return message, return to source
message_destination = self.session.source;
message_id = self.current_id;
}
self.session.messages.accept_outgoing(message_id, destination,
message_destination, count, tag, data,
routing_table, rank, router)?; routing_table, rank, router)?;
// acknowledge after the message is sent // acknowledge after the message is sent
self.session.kernel_state = KernelState::MsgSending; self.session.kernel_state = KernelState::MsgSending;
Ok(()) Ok(())
} }
&kern::SubkernelMsgRecvRequest { id: _, timeout, tags } => { &kern::SubkernelMsgRecvRequest { id, timeout, tags } => {
let max_time = clock::get_ms() + timeout as u64; let max_time = clock::get_ms() + timeout as u64;
self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, tags: tags.to_vec() }; self.session.kernel_state = KernelState::MsgAwait {
id: id, max_time: max_time, tags: tags.to_vec() };
Ok(()) Ok(())
}, },

View File

@ -458,9 +458,9 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg
data: data_slice, data: data_slice,
}) })
} }
drtioaux::Packet::SubkernelMessage { source, destination: _destination, id: _id, status, length, data } => { drtioaux::Packet::SubkernelMessage { source, destination: _destination, id, status, length, data } => {
forward!(_routing_table, _destination, *rank, _repeaters, &packet); forward!(_routing_table, _destination, *rank, _repeaters, &packet);
kernelmgr.message_handle_incoming(status, length as usize, &data); kernelmgr.message_handle_incoming(status, length as usize, id, &data);
router.send(drtioaux::Packet::SubkernelMessageAck { router.send(drtioaux::Packet::SubkernelMessageAck {
destination: source destination: source
}, _routing_table, *rank, *self_destination) }, _routing_table, *rank, *self_destination)