From b76f6346862754f14cd6b85b1e1ce536b3648320 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 2 Nov 2023 14:48:52 +0800 Subject: [PATCH] drtio: increase robustness for longer payloads --- src/libboard_artiq/src/drtioaux_proto.rs | 70 +++++++++++++++++++----- src/runtime/src/rtio_mgt.rs | 28 ++++++---- src/satman/src/dma.rs | 8 +-- src/satman/src/main.rs | 18 +++--- 4 files changed, 84 insertions(+), 40 deletions(-) diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index c8b4221..154a5e0 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -19,6 +19,46 @@ impl From for Error { } } +#[derive(PartialEq, Clone, Copy, Debug)] +#[repr(u8)] +pub enum PayloadStatus { + Middle = 0, + First = 1, + Last = 2, + FirstAndLast = 3, +} + +impl From for PayloadStatus { + fn from(value: u8) -> PayloadStatus { + match value { + 0 => PayloadStatus::Middle, + 1 => PayloadStatus::First, + 2 => PayloadStatus::Last, + 3 => PayloadStatus::FirstAndLast, + _ => unreachable!(), + } + } +} + +impl PayloadStatus { + pub fn is_first(self) -> bool { + self == PayloadStatus::First || self == PayloadStatus::FirstAndLast + } + + pub fn is_last(self) -> bool { + self == PayloadStatus::Last || self == PayloadStatus::FirstAndLast + } + + pub fn from_status(first: bool, last: bool) -> PayloadStatus { + match (first, last) { + (true, true) => PayloadStatus::FirstAndLast, + (true, false) => PayloadStatus::First, + (false, true) => PayloadStatus::Last, + (false, false) => PayloadStatus::Middle, + } + } +} + #[derive(PartialEq, Debug)] pub enum Packet { EchoRequest, @@ -159,7 +199,7 @@ pub enum Packet { DmaAddTraceRequest { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -192,7 +232,7 @@ pub enum Packet { SubkernelAddDataRequest { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -228,7 +268,7 @@ pub enum Packet { SubkernelMessage { destination: u8, id: u32, - last: bool, + status: PayloadStatus, length: u16, data: [u8; MASTER_PAYLOAD_MAX_SIZE], }, @@ -391,14 +431,14 @@ impl Packet { 0xb0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { destination: destination, id: id, - last: last, + status: status, length: length as u16, trace: trace, } @@ -432,14 +472,14 @@ impl Packet { 0xc0 => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelAddDataRequest { destination: destination, id: id, - last: last, + status: status, length: length as u16, data: data, } @@ -482,14 +522,14 @@ impl Packet { 0xcb => { let destination = reader.read_u8()?; let id = reader.read_u32()?; - let last = reader.read_bool()?; + let status = PayloadStatus::from(reader.read_u8()?); let length = reader.read_u16()?; let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::SubkernelMessage { destination: destination, id: id, - last: last, + status: status, length: length as u16, data: data, } @@ -713,14 +753,14 @@ impl Packet { Packet::DmaAddTraceRequest { destination, id, - last, + status, trace, length, } => { writer.write_u8(0xb0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; // trace may be broken down to fit within drtio aux memory limit // will be reconstructed by satellite writer.write_u16(length)?; @@ -771,14 +811,14 @@ impl Packet { Packet::SubkernelAddDataRequest { destination, id, - last, + status, data, length, } => { writer.write_u8(0xc0)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; } @@ -822,14 +862,14 @@ impl Packet { Packet::SubkernelMessage { destination, id, - last, + status, data, length, } => { writer.write_u8(0xcb)?; writer.write_u8(destination)?; writer.write_u32(id)?; - writer.write_bool(last)?; + writer.write_u8(status as u8)?; writer.write_u16(length)?; writer.write_all(&data[0..length as usize])?; } diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index f7d2671..b2f3aa3 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -13,8 +13,10 @@ pub mod drtio { use ksupport::{resolve_channel_name, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; use libasync::{delay, task}; - use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, - drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; + use libboard_artiq::{drtioaux::Error, + drtioaux_async, + drtioaux_async::Packet, + drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE}}; use libboard_zynq::time::Milliseconds; use log::{error, info, warn}; @@ -61,11 +63,11 @@ pub mod drtio { Packet::SubkernelMessage { id, destination: from, - last, + status, length, data, } => { - subkernel::message_handle_incoming(id, last, length as usize, &data).await; + subkernel::message_handle_incoming(id, status, length as usize, &data).await; // acknowledge receiving part of the message let _lock = aux_mutex.async_lock().await; drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) @@ -463,7 +465,7 @@ pub mod drtio { reply_handler_f: HandlerF, ) -> Result<(), &'static str> where - PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Packet, + PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], PayloadStatus, usize) -> Packet, HandlerF: Fn(&Packet) -> Result<(), &'static str>, { let mut i = 0; @@ -474,10 +476,12 @@ pub mod drtio { } else { data.len() - i } as usize; + let first = i == 0; let last = i + len == data.len(); slice[..len].clone_from_slice(&data[i..i + len]); i += len; - let packet = packet_f(&slice, last, len); + let status = PayloadStatus::from_status(first, last); + let packet = packet_f(&slice, status, len); let reply = aux_transact(aux_mutex, linkno, &packet, timer).await?; reply_handler_f(&reply)?; } @@ -498,10 +502,10 @@ pub mod drtio { aux_mutex, timer, trace, - |slice, last, len| Packet::DmaAddTraceRequest { + |slice, status, len| Packet::DmaAddTraceRequest { id: id, destination: destination, - last: last, + status: status, length: len as u16, trace: *slice, }, @@ -655,10 +659,10 @@ pub mod drtio { aux_mutex, timer, data, - |slice, last, len| Packet::SubkernelAddDataRequest { + |slice, status, len| Packet::SubkernelAddDataRequest { id: id, destination: destination, - last: last, + status: status, length: len as u16, data: *slice, }, @@ -742,10 +746,10 @@ pub mod drtio { aux_mutex, timer, message, - |slice, last, len| Packet::SubkernelMessage { + |slice, status, len| Packet::SubkernelMessage { destination: destination, id: id, - last: last, + status: status, length: len as u16, data: *slice, }, diff --git a/src/satman/src/dma.rs b/src/satman/src/dma.rs index 5955e3c..dc901db 100644 --- a/src/satman/src/dma.rs +++ b/src/satman/src/dma.rs @@ -1,6 +1,6 @@ use alloc::{collections::btree_map::BTreeMap, vec::Vec}; -use libboard_artiq::pl::csr; +use libboard_artiq::{drtioaux_proto::PayloadStatus, pl::csr}; use libcortex_a9::cache::dcci_slice; const ALIGNMENT: usize = 64; @@ -50,10 +50,10 @@ impl Manager { } } - pub fn add(&mut self, id: u32, last: bool, trace: &[u8], trace_len: usize) -> Result<(), Error> { + pub fn add(&mut self, id: u32, status: PayloadStatus, trace: &[u8], trace_len: usize) -> Result<(), Error> { let entry = match self.entries.get_mut(&id) { Some(entry) => { - if entry.complete { + if entry.complete || status.is_first() { // replace entry self.entries.remove(&id); self.entries.insert( @@ -83,7 +83,7 @@ impl Manager { }; entry.trace.extend(&trace[0..trace_len]); - if last { + if status.is_last() { entry.trace.push(0); let data_len = entry.trace.len(); diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index b501cba..80d53cd 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -162,7 +162,7 @@ fn process_aux_packet( &drtioaux::Packet::SubkernelMessage { destination: destination, id: kernel_manager.get_current_id().unwrap(), - last: meta.last, + status: meta.status, length: meta.len as u16, data: data_slice, }, @@ -494,12 +494,12 @@ fn process_aux_packet( drtioaux::Packet::DmaAddTraceRequest { destination: _destination, id, - last, + status, length, trace, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = dma_manager.add(id, last, &trace, length as usize).is_ok(); + let succeeded = dma_manager.add(id, status, &trace, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::DmaAddTraceReply { succeeded: succeeded }) } drtioaux::Packet::DmaRemoveTraceRequest { @@ -527,12 +527,12 @@ fn process_aux_packet( drtioaux::Packet::SubkernelAddDataRequest { destination: _destination, id, - last, + status, length, data, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = kernel_manager.add(id, last, &data, length as usize).is_ok(); + let succeeded = kernel_manager.add(id, status, &data, length as usize).is_ok(); drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) } drtioaux::Packet::SubkernelLoadRunRequest { @@ -562,7 +562,7 @@ fn process_aux_packet( drtioaux::send( 0, &drtioaux::Packet::SubkernelException { - last: meta.last, + last: meta.status.is_last(), length: meta.len, data: data_slice, }, @@ -571,12 +571,12 @@ fn process_aux_packet( drtioaux::Packet::SubkernelMessage { destination, id: _id, - last, + status, length, data, } => { forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer); - kernel_manager.message_handle_incoming(last, length as usize, &data); + kernel_manager.message_handle_incoming(status, length as usize, &data); drtioaux::send( 0, &drtioaux::Packet::SubkernelMessageAck { @@ -596,7 +596,7 @@ fn process_aux_packet( &drtioaux::Packet::SubkernelMessage { destination: *_rank, id: kernel_manager.get_current_id().unwrap(), - last: meta.last, + status: meta.status, length: meta.len as u16, data: data_slice, },