From 0a3bfc9a61d4eca8308e5c4ea56450bcfa5ec098 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Wed, 18 Oct 2023 11:56:38 +0800 Subject: [PATCH] subkernel: separate tags and data --- src/libksupport/src/kernel/mod.rs | 1 + src/libksupport/src/kernel/rpc.rs | 2 +- src/libksupport/src/kernel/subkernel.rs | 5 +- src/libksupport/src/rpc.rs | 71 +++++++++++++++++-------- src/runtime/src/comms.rs | 10 ++-- src/runtime/src/main.rs | 1 + src/runtime/src/subkernel.rs | 4 +- src/satman/src/subkernel.rs | 32 ++++++----- 8 files changed, 77 insertions(+), 49 deletions(-) diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 00479c7..7e8b859 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -103,6 +103,7 @@ pub enum Message { SubkernelMsgRecvRequest { id: u32, timeout: u64, + tags: Vec, }, #[cfg(has_drtio)] SubkernelMsgRecvReply { diff --git a/src/libksupport/src/kernel/rpc.rs b/src/libksupport/src/kernel/rpc.rs index b016c59..b74feeb 100644 --- a/src/libksupport/src/kernel/rpc.rs +++ b/src/libksupport/src/kernel/rpc.rs @@ -10,7 +10,7 @@ use crate::{eh_artiq, rpc::send_args}; fn rpc_send_common(is_async: bool, service: u32, tag: &CSlice, data: *const *const ()) { let core1_tx = unsafe { KERNEL_CHANNEL_1TO0.as_mut().unwrap() }; let mut buffer = Vec::::new(); - send_args(&mut buffer, service, tag.as_ref(), data).expect("RPC encoding failed"); + send_args(&mut buffer, service, tag.as_ref(), data, true).expect("RPC encoding failed"); core1_tx.send(Message::RpcSend { is_async, data: buffer }); } diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index 9303227..17754c7 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -53,7 +53,7 @@ pub extern "C" fn await_finish(id: u32, timeout: u64) { pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *const *const ()) { let mut buffer = Vec::::new(); - send_args(&mut buffer, 0, tag.as_ref(), data).expect("RPC encoding failed"); + send_args(&mut buffer, 0, tag.as_ref(), data, false).expect("RPC encoding failed"); // overwrite service tag, include how many tags are in the message buffer[3] = count; unsafe { @@ -68,7 +68,7 @@ pub extern "C" fn send_message(id: u32, count: u8, tag: &CSlice, data: *cons } } -pub extern "C" fn await_message(id: u32, timeout: u64, min: u8, max: u8) { +pub extern "C" fn await_message(id: u32, timeout: u64, tags: &CSlice, min: u8, max: u8) { unsafe { KERNEL_CHANNEL_1TO0 .as_mut() @@ -76,6 +76,7 @@ pub extern "C" fn await_message(id: u32, timeout: u64, min: u8, max: u8) { .send(Message::SubkernelMsgRecvRequest { id: id, timeout: timeout, + tags: tags.as_ref().to_vec(), }); } match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { diff --git a/src/libksupport/src/rpc.rs b/src/libksupport/src/rpc.rs index f85fa50..20e10cf 100644 --- a/src/libksupport/src/rpc.rs +++ b/src/libksupport/src/rpc.rs @@ -175,7 +175,12 @@ where } } -pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), alloc: &mut F) -> Result<(), Error> +pub fn recv_return<'a, F, R>( + reader: &mut R, + tag_bytes: &'a [u8], + data: *mut (), + alloc: &mut F, +) -> Result<&'a [u8], Error> where F: FnMut(usize) -> *mut (), R: Read + ?Sized, @@ -187,12 +192,22 @@ where let mut data = data; unsafe { recv_value(reader, tag, &mut data, alloc)? }; - Ok(()) + Ok(it.data) } -unsafe fn send_elements(writer: &mut W, elt_tag: Tag, length: usize, data: *const ()) -> Result<(), Error> -where W: Write + ?Sized { - writer.write_u8(elt_tag.as_u8())?; +unsafe fn send_elements( + writer: &mut W, + elt_tag: Tag, + length: usize, + data: *const (), + write_tags: bool, +) -> Result<(), Error> +where + W: Write + ?Sized, +{ + if write_tags { + writer.write_u8(elt_tag.as_u8())?; + } match elt_tag { // we cannot use NativeEndian::from_slice_i32 as the data is not mutable, // and that is not needed as the data is already in native endian @@ -211,14 +226,14 @@ where W: Write + ?Sized { _ => { let mut data = data; for _ in 0..length { - send_value(writer, elt_tag, &mut data)?; + send_value(writer, elt_tag, &mut data, write_tags)?; } } } Ok(()) } -unsafe fn send_value(writer: &mut W, tag: Tag, data: &mut *const ()) -> Result<(), Error> +unsafe fn send_value(writer: &mut W, tag: Tag, data: &mut *const (), write_tags: bool) -> Result<(), Error> where W: Write + ?Sized { macro_rules! consume_value { ($ty:ty, | $ptr:ident | $map:expr) => {{ @@ -228,7 +243,9 @@ where W: Write + ?Sized { }}; } - writer.write_u8(tag.as_u8())?; + if write_tags { + writer.write_u8(tag.as_u8())?; + } match tag { Tag::None => Ok(()), Tag::Bool => consume_value!(u8, |ptr| writer.write_u8(*ptr)), @@ -240,12 +257,14 @@ where W: Write + ?Sized { Tag::Bytes | Tag::ByteArray => consume_value!(CSlice, |ptr| writer.write_bytes((*ptr).as_ref())), Tag::Tuple(it, arity) => { let mut it = it.clone(); - writer.write_u8(arity)?; + if write_tags { + writer.write_u8(arity)?; + } let mut max_alignment = 0; for _ in 0..arity { let tag = it.next().expect("truncated tag"); max_alignment = core::cmp::max(max_alignment, tag.alignment()); - send_value(writer, tag, data)? + send_value(writer, tag, data, write_tags)? } *data = round_up_const(*data, max_alignment); Ok(()) @@ -260,11 +279,13 @@ where W: Write + ?Sized { let length = (**ptr).length as usize; writer.write_u32((*ptr).length)?; let tag = it.clone().next().expect("truncated tag"); - send_elements(writer, tag, length, (**ptr).elements) + send_elements(writer, tag, length, (**ptr).elements, write_tags) }) } Tag::Array(it, num_dims) => { - writer.write_u8(num_dims)?; + if write_tags { + writer.write_u8(num_dims)?; + } consume_value!(*const (), |buffer| { let elt_tag = it.clone().next().expect("truncated tag"); @@ -276,14 +297,14 @@ where W: Write + ?Sized { }) } let length = total_len as usize; - send_elements(writer, elt_tag, length, *buffer) + send_elements(writer, elt_tag, length, *buffer, write_tags) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - send_value(writer, tag, data)?; - send_value(writer, tag, data)?; - send_value(writer, tag, data)?; + send_value(writer, tag, data, write_tags)?; + send_value(writer, tag, data, write_tags)?; + send_value(writer, tag, data, write_tags)?; Ok(()) } Tag::Keyword(it) => { @@ -295,7 +316,7 @@ where W: Write + ?Sized { writer.write_string(str::from_utf8((*ptr).name.as_ref()).unwrap())?; let tag = it.clone().next().expect("truncated tag"); let mut data = ptr.offset(1) as *const (); - send_value(writer, tag, &mut data) + send_value(writer, tag, &mut data, write_tags) }) // Tag::Keyword never appears in composite types, so we don't have // to accurately advance data. @@ -310,8 +331,16 @@ where W: Write + ?Sized { } } -pub fn send_args(writer: &mut W, service: u32, tag_bytes: &[u8], data: *const *const ()) -> Result<(), Error> -where W: Write + ?Sized { +pub fn send_args( + writer: &mut W, + service: u32, + tag_bytes: &[u8], + data: *const *const (), + write_tags: bool, +) -> Result<(), Error> +where + W: Write + ?Sized, +{ let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes); let mut args_it = TagIterator::new(arg_tags_bytes); @@ -322,7 +351,7 @@ where W: Write + ?Sized { for index in 0.. { if let Some(arg_tag) = args_it.next() { let mut data = unsafe { *data.offset(index) }; - unsafe { send_value(writer, arg_tag, &mut data)? }; + unsafe { send_value(writer, arg_tag, &mut data, write_tags)? }; } else { break; } @@ -450,7 +479,7 @@ pub mod tag { #[derive(Debug, Clone, Copy)] pub struct TagIterator<'a> { - data: &'a [u8], + pub data: &'a [u8], } impl<'a> TagIterator<'a> { diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 0df0def..d02d974 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -5,7 +5,7 @@ use core_io::Error as IoError; use cslice::CSlice; use futures::{future::FutureExt, select_biased}; #[cfg(has_drtio)] -use io::{Cursor, ProtoRead}; +use io::Cursor; #[cfg(has_drtio)] use ksupport::rpc; use ksupport::{kernel, resolve_channel_name}; @@ -451,7 +451,7 @@ async fn handle_run_kernel( .await; } #[cfg(has_drtio)] - kernel::Message::SubkernelMsgRecvRequest { id, timeout } => { + kernel::Message::SubkernelMsgRecvRequest { id, timeout, tags } => { let message_received = subkernel::message_await(id, timeout, timer).await; let (status, count) = match message_received { Ok(ref message) => (kernel::SubkernelStatus::NoError, message.count), @@ -471,7 +471,7 @@ async fn handle_run_kernel( if let Ok(message) = message_received { // receive code almost identical to RPC recv, except we are not reading from a stream let mut reader = Cursor::new(message.data); - let mut tag: [u8; 1] = [message.tag]; + let mut current_tags: &[u8] = &tags; let mut i = 0; loop { // kernel has to consume all arguments in the whole message @@ -479,7 +479,7 @@ async fn handle_run_kernel( kernel::Message::RpcRecvRequest(slot) => slot, other => panic!("expected root value slot from core1, not {:?}", other), }; - rpc::recv_return(&mut reader, &tag, slot, &mut |size| { + let remaining_tags = rpc::recv_return(&mut reader, ¤t_tags, slot, &mut |size| { if size == 0 { 0 as *mut () } else { @@ -500,7 +500,7 @@ async fn handle_run_kernel( .await; i += 1; if i < count { - tag[0] = reader.read_u8()?; + current_tags = remaining_tags; } else { break; } diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index ffa1a6f..8ca61a9 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -62,6 +62,7 @@ mod grabber { use libasync::delay; use libboard_artiq::grabber; use libboard_zynq::time::Milliseconds; + use crate::GlobalTimer; pub async fn grabber_thread(timer: GlobalTimer) { let mut countdown = timer.countdown(); diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs index 63c24af..383716a 100644 --- a/src/runtime/src/subkernel.rs +++ b/src/runtime/src/subkernel.rs @@ -212,7 +212,6 @@ pub async fn await_finish( pub struct Message { from_id: u32, pub count: u8, - pub tag: u8, pub data: Vec, } @@ -236,8 +235,7 @@ pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: & Message { from_id: id, count: data[0], - tag: data[1], - data: data[2..length].to_vec(), + data: data[1..length].to_vec(), }, ); } diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs index 0e83359..4b38b67 100644 --- a/src/satman/src/subkernel.rs +++ b/src/satman/src/subkernel.rs @@ -6,7 +6,7 @@ use core::{cmp::min, option::NoneError, slice, str}; use core_io::{Error as IoError, Write}; use cslice::AsCSlice; -use io::{Cursor, ProtoRead, ProtoWrite}; +use io::{Cursor, ProtoWrite}; use ksupport::{eh_artiq, kernel, rpc}; use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, pl::csr}; @@ -14,12 +14,12 @@ use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; use libcortex_a9::sync_channel::Receiver; use log::warn; -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, PartialEq)] enum KernelState { Absent, Loaded, Running, - MsgAwait(Milliseconds), + MsgAwait(Milliseconds, Vec), MsgSending, } @@ -66,7 +66,6 @@ pub struct Sliceable { /* represents interkernel messages */ struct Message { count: u8, - tag: u8, data: Vec, } @@ -183,8 +182,7 @@ impl MessageManager { None => { self.in_buffer = Some(Message { count: data[0], - tag: data[1], - data: data[2..length].to_vec(), + data: data[1..length].to_vec(), }); } }; @@ -509,9 +507,9 @@ impl<'a> Manager<'_> { self.session.messages.accept_outgoing(data)?; self.session.kernel_state = KernelState::MsgSending; } - kernel::Message::SubkernelMsgRecvRequest { id: _, timeout } => { + kernel::Message::SubkernelMsgRecvRequest { id: _, timeout, tags } => { let max_time = timer.get_time() + Milliseconds(timeout); - self.session.kernel_state = KernelState::MsgAwait(max_time); + self.session.kernel_state = KernelState::MsgAwait(max_time, tags); } kernel::Message::UpDestinationsRequest(destination) => { self.control @@ -526,9 +524,9 @@ impl<'a> Manager<'_> { } fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> { - match self.session.kernel_state { - KernelState::MsgAwait(timeout) => { - if timer.get_time() > timeout { + match &self.session.kernel_state { + KernelState::MsgAwait(timeout, tags) => { + if timer.get_time() > *timeout { self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { status: kernel::SubkernelStatus::Timeout, count: 0, @@ -541,8 +539,9 @@ impl<'a> Manager<'_> { status: kernel::SubkernelStatus::NoError, count: message.count, }); + let tags = tags.clone(); self.session.kernel_state = KernelState::Running; - self.pass_message_to_kernel(&message, timer) + self.pass_message_to_kernel(&message, tags, timer) } else { Err(Error::AwaitingMessage) } @@ -560,9 +559,9 @@ impl<'a> Manager<'_> { } } - fn pass_message_to_kernel(&mut self, message: &Message, timer: GlobalTimer) -> Result<(), Error> { + fn pass_message_to_kernel(&mut self, message: &Message, tags: Vec, timer: GlobalTimer) -> Result<(), Error> { let mut reader = Cursor::new(&message.data); - let mut tag: [u8; 1] = [message.tag]; + let mut current_tags: &[u8] = &tags; let mut i = message.count; loop { let slot = match recv_w_timeout(&mut self.control.rx, timer, 100)? { @@ -571,7 +570,7 @@ impl<'a> Manager<'_> { }; let mut exception: Option = None; let mut unexpected: Option = None; - rpc::recv_return(&mut reader, &tag, slot, &mut |size| { + let remaining_tags = rpc::recv_return(&mut reader, current_tags, slot, &mut |size| { if size == 0 { 0 as *mut () } else { @@ -610,8 +609,7 @@ impl<'a> Manager<'_> { if i == 0 { break; } else { - // update the tag for next read - tag[0] = reader.read_u8()?; + current_tags = remaining_tags; } } Ok(()) -- 2.44.2