From b3c0d084d40ba6bd2cde0bcd42f33b16a8fb6bae Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 26 Oct 2023 16:43:11 +0800 Subject: [PATCH] drtio: better control state of bigger payloads --- .../firmware/libproto_artiq/drtioaux_proto.rs | 70 +++++++++++++++---- artiq/firmware/runtime/kernel.rs | 6 +- artiq/firmware/runtime/rtio_mgt.rs | 25 ++++--- artiq/firmware/satman/dma.rs | 8 ++- artiq/firmware/satman/kernel.rs | 40 +++++++---- artiq/firmware/satman/main.rs | 18 ++--- 6 files changed, 112 insertions(+), 55 deletions(-) diff --git a/artiq/firmware/libproto_artiq/drtioaux_proto.rs b/artiq/firmware/libproto_artiq/drtioaux_proto.rs index 6ca4230eb..c0333b18c 100644 --- a/artiq/firmware/libproto_artiq/drtioaux_proto.rs +++ b/artiq/firmware/libproto_artiq/drtioaux_proto.rs @@ -20,6 +20,46 @@ pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet I // used by DDMA, subkernel program data (need to provide extra ID and destination) pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; +#[derive(PartialEq, Clone, Copy, Debug)] +#[repr(u8)] +pub enum PayloadStatus { + Middle = 0, + First = 1, + Last = 2, + FirstAndLast = 3, +} + +impl From for PayloadStatus { + fn from(value: u8) -> PayloadStatus { + match value { + 0 => PayloadStatus::Middle, + 1 => PayloadStatus::First, + 2 => PayloadStatus::Last, + 3 => PayloadStatus::FirstAndLast, + _ => unreachable!(), + } + } +} + +impl PayloadStatus { + pub fn is_first(self) -> bool { + self == PayloadStatus::First || self == PayloadStatus::FirstAndLast + } + + pub fn is_last(self) -> bool { + self == PayloadStatus::Last || self == PayloadStatus::FirstAndLast + } + + pub fn from_status(first: bool, last: bool) -> PayloadStatus { + match (first, last) { + (true, true) => PayloadStatus::FirstAndLast, + (true, false) => PayloadStatus::First, + (false, true) => PayloadStatus::Last, + (false, false) => PayloadStatus::Middle + } + } +} + #[derive(PartialEq, Debug)] pub enum Packet { EchoRequest, @@ -66,7 +106,7 @@ pub enum Packet { AnalyzerDataRequest { destination: u8 }, AnalyzerData { last: bool, length: u16, data: [u8; SAT_PAYLOAD_MAX_SIZE]}, - DmaAddTraceRequest { destination: u8, id: u32, last: bool, length: u16, trace: [u8; MASTER_PAYLOAD_MAX_SIZE] }, + DmaAddTraceRequest { destination: u8, id: u32, status: PayloadStatus, length: u16, trace: [u8; MASTER_PAYLOAD_MAX_SIZE] }, DmaAddTraceReply { succeeded: bool }, DmaRemoveTraceRequest { destination: u8, id: u32 }, DmaRemoveTraceReply { succeeded: bool }, @@ -74,14 +114,14 @@ pub enum Packet { DmaPlaybackReply { succeeded: bool }, DmaPlaybackStatus { destination: u8, id: u32, error: u8, channel: u32, timestamp: u64 }, - SubkernelAddDataRequest { destination: u8, id: u32, last: bool, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, + SubkernelAddDataRequest { destination: u8, id: u32, status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, SubkernelAddDataReply { succeeded: bool }, SubkernelLoadRunRequest { destination: u8, id: u32, run: bool }, SubkernelLoadRunReply { succeeded: bool }, SubkernelFinished { id: u32, with_exception: bool }, SubkernelExceptionRequest { destination: u8 }, SubkernelException { last: bool, length: u16, data: [u8; SAT_PAYLOAD_MAX_SIZE] }, - SubkernelMessage { destination: u8, id: u32, last: bool, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, + SubkernelMessage { destination: u8, id: u32, status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE] }, SubkernelMessageAck { destination: u8 }, } @@ -240,14 +280,14 @@ impl Packet { 0xb0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { destination: destination, id: id, - last: last, + status: PayloadStatus::from(status), length: length as u16, trace: trace, } @@ -281,14 +321,14 @@ impl Packet { 0xc0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelAddDataRequest { destination: destination, id: id, - last: last, + status: PayloadStatus::from(status), length: length as u16, data: data, } @@ -325,14 +365,14 @@ impl Packet { 0xcb => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = reader.read_u8()?; let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelMessage { destination: destination, id: id, - last: last, + status: PayloadStatus::from(status), length: length as u16, data: data, } @@ -521,11 +561,11 @@ impl Packet { writer.write_all(&data[0..length as usize])?; }, - Packet::DmaAddTraceRequest { destination, id, last, trace, length } => { + Packet::DmaAddTraceRequest { destination, id, status, trace, length } => { writer.write_u8(0xb0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; // trace may be broken down to fit within drtio aux memory limit // will be reconstructed by satellite writer.write_u16(length)?; @@ -563,11 +603,11 @@ impl Packet { writer.write_u64(timestamp)?; }, - Packet::SubkernelAddDataRequest { destination, id, last, data, length } => { + Packet::SubkernelAddDataRequest { destination, id, status, data, length } => { writer.write_u8(0xc0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; }, @@ -600,11 +640,11 @@ impl Packet { writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; }, - Packet::SubkernelMessage { destination, id, last, data, length } => { + Packet::SubkernelMessage { destination, id, status, data, length } => { writer.write_u8(0xcb)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; }, diff --git a/artiq/firmware/runtime/kernel.rs b/artiq/firmware/runtime/kernel.rs index 7196d2ffc..8bf46451f 100644 --- a/artiq/firmware/runtime/kernel.rs +++ b/artiq/firmware/runtime/kernel.rs @@ -95,7 +95,7 @@ pub mod subkernel { use core::str; use board_artiq::drtio_routing::RoutingTable; use board_misoc::clock; - use proto_artiq::{drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE, rpc_proto as rpc}; + use proto_artiq::{drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}, rpc_proto as rpc}; use io::Cursor; use rtio_mgt::drtio; use sched::{Io, Mutex, Error as SchedError}; @@ -317,7 +317,7 @@ pub mod subkernel { static mut CURRENT_MESSAGES: BTreeMap = BTreeMap::new(); pub fn message_handle_incoming(io: &Io, subkernel_mutex: &Mutex, - id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + id: u32, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from satellite let _lock = match subkernel_mutex.lock(io) { Ok(lock) => lock, @@ -338,7 +338,7 @@ pub mod subkernel { }); } }; - if last { + if status.is_last() { unsafe { // when done, remove from working queue MESSAGE_QUEUE.push(CURRENT_MESSAGES.remove(&id).unwrap()); diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index ba6ba3865..a03737178 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -17,7 +17,7 @@ pub mod drtio { use super::*; use alloc::vec::Vec; use drtioaux; - use proto_artiq::drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE; + use proto_artiq::drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, PayloadStatus}; use rtio_dma::remote_dma; #[cfg(has_rtio_analyzer)] use analyzer::remote_analyzer::RemoteBuffer; @@ -75,8 +75,8 @@ pub mod drtio { subkernel::subkernel_finished(io, subkernel_mutex, id, with_exception); None }, - drtioaux::Packet::SubkernelMessage { id, destination: from, last, length, data } => { - subkernel::message_handle_incoming(io, subkernel_mutex, id, last, length as usize, &data); + drtioaux::Packet::SubkernelMessage { id, destination: from, status, length, data } => { + subkernel::message_handle_incoming(io, subkernel_mutex, id, status, length as usize, &data); // acknowledge receiving part of the message drtioaux::send(linkno, &drtioaux::Packet::SubkernelMessageAck { destination: from } @@ -391,15 +391,18 @@ pub mod drtio { } fn partition_data(data: &[u8], send_f: F) -> Result<(), &'static str> - where F: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Result<(), &'static str> { + where F: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], PayloadStatus, usize) -> Result<(), &'static str> { let mut i = 0; + let mut first = true; while i < data.len() { let mut slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; let len: usize = if i + MASTER_PAYLOAD_MAX_SIZE < data.len() { MASTER_PAYLOAD_MAX_SIZE } else { data.len() - i } as usize; let last = i + len == data.len(); + let status = PayloadStatus::from_status(first, last); slice[..len].clone_from_slice(&data[i..i+len]); i += len; - send_f(&slice, last, len)?; + send_f(&slice, status, len)?; + first = false; } Ok(()) } @@ -408,10 +411,10 @@ pub mod drtio { routing_table: &drtio_routing::RoutingTable, id: u32, destination: u8, trace: &[u8]) -> Result<(), &'static str> { let linkno = routing_table.0[destination as usize][0] - 1; - partition_data(trace, |slice, last, len: usize| { + partition_data(trace, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::DmaAddTraceRequest { - id: id, destination: destination, last: last, length: len as u16, trace: *slice}); + id: id, destination: destination, status: status, length: len as u16, trace: *slice}); match reply { Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: true }) => Ok(()), Ok(drtioaux::Packet::DmaAddTraceReply { succeeded: false }) => @@ -504,10 +507,10 @@ pub mod drtio { pub fn subkernel_upload(io: &Io, aux_mutex: &Mutex, routing_table: &drtio_routing::RoutingTable, id: u32, destination: u8, data: &Vec) -> Result<(), &'static str> { let linkno = routing_table.0[destination as usize][0] - 1; - partition_data(data, |slice, last, len: usize| { + partition_data(data, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SubkernelAddDataRequest { - id: id, destination: destination, last: last, length: len as u16, data: *slice}); + id: id, destination: destination, status: status, length: len as u16, data: *slice}); match reply { Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: true }) => Ok(()), Ok(drtioaux::Packet::SubkernelAddDataReply { succeeded: false }) => @@ -557,10 +560,10 @@ pub mod drtio { routing_table: &drtio_routing::RoutingTable, id: u32, destination: u8, message: &[u8] ) -> Result<(), &'static str> { let linkno = routing_table.0[destination as usize][0] - 1; - partition_data(message, |slice, last, len: usize| { + partition_data(message, |slice, status, len: usize| { let reply = aux_transact(io, aux_mutex, linkno, &drtioaux::Packet::SubkernelMessage { - destination: destination, id: id, last: last, length: len as u16, data: *slice}); + destination: destination, id: id, status: status, length: len as u16, data: *slice}); match reply { Ok(drtioaux::Packet::SubkernelMessageAck { .. }) => Ok(()), Ok(_) => Err("sending message to subkernel failed, unexpected aux packet"), diff --git a/artiq/firmware/satman/dma.rs b/artiq/firmware/satman/dma.rs index 34dcaf475..277e27927 100644 --- a/artiq/firmware/satman/dma.rs +++ b/artiq/firmware/satman/dma.rs @@ -1,4 +1,5 @@ use board_misoc::{csr, cache::flush_l2_cache}; +use proto_artiq::drtioaux_proto::PayloadStatus; use alloc::{vec::Vec, collections::btree_map::BTreeMap}; use ::{cricon_select, RtioMaster}; @@ -51,7 +52,10 @@ impl Manager { } } - pub fn add(&mut self, id: u32, last: bool, trace: &[u8], trace_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> { + if status.is_first() { + self.entries.remove(&id); + } let entry = match self.entries.get_mut(&id) { Some(entry) => { if entry.complete { @@ -76,7 +80,7 @@ impl Manager { }; entry.trace.extend(&trace[0..trace_len]); - if last { + if status.is_last() { entry.trace.push(0); let data_len = entry.trace.len(); diff --git a/artiq/firmware/satman/kernel.rs b/artiq/firmware/satman/kernel.rs index 822a17b25..850a126a0 100644 --- a/artiq/firmware/satman/kernel.rs +++ b/artiq/firmware/satman/kernel.rs @@ -4,7 +4,11 @@ use cslice::AsCSlice; use board_artiq::{mailbox, spi}; use board_misoc::{csr, clock, i2c}; -use proto_artiq::{kernel_proto as kern, session_proto::Reply::KernelException as HostKernelException, rpc_proto as rpc}; +use proto_artiq::{ + drtioaux_proto::PayloadStatus, + kernel_proto as kern, + session_proto::Reply::KernelException as HostKernelException, + rpc_proto as rpc}; use eh::eh_artiq; use io::Cursor; use kernel::eh_artiq::StackPointerBacktrace; @@ -148,24 +152,22 @@ pub struct SubkernelFinished { pub struct SliceMeta { pub len: u16, - pub last: bool + pub status: PayloadStatus } macro_rules! get_slice_fn { ( $name:tt, $size:expr ) => { pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { - if self.data.len() == 0 { - return SliceMeta { len: 0, last: true }; - } + let first = self.it == 0; let len = min($size, self.data.len() - self.it); let last = self.it + len == self.data.len(); - + let status = PayloadStatus::from_status(first, last); data_slice[..len].clone_from_slice(&self.data[self.it..self.it+len]); self.it += len; SliceMeta { len: len as u16, - last: last + status: status } } }; @@ -193,8 +195,12 @@ impl MessageManager { } } - pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn handle_incoming(&mut self, status: PayloadStatus, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { // called when receiving a message from master + if status.is_first() { + // clear the buffer for first message + self.in_buffer = None; + } match self.in_buffer.as_mut() { Some(message) => message.data.extend(&data[..length]), None => { @@ -204,7 +210,7 @@ impl MessageManager { }); } }; - if last { + if status.is_last() { // when done, remove from working queue self.in_queue.push_back(self.in_buffer.take().unwrap()); } @@ -236,7 +242,7 @@ impl MessageManager { return None; } let meta = self.out_message.as_mut()?.get_slice_master(data_slice); - if meta.last { + if meta.status.is_last() { // clear the message slot self.out_message = None; // notify kernel with a flag that message is sent @@ -315,7 +321,11 @@ impl Manager { } } - pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, data: &[u8], data_len: usize) -> Result<(), Error> { + if status.is_first() { + // in case master is interrupted, and subkernel is sent again, clean the state + self.kernels.remove(&id); + } let kernel = match self.kernels.get_mut(&id) { Some(kernel) => { if kernel.complete { @@ -338,7 +348,7 @@ impl Manager { }; kernel.library.extend(&data[0..data_len]); - kernel.complete = last; + kernel.complete = status.is_last(); Ok(()) } @@ -371,11 +381,11 @@ impl Manager { kern_acknowledge() } - pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + pub fn message_handle_incoming(&mut self, status: PayloadStatus, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { if !self.is_running() { return; } - self.session.messages.handle_incoming(last, length, slice); + self.session.messages.handle_incoming(status, length, slice); } pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { @@ -437,7 +447,7 @@ impl Manager { pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { match self.session.last_exception.as_mut() { Some(exception) => exception.get_slice_sat(data_slice), - None => SliceMeta { len: 0, last: true } + None => SliceMeta { len: 0, status: PayloadStatus::FirstAndLast } } } diff --git a/artiq/firmware/satman/main.rs b/artiq/firmware/satman/main.rs index f844abd1e..1856c56b2 100644 --- a/artiq/firmware/satman/main.rs +++ b/artiq/firmware/satman/main.rs @@ -145,7 +145,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg let meta = kernelmgr.message_get_slice(&mut data_slice).unwrap(); drtioaux::send(0, &drtioaux::Packet::SubkernelMessage { destination: destination, id: kernelmgr.get_current_id().unwrap(), - last: meta.last, length: meta.len as u16, data: data_slice + status: meta.status, length: meta.len as u16, data: data_slice })?; } else { let errors; @@ -370,9 +370,9 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg }) } - drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, last, length, trace } => { + drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, status, length, trace } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet); - let succeeded = dmamgr.add(id, last, &trace, length as usize).is_ok(); + let succeeded = dmamgr.add(id, status, &trace, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) } @@ -390,9 +390,9 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded }) } - drtioaux::Packet::SubkernelAddDataRequest { destination: _destination, id, last, length, data } => { + drtioaux::Packet::SubkernelAddDataRequest { destination: _destination, id, status, length, data } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet); - let succeeded = kernelmgr.add(id, last, &data, length as usize).is_ok(); + let succeeded = kernelmgr.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) } @@ -416,14 +416,14 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = kernelmgr.exception_get_slice(&mut data_slice); drtioaux::send(0, &drtioaux::Packet::SubkernelException { - last: meta.last, + last: meta.status.is_last(), length: meta.len, data: data_slice, }) } - drtioaux::Packet::SubkernelMessage { destination, id: _id, last, length, data } => { + drtioaux::Packet::SubkernelMessage { destination, id: _id, status, length, data } => { forward!(_routing_table, destination, *_rank, _repeaters, &packet); - kernelmgr.message_handle_incoming(last, length as usize, &data); + kernelmgr.message_handle_incoming(status, length as usize, &data); drtioaux::send(0, &drtioaux::Packet::SubkernelMessageAck { destination: destination }) @@ -435,7 +435,7 @@ fn process_aux_packet(dmamgr: &mut DmaManager, analyzer: &mut Analyzer, kernelmg if let Some(meta) = kernelmgr.message_get_slice(&mut data_slice) { drtioaux::send(0, &drtioaux::Packet::SubkernelMessage { destination: *_rank, id: kernelmgr.get_current_id().unwrap(), - last: meta.last, length: meta.len as u16, data: data_slice + status: meta.status, length: meta.len as u16, data: data_slice })? } else { error!("Error receiving message slice");