diff --git a/artiq/firmware/satman/kernel.rs b/artiq/firmware/satman/kernel.rs index d4cc226cb..fdf8792f8 100644 --- a/artiq/firmware/satman/kernel.rs +++ b/artiq/firmware/satman/kernel.rs @@ -1,5 +1,5 @@ use core::{mem, option::NoneError}; -use alloc::{string::String, format, vec::Vec, collections::{btree_map::BTreeMap, vec_deque::VecDeque}}; +use alloc::{string::String, format, vec::Vec, collections::btree_map::BTreeMap}; use cslice::AsCSlice; use board_artiq::{drtioaux, drtio_routing::RoutingTable, mailbox, spi}; @@ -63,7 +63,7 @@ enum KernelState { Absent, Loaded, Running, - MsgAwait { max_time: u64, tags: Vec }, + MsgAwait { id: u32, max_time: u64, tags: Vec }, MsgSending, SubkernelAwaitLoad, SubkernelAwaitFinish { max_time: u64, id: u32 }, @@ -115,6 +115,7 @@ macro_rules! unexpected { /* represents interkernel messages */ struct Message { + id: u32, count: u8, data: Vec } @@ -131,7 +132,7 @@ enum OutMessageState { struct MessageManager { out_message: Option, out_state: OutMessageState, - in_queue: VecDeque, + in_queue: Vec, in_buffer: Option, } @@ -171,12 +172,12 @@ impl MessageManager { MessageManager { out_message: None, out_state: OutMessageState::NoMessage, - in_queue: VecDeque::new(), + in_queue: Vec::new(), in_buffer: None } } - pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, id: u32, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from master if status.is_first() { // clear the buffer for first message @@ -186,6 +187,7 @@ impl MessageManager { Some(message) => message.data.extend(&data[..length]), None => { self.in_buffer = Some(Message { + id: id, count: data[0], data: data[1..length].to_vec() }); @@ -193,7 +195,7 @@ impl MessageManager { }; if status.is_last() { // when done, remove from working queue - self.in_queue.push_back(self.in_buffer.take().unwrap()); + self.in_queue.push(self.in_buffer.take().unwrap()); } } @@ -257,8 +259,13 @@ impl MessageManager { Ok(()) } - pub fn get_incoming(&mut self) -> Option { - self.in_queue.pop_front() + pub fn get_incoming(&mut self, id: u32) -> Option { + for i in 0..self.in_queue.len() { + if self.in_queue[i].id == id { + return Some(self.in_queue.remove(i)); + } + } + None } } @@ -363,11 +370,11 @@ impl Manager { kern_acknowledge() } - pub fn message_handle_incoming(&mut self, status: PayloadStatus, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn message_handle_incoming(&mut self, status: PayloadStatus, length: usize, id: u32, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { if !self.is_running() { return; } - self.session.messages.handle_incoming(status, length, slice); + self.session.messages.handle_incoming(status, length, id, slice); } pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { @@ -539,13 +546,13 @@ impl Manager { fn process_external_messages(&mut self) -> Result<(), Error> { match &self.session.kernel_state { - KernelState::MsgAwait { max_time, tags } => { + KernelState::MsgAwait { id, max_time, tags } => { if clock::get_ms() > *max_time { kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::Timeout, count: 0 })?; self.session.kernel_state = KernelState::Running; return Ok(()) } - if let Some(message) = self.session.messages.get_incoming() { + if let Some(message) = self.session.messages.get_incoming(*id) { kern_send(&kern::SubkernelMsgRecvReply { status: kern::SubkernelStatus::NoError, count: message.count })?; let tags = tags.clone(); self.session.kernel_state = KernelState::Running; @@ -752,22 +759,29 @@ impl Manager { Ok(()) } - &kern::SubkernelMsgSend { id: _, destination: msg_dest, count, tag, data } => { - let dest = match msg_dest { - Some(dest) => dest, - None => self.session.source - }; - self.session.messages.accept_outgoing(self.current_id, destination, - dest, count, tag, data, + &kern::SubkernelMsgSend { id, destination: msg_dest, count, tag, data } => { + let message_destination; + let message_id; + if let Some(dest) = msg_dest { + message_destination = dest; + message_id = id; + } else { + // return message, return to source + message_destination = self.session.source; + message_id = self.current_id; + } + self.session.messages.accept_outgoing(message_id, destination, + message_destination, count, tag, data, routing_table, rank, router)?; // acknowledge after the message is sent self.session.kernel_state = KernelState::MsgSending; Ok(()) } - &kern::SubkernelMsgRecvRequest { id: _, timeout, tags } => { + &kern::SubkernelMsgRecvRequest { id, timeout, tags } => { let max_time = clock::get_ms() + timeout as u64; - self.session.kernel_state = KernelState::MsgAwait { max_time: max_time, tags: tags.to_vec() }; + self.session.kernel_state = KernelState::MsgAwait { + id: id, max_time: max_time, tags: tags.to_vec() }; Ok(()) }, diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index 78c84103a..2105b4c26 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -458,9 +458,9 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg data: data_slice, }) } - drtioaux::Packet::SubkernelMessage { source, destination: _destination, id: _id, status, length, data } => { + drtioaux::Packet::SubkernelMessage { source, destination: _destination, id, status, length, data } => { forward!(_routing_table, _destination, *rank, _repeaters, &packet); - kernelmgr.message_handle_incoming(status, length as usize, &data); + kernelmgr.message_handle_incoming(status, length as usize, id, &data); router.send(drtioaux::Packet::SubkernelMessageAck { destination: source }, _routing_table, *rank, *self_destination)