From 6785ca2c85e7ff67212bf11db9b60e9466395090 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Thu, 31 Aug 2023 17:36:01 +0800 Subject: [PATCH] subkernel: port master support --- src/libboard_artiq/src/drtioaux_proto.rs | 201 +++++++++++++- src/runtime/src/comms.rs | 179 +++++++++++++ src/runtime/src/eh_artiq.rs | 3 +- src/runtime/src/kernel/api.rs | 12 + src/runtime/src/kernel/mod.rs | 45 ++++ src/runtime/src/kernel/subkernel.rs | 94 +++++++ src/runtime/src/main.rs | 2 + src/runtime/src/rpc.rs | 173 +++++++++++- src/runtime/src/rtio_mgt.rs | 241 ++++++++++++++--- src/runtime/src/subkernel.rs | 321 +++++++++++++++++++++++ src/satman/src/analyzer.rs | 6 +- src/satman/src/main.rs | 4 +- 12 files changed, 1226 insertions(+), 55 deletions(-) create mode 100644 src/runtime/src/kernel/subkernel.rs create mode 100644 src/runtime/src/subkernel.rs diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index 576a0aac..c8b4221f 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -1,8 +1,11 @@ use core_io::{Error as IoError, Read, Write}; use io::proto::{ProtoRead, ProtoWrite}; -pub const DMA_TRACE_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*trace ID*/4 - /*last*/1 -/*length*/2; -pub const ANALYZER_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; +// maximum size of arbitrary payloads +// used by satellite -> master analyzer, subkernel exceptions +pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; +// used by DDMA, subkernel program data (need to provide extra ID and destination) +pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4; #[derive(Debug)] pub enum Error { @@ -150,7 +153,7 @@ pub enum Packet { AnalyzerData { last: bool, length: u16, - data: [u8; ANALYZER_MAX_SIZE], + data: [u8; SAT_PAYLOAD_MAX_SIZE], }, DmaAddTraceRequest { @@ -158,7 +161,7 @@ pub enum Packet { id: u32, last: bool, length: u16, - trace: [u8; DMA_TRACE_MAX_SIZE], + trace: [u8; MASTER_PAYLOAD_MAX_SIZE], }, DmaAddTraceReply { succeeded: bool, @@ -185,6 +188,53 @@ pub enum Packet { channel: u32, timestamp: u64, }, + + SubkernelAddDataRequest { + destination: u8, + id: u32, + last: bool, + length: u16, + data: [u8; MASTER_PAYLOAD_MAX_SIZE], + }, + SubkernelAddDataReply { + succeeded: bool, + }, + SubkernelLoadRunRequest { + destination: u8, + id: u32, + run: bool, + }, + SubkernelLoadRunReply { + succeeded: bool, + }, + SubkernelStopRequest { + destination: u8, + }, + SubkernelStopReply { + succeeded: bool, + }, + SubkernelFinished { + id: u32, + with_exception: bool, + }, + SubkernelExceptionRequest { + destination: u8, + }, + SubkernelException { + last: bool, + length: u16, + data: [u8; SAT_PAYLOAD_MAX_SIZE], + }, + SubkernelMessage { + destination: u8, + id: u32, + last: bool, + length: u16, + data: [u8; MASTER_PAYLOAD_MAX_SIZE], + }, + SubkernelMessageAck { + destination: u8, + }, } impl Packet { @@ -329,7 +379,7 @@ impl Packet { 0xa3 => { let last = reader.read_bool()?; let length = reader.read_u16()?; - let mut data: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; + let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut data[0..length as usize])?; Packet::AnalyzerData { last: last, @@ -343,7 +393,7 @@ impl Packet { let id = reader.read_u32()?; let last = reader.read_bool()?; let length = reader.read_u16()?; - let mut trace: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; + let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; reader.read_exact(&mut trace[0..length as usize])?; Packet::DmaAddTraceRequest { destination: destination, @@ -379,6 +429,75 @@ impl Packet { timestamp: reader.read_u64()?, }, + 0xc0 => { + let destination = reader.read_u8()?; + let id = reader.read_u32()?; + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelAddDataRequest { + destination: destination, + id: id, + last: last, + length: length as u16, + data: data, + } + } + 0xc1 => Packet::SubkernelAddDataReply { + succeeded: reader.read_bool()?, + }, + 0xc4 => Packet::SubkernelLoadRunRequest { + destination: reader.read_u8()?, + id: reader.read_u32()?, + run: reader.read_bool()?, + }, + 0xc5 => Packet::SubkernelLoadRunReply { + succeeded: reader.read_bool()?, + }, + 0xc6 => Packet::SubkernelStopRequest { + destination: reader.read_u8()?, + }, + 0xc7 => Packet::SubkernelStopReply { + succeeded: reader.read_bool()?, + }, + 0xc8 => Packet::SubkernelFinished { + id: reader.read_u32()?, + with_exception: reader.read_bool()?, + }, + 0xc9 => Packet::SubkernelExceptionRequest { + destination: reader.read_u8()?, + }, + 0xca => { + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelException { + last: last, + length: length, + data: data, + } + } + 0xcb => { + let destination = reader.read_u8()?; + let id = reader.read_u32()?; + let last = reader.read_bool()?; + let length = reader.read_u16()?; + let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + reader.read_exact(&mut data[0..length as usize])?; + Packet::SubkernelMessage { + destination: destination, + id: id, + last: last, + length: length as u16, + data: data, + } + } + 0xcc => Packet::SubkernelMessageAck { + destination: reader.read_u8()?, + }, + ty => return Err(Error::UnknownPacket(ty)), }) } @@ -648,6 +767,76 @@ impl Packet { writer.write_u32(channel)?; writer.write_u64(timestamp)?; } + + Packet::SubkernelAddDataRequest { + destination, + id, + last, + data, + length, + } => { + writer.write_u8(0xc0)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + } + Packet::SubkernelAddDataReply { succeeded } => { + writer.write_u8(0xc1)?; + writer.write_bool(succeeded)?; + } + Packet::SubkernelLoadRunRequest { destination, id, run } => { + writer.write_u8(0xc4)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(run)?; + } + Packet::SubkernelLoadRunReply { succeeded } => { + writer.write_u8(0xc5)?; + writer.write_bool(succeeded)?; + } + Packet::SubkernelStopRequest { destination } => { + writer.write_u8(0xc6)?; + writer.write_u8(destination)?; + } + Packet::SubkernelStopReply { succeeded } => { + writer.write_u8(0xc7)?; + writer.write_bool(succeeded)?; + } + Packet::SubkernelFinished { id, with_exception } => { + writer.write_u8(0xc8)?; + writer.write_u32(id)?; + writer.write_bool(with_exception)?; + } + Packet::SubkernelExceptionRequest { destination } => { + writer.write_u8(0xc9)?; + writer.write_u8(destination)?; + } + Packet::SubkernelException { last, length, data } => { + writer.write_u8(0xca)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + } + Packet::SubkernelMessage { + destination, + id, + last, + data, + length, + } => { + writer.write_u8(0xcb)?; + writer.write_u8(destination)?; + writer.write_u32(id)?; + writer.write_bool(last)?; + writer.write_u16(length)?; + writer.write_all(&data[0..length as usize])?; + } + Packet::SubkernelMessageAck { destination } => { + writer.write_u8(0xcc)?; + writer.write_u8(destination)?; + } } Ok(()) } diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 6475e147..cdcb4e9e 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -1,8 +1,11 @@ use alloc::{collections::BTreeMap, rc::Rc, string::String, vec, vec::Vec}; use core::{cell::RefCell, fmt, slice, str}; +use core_io::Error as IoError; use cslice::CSlice; use futures::{future::FutureExt, select_biased}; +#[cfg(has_drtio)] +use io::{Cursor, ProtoRead}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; use libboard_artiq::drtio_routing; @@ -28,13 +31,18 @@ use crate::{analyzer, kernel, mgmt, moninj, proto_async::*, rpc, rtio_dma, rtio_mgt::{self, resolve_channel_name}}; +#[cfg(has_drtio)] +use crate::{subkernel, subkernel::Error as SubkernelError}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { NetworkError(smoltcp::Error), + IoError, UnexpectedPattern, UnrecognizedPacket, BufferExhausted, + #[cfg(has_drtio)] + SubkernelError(subkernel::Error), } pub type Result = core::result::Result; @@ -43,9 +51,12 @@ impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { Error::NetworkError(error) => write!(f, "network error: {}", error), + Error::IoError => write!(f, "io error"), Error::UnexpectedPattern => write!(f, "unexpected pattern"), Error::UnrecognizedPacket => write!(f, "unrecognized packet"), Error::BufferExhausted => write!(f, "buffer exhausted"), + #[cfg(has_drtio)] + Error::SubkernelError(error) => write!(f, "subkernel error: {:?}", error), } } } @@ -56,6 +67,19 @@ impl From for Error { } } +impl From for Error { + fn from(_error: IoError) -> Self { + Error::IoError + } +} + +#[cfg(has_drtio)] +impl From for Error { + fn from(error: subkernel::Error) -> Self { + Error::SubkernelError(error) + } +} + #[derive(Debug, FromPrimitive, ToPrimitive)] enum Request { SystemInfo = 3, @@ -63,6 +87,7 @@ enum Request { RunKernel = 6, RPCReply = 7, RPCException = 8, + UploadSubkernel = 9, } #[derive(Debug, FromPrimitive, ToPrimitive)] @@ -162,6 +187,22 @@ async fn handle_run_kernel( ) -> Result<()> { control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; loop { + #[cfg(has_drtio)] + while let Some(subkernel_finished) = + subkernel::get_finished_with_exception(aux_mutex, routing_table, timer).await? + { + if subkernel_finished.status == subkernel::FinishStatus::CommLost { + error!( + "Communication with satellite lost while subkernel {} was running", + subkernel_finished.id + ); + } + if let Some(exception) = subkernel_finished.exception { + if let Some(stream) = stream { + write_chunk(stream, &exception).await?; + } + } + } let reply = control.borrow_mut().rx.async_recv().await; match reply { kernel::Message::RpcSend { is_async, data } => { @@ -365,6 +406,117 @@ async fn handle_run_kernel( control.borrow_mut().tx.async_send(reply).await; } #[cfg(has_drtio)] + kernel::Message::SubkernelLoadRunRequest { id, run } => { + let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await { + Ok(()) => true, + Err(e) => { + error!("Error loading subkernel: {:?}", e); + false + } + }; + control + .borrow_mut() + .tx + .async_send(kernel::Message::SubkernelLoadRunReply { succeeded: succeeded }) + .await; + } + #[cfg(has_drtio)] + kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => { + let res = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout).await; + let status = match res { + Ok(ref res) => { + if res.status == subkernel::FinishStatus::CommLost { + kernel::SubkernelStatus::CommLost + } else if let Some(exception) = &res.exception { + error!("Exception in subkernel"); + match stream { + None => (), + Some(stream) => { + write_chunk(stream, exception).await?; + } + } + // will not be called after exception is served + kernel::SubkernelStatus::OtherError + } else { + kernel::SubkernelStatus::NoError + } + } + Err(SubkernelError::Timeout) => kernel::SubkernelStatus::Timeout, + Err(SubkernelError::IncorrectState) => kernel::SubkernelStatus::IncorrectState, + Err(_) => kernel::SubkernelStatus::OtherError, + }; + control + .borrow_mut() + .tx + .async_send(kernel::Message::SubkernelAwaitFinishReply { status: status }) + .await; + } + #[cfg(has_drtio)] + kernel::Message::SubkernelMsgSend { id, data } => { + let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await; + match res { + Ok(_) => (), + Err(e) => { + error!("error sending subkernel message: {:?}", e) + } + }; + } + #[cfg(has_drtio)] + kernel::Message::SubkernelMsgRecvRequest { id, timeout } => { + let message_received = subkernel::message_await(id, timeout, timer).await; + let status = match message_received { + Ok(_) => kernel::SubkernelStatus::NoError, + Err(SubkernelError::Timeout) => kernel::SubkernelStatus::Timeout, + Err(SubkernelError::IncorrectState) => kernel::SubkernelStatus::IncorrectState, + Err(SubkernelError::CommLost) => kernel::SubkernelStatus::CommLost, + Err(_) => kernel::SubkernelStatus::OtherError, + }; + control + .borrow_mut() + .tx + .async_send(kernel::Message::SubkernelMsgRecvReply { status: status }) + .await; + if let Ok((tag, data)) = message_received { + // receive code almost identical to RPC recv, except we are not reading from a stream + let mut reader = Cursor::new(data); + let mut tag: [u8; 1] = [tag]; + loop { + // kernel has to consume all arguments in the whole message + let slot = match fast_recv(&mut control.borrow_mut().rx).await { + kernel::Message::RpcRecvRequest(slot) => slot, + other => panic!("expected root value slot from core1, not {:?}", other), + }; + rpc::recv_return_cursor(&mut reader, &tag, slot, &|size| { + let control = control.clone(); + async move { + if size == 0 { + 0 as *mut () + } else { + let mut control = control.borrow_mut(); + fast_send(&mut control.tx, kernel::Message::RpcRecvReply(Ok(size))).await; + match fast_recv(&mut control.rx).await { + kernel::Message::RpcRecvRequest(slot) => slot, + other => { + panic!("expected nested value slot from kernel CPU, not {:?}", other) + } + } + } + } + }) + .await?; + control + .borrow_mut() + .tx + .async_send(kernel::Message::RpcRecvReply(Ok(0))) + .await; + match reader.read_u8() { + Ok(0) | Err(_) => break, // reached the end of data, we're done + Ok(t) => tag[0] = t, // update the tag for next read + }; + } + } + } + #[cfg(has_drtio)] kernel::Message::UpDestinationsRequest(destination) => { let result = _up_destinations.borrow()[destination as usize]; control @@ -434,9 +586,13 @@ async fn handle_connection( return Err(Error::UnexpectedPattern); } stream.send_slice("e".as_bytes()).await?; + #[cfg(has_drtio)] + subkernel::clear_subkernels().await; loop { let request = read_request(stream, true).await?; if request.is_none() { + #[cfg(has_drtio)] + subkernel::clear_subkernels().await; return Ok(()); } let request = request.unwrap(); @@ -460,6 +616,29 @@ async fn handle_connection( ) .await?; } + Request::UploadSubkernel => { + #[cfg(has_drtio)] + { + let id = read_i32(stream).await? as u32; + let destination = read_i8(stream).await? as u8; + let buffer = read_bytes(stream, 1024 * 1024).await?; + subkernel::add_subkernel(id, destination, buffer).await; + match subkernel::upload(aux_mutex, routing_table, timer, id).await { + Ok(_) => write_header(stream, Reply::LoadCompleted).await?, + Err(_) => { + write_header(stream, Reply::LoadFailed).await?; + write_chunk(stream, b"subkernel failed to load").await?; + return Err(Error::UnexpectedPattern); + } + } + } + #[cfg(not(has_drtio))] + { + write_header(stream, Reply::LoadFailed).await?; + write_chunk(stream, b"No DRTIO on this system, subkernels are not supported").await?; + return Err(Error::UnexpectedPattern); + } + } _ => { error!("unexpected request from host: {:?}", request); return Err(Error::UnrecognizedPacket); diff --git a/src/runtime/src/eh_artiq.rs b/src/runtime/src/eh_artiq.rs index 5db3aad1..6f159ac6 100644 --- a/src/runtime/src/eh_artiq.rs +++ b/src/runtime/src/eh_artiq.rs @@ -422,7 +422,7 @@ extern "C" fn stop_fn( } // Must be kept in sync with preallocate_runtime_exception_names() in artiq/language/embedding_map.py -static EXCEPTION_ID_LOOKUP: [(&str, u32); 11] = [ +static EXCEPTION_ID_LOOKUP: [(&str, u32); 12] = [ ("RuntimeError", 0), ("RTIOUnderflow", 1), ("RTIOOverflow", 2), @@ -434,6 +434,7 @@ static EXCEPTION_ID_LOOKUP: [(&str, u32); 11] = [ ("ZeroDivisionError", 8), ("IndexError", 9), ("UnwrapNoneError", 10), + ("SubkernelError", 11), ]; pub fn get_exception_id(name: &str) -> u32 { diff --git a/src/runtime/src/kernel/api.rs b/src/runtime/src/kernel/api.rs index ce4587a5..87b57aee 100644 --- a/src/runtime/src/kernel/api.rs +++ b/src/runtime/src/kernel/api.rs @@ -9,6 +9,8 @@ use super::{cache, core1::rtio_get_destination_status, dma, rpc::{rpc_recv, rpc_send, rpc_send_async}}; +#[cfg(has_drtio)] +use super::subkernel; use crate::{eh_artiq, i2c, rtio}; extern "C" { @@ -114,6 +116,16 @@ pub fn resolve(required: &[u8]) -> Option { api!(i2c_read = i2c::read), api!(i2c_switch_select = i2c::switch_select), + // subkernel + #[cfg(has_drtio)] + api!(subkernel_load_run = subkernel::load_run), + #[cfg(has_drtio)] + api!(subkernel_await_finish = subkernel::await_finish), + #[cfg(has_drtio)] + api!(subkernel_send_message = subkernel::send_message), + #[cfg(has_drtio)] + api!(subkernel_await_message = subkernel::await_message), + // Double-precision floating-point arithmetic helper functions // RTABI chapter 4.1.2, Table 2 api!(__aeabi_dadd), diff --git a/src/runtime/src/kernel/mod.rs b/src/runtime/src/kernel/mod.rs index 864ac017..592d5d9a 100644 --- a/src/runtime/src/kernel/mod.rs +++ b/src/runtime/src/kernel/mod.rs @@ -13,6 +13,8 @@ mod dma; mod rpc; pub use dma::DmaRecorder; mod cache; +#[cfg(has_drtio)] +mod subkernel; #[derive(Debug, Clone)] pub struct RPCException { @@ -25,6 +27,16 @@ pub struct RPCException { pub function: u32, } +#[cfg(has_drtio)] +#[derive(Debug, Clone)] +pub enum SubkernelStatus { + NoError, + Timeout, + IncorrectState, + CommLost, + OtherError, +} + #[derive(Debug, Clone)] pub enum Message { LoadRequest(Vec), @@ -72,6 +84,39 @@ pub enum Message { UpDestinationsRequest(i32), #[cfg(has_drtio)] UpDestinationsReply(bool), + + #[cfg(has_drtio)] + SubkernelLoadRunRequest { + id: u32, + run: bool, + }, + #[cfg(has_drtio)] + SubkernelLoadRunReply { + succeeded: bool, + }, + #[cfg(has_drtio)] + SubkernelAwaitFinishRequest { + id: u32, + timeout: u64, + }, + #[cfg(has_drtio)] + SubkernelAwaitFinishReply { + status: SubkernelStatus, + }, + #[cfg(has_drtio)] + SubkernelMsgSend { + id: u32, + data: Vec, + }, + #[cfg(has_drtio)] + SubkernelMsgRecvRequest { + id: u32, + timeout: u64, + }, + #[cfg(has_drtio)] + SubkernelMsgRecvReply { + status: SubkernelStatus, + }, } static CHANNEL_0TO1: Mutex>> = Mutex::new(None); diff --git a/src/runtime/src/kernel/subkernel.rs b/src/runtime/src/kernel/subkernel.rs new file mode 100644 index 00000000..4d7b528b --- /dev/null +++ b/src/runtime/src/kernel/subkernel.rs @@ -0,0 +1,94 @@ +use alloc::vec::Vec; + +use cslice::CSlice; + +use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; +use crate::{artiq_raise, rpc::send_args}; + +pub extern "C" fn load_run(id: u32, run: bool) { + unsafe { + KERNEL_CHANNEL_1TO0 + .as_mut() + .unwrap() + .send(Message::SubkernelLoadRunRequest { id: id, run: run }); + } + match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { + Message::SubkernelLoadRunReply { succeeded: true } => (), + Message::SubkernelLoadRunReply { succeeded: false } => { + artiq_raise!("SubkernelError", "Error loading or running the subkernel") + } + _ => panic!("Expected SubkernelLoadRunReply after SubkernelLoadRunRequest!"), + } +} + +pub extern "C" fn await_finish(id: u32, timeout: u64) { + unsafe { + KERNEL_CHANNEL_1TO0 + .as_mut() + .unwrap() + .send(Message::SubkernelAwaitFinishRequest { + id: id, + timeout: timeout, + }); + } + match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { + Message::SubkernelAwaitFinishReply { + status: SubkernelStatus::NoError, + } => (), + Message::SubkernelAwaitFinishReply { + status: SubkernelStatus::IncorrectState, + } => artiq_raise!("SubkernelError", "Subkernel not running"), + Message::SubkernelAwaitFinishReply { + status: SubkernelStatus::Timeout, + } => artiq_raise!("SubkernelError", "Subkernel timed out"), + Message::SubkernelAwaitFinishReply { + status: SubkernelStatus::CommLost, + } => artiq_raise!("SubkernelError", "Lost communication with satellite"), + Message::SubkernelAwaitFinishReply { + status: SubkernelStatus::OtherError, + } => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"), + _ => panic!("expected SubkernelAwaitFinishReply after SubkernelAwaitFinishRequest"), + } +} + +pub extern "C" fn send_message(id: u32, tag: &CSlice, data: *const *const ()) { + let mut buffer = Vec::::new(); + send_args(&mut buffer, 0, tag.as_ref(), data).expect("RPC encoding failed"); + unsafe { + KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend { + id: id, + data: buffer[4..].to_vec(), + }); + } +} + +pub extern "C" fn await_message(id: u32, timeout: u64) { + unsafe { + KERNEL_CHANNEL_1TO0 + .as_mut() + .unwrap() + .send(Message::SubkernelMsgRecvRequest { + id: id, + timeout: timeout, + }); + } + match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() { + Message::SubkernelMsgRecvReply { + status: SubkernelStatus::NoError, + } => (), + Message::SubkernelMsgRecvReply { + status: SubkernelStatus::IncorrectState, + } => artiq_raise!("SubkernelError", "Subkernel not running"), + Message::SubkernelMsgRecvReply { + status: SubkernelStatus::Timeout, + } => artiq_raise!("SubkernelError", "Subkernel timed out"), + Message::SubkernelMsgRecvReply { + status: SubkernelStatus::CommLost, + } => artiq_raise!("SubkernelError", "Lost communication with satellite"), + Message::SubkernelMsgRecvReply { + status: SubkernelStatus::OtherError, + } => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"), + _ => panic!("expected SubkernelMsgRecvReply after SubkernelMsgRecvRequest"), + } + // RpcRecvRequest should be called after this to receive message data +} diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index 2c5910eb..826b76a0 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -51,6 +51,8 @@ mod rtio; mod rtio_clocking; mod rtio_dma; mod rtio_mgt; +#[cfg(has_drtio)] +mod subkernel; static mut SEEN_ASYNC_ERRORS: u8 = 0; diff --git a/src/runtime/src/rpc.rs b/src/runtime/src/rpc.rs index 3db24772..6ca5d02a 100644 --- a/src/runtime/src/rpc.rs +++ b/src/runtime/src/rpc.rs @@ -1,11 +1,15 @@ use alloc::boxed::Box; +#[cfg(has_drtio)] +use alloc::vec::Vec; use core::{future::Future, str}; use async_recursion::async_recursion; use byteorder::{ByteOrder, NativeEndian}; use core_io::{Error, Write}; use cslice::{CMutSlice, CSlice}; -use io::proto::ProtoWrite; +#[cfg(has_drtio)] +use io::{Cursor, ProtoRead}; +use io::ProtoWrite; use libasync::smoltcp::TcpStream; use libboard_zynq::smoltcp; use log::trace; @@ -224,6 +228,173 @@ where Ok(()) } +// versions for Cursor rather than TcpStream +// they will be made into sync for satellite subkernels later +#[cfg(has_drtio)] +#[async_recursion(?Send)] +async unsafe fn recv_elements_cursor( + cursor: &mut Cursor>, + elt_tag: Tag<'async_recursion>, + length: usize, + storage: *mut (), + alloc: &(impl Fn(usize) -> F + 'async_recursion), +) -> Result<(), Error> +where + F: Future, +{ + match elt_tag { + Tag::Bool => { + let dest = core::slice::from_raw_parts_mut(storage as *mut u8, length); + cursor.read_exact(dest)?; + } + Tag::Int32 => { + let ptr = storage as *mut u32; + let dest = core::slice::from_raw_parts_mut(ptr as *mut u8, length * 4); + cursor.read_exact(dest)?; + drop(dest); + let dest = core::slice::from_raw_parts_mut(ptr, length); + NativeEndian::from_slice_u32(dest); + } + Tag::Int64 | Tag::Float64 => { + let ptr = storage as *mut u64; + let dest = core::slice::from_raw_parts_mut(ptr as *mut u8, length * 8); + cursor.read_exact(dest)?; + drop(dest); + let dest = core::slice::from_raw_parts_mut(ptr, length); + NativeEndian::from_slice_u64(dest); + } + _ => { + let mut data = storage; + for _ in 0..length { + recv_value_cursor(cursor, elt_tag, &mut data, alloc).await? + } + } + } + Ok(()) +} + +#[cfg(has_drtio)] +#[async_recursion(?Send)] +async unsafe fn recv_value_cursor( + cursor: &mut Cursor>, + tag: Tag<'async_recursion>, + data: &mut *mut (), + alloc: &(impl Fn(usize) -> F + 'async_recursion), +) -> Result<(), Error> +where + F: Future, +{ + macro_rules! consume_value { + ($ty:ty, | $ptr:ident | $map:expr) => {{ + let $ptr = align_ptr_mut::<$ty>(*data); + *data = $ptr.offset(1) as *mut (); + $map + }}; + } + + match tag { + Tag::None => Ok(()), + Tag::Bool => consume_value!(i8, |ptr| { + *ptr = cursor.read_u8()? as i8; + Ok(()) + }), + Tag::Int32 => consume_value!(i32, |ptr| { + *ptr = cursor.read_u32()? as i32; + Ok(()) + }), + Tag::Int64 | Tag::Float64 => consume_value!(i64, |ptr| { + *ptr = cursor.read_u64()? as i64; + Ok(()) + }), + Tag::String | Tag::Bytes | Tag::ByteArray => { + consume_value!(CMutSlice, |ptr| { + let length = cursor.read_u32()? as usize; + *ptr = CMutSlice::new(alloc(length).await as *mut u8, length); + cursor.read_exact((*ptr).as_mut())?; + Ok(()) + }) + } + Tag::Tuple(it, arity) => { + let alignment = tag.alignment(); + *data = round_up_mut(*data, alignment); + let mut it = it.clone(); + for _ in 0..arity { + let tag = it.next().expect("truncated tag"); + recv_value_cursor(cursor, tag, data, alloc).await? + } + *data = round_up_mut(*data, alignment); + Ok(()) + } + Tag::List(it) => { + #[repr(C)] + struct List { + elements: *mut (), + length: usize, + } + consume_value!(*mut List, |ptr_to_list| { + let tag = it.clone().next().expect("truncated tag"); + let length = cursor.read_u32()? as usize; + + let list_size = 4 + 4; + let storage_offset = round_up(list_size, tag.alignment()); + let storage_size = tag.size() * length; + + let allocation = alloc(storage_offset + storage_size).await as *mut u8; + *ptr_to_list = allocation as *mut List; + let storage = allocation.offset(storage_offset as isize) as *mut (); + + (**ptr_to_list).length = length; + (**ptr_to_list).elements = storage; + recv_elements_cursor(cursor, tag, length, storage, alloc).await + }) + } + Tag::Array(it, num_dims) => { + consume_value!(*mut (), |buffer| { + let mut total_len: usize = 1; + for _ in 0..num_dims { + let len = cursor.read_u32()? as usize; + total_len *= len; + consume_value!(usize, |ptr| *ptr = len) + } + + let elt_tag = it.clone().next().expect("truncated tag"); + *buffer = alloc(elt_tag.size() * total_len).await; + recv_elements_cursor(cursor, elt_tag, total_len, *buffer, alloc).await + }) + } + Tag::Range(it) => { + *data = round_up_mut(*data, tag.alignment()); + let tag = it.clone().next().expect("truncated tag"); + recv_value_cursor(cursor, tag, data, alloc).await?; + recv_value_cursor(cursor, tag, data, alloc).await?; + recv_value_cursor(cursor, tag, data, alloc).await?; + Ok(()) + } + Tag::Keyword(_) => unreachable!(), + Tag::Object => unreachable!(), + } +} + +#[cfg(has_drtio)] +pub async fn recv_return_cursor( + cursor: &mut Cursor>, + tag_bytes: &[u8], + data: *mut (), + alloc: &impl Fn(usize) -> F, +) -> Result<(), Error> +where + F: Future, +{ + let mut it = TagIterator::new(tag_bytes); + trace!("recv ...->{}", it); + + let tag = it.next().expect("truncated tag"); + let mut data = data; + unsafe { recv_value_cursor(cursor, tag, &mut data, alloc).await? }; + + Ok(()) +} + unsafe fn send_elements(writer: &mut W, elt_tag: Tag, length: usize, data: *const ()) -> Result<(), Error> where W: Write + ?Sized { writer.write_u8(elt_tag.as_u8())?; diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 63390e93..75d69f63 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -16,12 +16,13 @@ pub mod drtio { use embedded_hal::blocking::delay::DelayMs; use libasync::{delay, task}; - use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, drtioaux_proto::DMA_TRACE_MAX_SIZE}; + use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, + drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; use libboard_zynq::time::Milliseconds; use log::{error, info, warn}; use super::*; - use crate::{analyzer::remote_analyzer::RemoteBuffer, rtio_dma::remote_dma, ASYNC_ERROR_BUSY, + use crate::{analyzer::remote_analyzer::RemoteBuffer, rtio_dma::remote_dma, subkernel, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; pub fn startup( @@ -44,7 +45,7 @@ pub mod drtio { unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } } - async fn process_async_packets(packet: Packet) -> Option { + async fn process_async_packets(linkno: u8, packet: Packet) -> Option { // returns None if an async packet has been consumed match packet { Packet::DmaPlaybackStatus { @@ -57,6 +58,24 @@ pub mod drtio { remote_dma::playback_done(id, destination, error, channel, timestamp).await; None } + Packet::SubkernelFinished { id, with_exception } => { + subkernel::subkernel_finished(id, with_exception).await; + None + } + Packet::SubkernelMessage { + id, + destination: from, + last, + length, + data, + } => { + subkernel::message_handle_incoming(id, last, length as usize, &data).await; + // acknowledge receiving part of the message + drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from }) + .await + .unwrap(); + None + } other => Some(other), } } @@ -193,7 +212,7 @@ pub mod drtio { let _lock = aux_mutex.async_lock().await; match drtioaux_async::recv(linkno).await { Ok(Some(packet)) => { - if let Some(packet) = process_async_packets(packet).await { + if let Some(packet) = process_async_packets(linkno, packet).await { warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); } } @@ -286,6 +305,14 @@ pub mod drtio { false, ) .await; + subkernel::destination_changed( + aux_mutex, + routing_table, + timer, + destination, + false, + ) + .await; } Ok(Packet::DestinationOkReply) => (), Ok(Packet::DestinationSequenceErrorReply { channel }) => { @@ -328,6 +355,7 @@ pub mod drtio { } else { destination_set_up(routing_table, up_destinations, destination, false).await; remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; + subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false).await; } } else { if up_links[linkno as usize] { @@ -347,6 +375,8 @@ pub mod drtio { init_buffer_space(destination as u8, linkno).await; remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, true) .await; + subkernel::destination_changed(aux_mutex, routing_table, timer, destination, true) + .await; } Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), @@ -433,6 +463,36 @@ pub mod drtio { } } + async fn partition_data( + linkno: u8, + aux_mutex: &Rc>, + timer: GlobalTimer, + data: &[u8], + packet_f: PacketF, + reply_handler_f: HandlerF, + ) -> Result<(), &'static str> + where + PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Packet, + HandlerF: Fn(&Packet) -> Result<(), &'static str>, + { + let mut i = 0; + while i < data.len() { + let mut slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let len: usize = if i + MASTER_PAYLOAD_MAX_SIZE < data.len() { + MASTER_PAYLOAD_MAX_SIZE + } else { + data.len() - i + } as usize; + let last = i + len == data.len(); + slice[..len].clone_from_slice(&data[i..i + len]); + i += len; + let packet = packet_f(&slice, last, len); + let reply = aux_transact(aux_mutex, linkno, &packet, timer).await?; + reply_handler_f(&reply)?; + } + Ok(()) + } + pub async fn ddma_upload_trace( aux_mutex: &Rc>, routing_table: &drtio_routing::RoutingTable, @@ -442,44 +502,25 @@ pub mod drtio { trace: &Vec, ) -> Result<(), &'static str> { let linkno = routing_table.0[destination as usize][0] - 1; - let mut i = 0; - while i < trace.len() { - let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; - let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() { - DMA_TRACE_MAX_SIZE - } else { - trace.len() - i - } as usize; - let last = i + len == trace.len(); - trace_slice[..len].clone_from_slice(&trace[i..i + len]); - i += len; - let reply = aux_transact( - aux_mutex, - linkno, - &Packet::DmaAddTraceRequest { - id: id, - destination: destination, - last: last, - length: len as u16, - trace: trace_slice, - }, - timer, - ) - .await; - match reply { - Ok(Packet::DmaAddTraceReply { succeeded: true }) => (), - Ok(Packet::DmaAddTraceReply { succeeded: false }) => { - return Err("error adding trace on satellite"); - } - Ok(_) => { - return Err("adding DMA trace failed, unexpected aux packet"); - } - Err(_) => { - return Err("adding DMA trace failed, aux error"); - } - } - } - Ok(()) + partition_data( + linkno, + aux_mutex, + timer, + trace, + |slice, last, len| Packet::DmaAddTraceRequest { + id: id, + destination: destination, + last: last, + length: len as u16, + trace: *slice, + }, + |reply| match reply { + Packet::DmaAddTraceReply { succeeded: true } => Ok(()), + Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"), + _ => Err("adding DMA trace failed, unexpected aux packet"), + }, + ) + .await } pub async fn ddma_send_erase( @@ -608,6 +649,122 @@ pub mod drtio { } Ok(remote_buffers) } + + pub async fn subkernel_upload( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + data: &Vec, + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + partition_data( + linkno, + aux_mutex, + timer, + data, + |slice, last, len| Packet::SubkernelAddDataRequest { + id: id, + destination: destination, + last: last, + length: len as u16, + data: *slice, + }, + |reply| match reply { + Packet::SubkernelAddDataReply { succeeded: true } => Ok(()), + Packet::SubkernelAddDataReply { succeeded: false } => Err("error adding subkernel on satellite"), + _ => Err("adding subkernel failed, unexpected aux packet"), + }, + ) + .await + } + + pub async fn subkernel_load( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + run: bool, + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let reply = aux_transact( + aux_mutex, + linkno, + &Packet::SubkernelLoadRunRequest { + id: id, + destination: destination, + run: run, + }, + timer, + ) + .await?; + match reply { + Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()), + Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"), + _ => return Err("received unexpected aux packet during subkernel run"), + } + } + + pub async fn subkernel_retrieve_exception( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + destination: u8, + ) -> Result, &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + let mut remote_data: Vec = Vec::new(); + loop { + let reply = aux_transact( + aux_mutex, + linkno, + &Packet::SubkernelExceptionRequest { + destination: destination, + }, + timer, + ) + .await?; + match reply { + Packet::SubkernelException { last, length, data } => { + remote_data.extend(&data[0..length as usize]); + if last { + return Ok(remote_data); + } + } + _ => return Err("received unexpected aux packet during subkernel exception request"), + } + } + } + + pub async fn subkernel_send_message( + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + id: u32, + destination: u8, + message: &[u8], + ) -> Result<(), &'static str> { + let linkno = routing_table.0[destination as usize][0] - 1; + partition_data( + linkno, + aux_mutex, + timer, + message, + |slice, last, len| Packet::SubkernelMessage { + destination: destination, + id: id, + last: last, + length: len as u16, + data: *slice, + }, + |reply| match reply { + Packet::SubkernelMessageAck { .. } => Ok(()), + _ => Err("sending message to subkernel failed, unexpected aux packet"), + }, + ) + .await + } } fn read_device_map(cfg: &Config) -> BTreeMap { diff --git a/src/runtime/src/subkernel.rs b/src/runtime/src/subkernel.rs new file mode 100644 index 00000000..2fcc7e93 --- /dev/null +++ b/src/runtime/src/subkernel.rs @@ -0,0 +1,321 @@ +use alloc::{collections::BTreeMap, rc::Rc, vec::Vec}; + +use libasync::task; +use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; +use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; +use libcortex_a9::mutex::Mutex; +use log::error; + +use crate::rtio_mgt::drtio; + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum FinishStatus { + Ok, + CommLost, + Exception, +} + +#[derive(Debug, PartialEq, Clone, Copy)] +pub enum SubkernelState { + NotLoaded, + Uploaded, + Running, + Finished { status: FinishStatus }, +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy)] +pub enum Error { + Timeout, + IncorrectState, + SubkernelNotFound, + CommLost, + DrtioError(&'static str), +} + +impl From<&'static str> for Error { + fn from(value: &'static str) -> Error { + Error::DrtioError(value) + } +} + +pub struct SubkernelFinished { + pub id: u32, + pub status: FinishStatus, + pub exception: Option>, +} + +struct Subkernel { + pub destination: u8, + pub data: Vec, + pub state: SubkernelState, +} + +impl Subkernel { + pub fn new(destination: u8, data: Vec) -> Self { + Subkernel { + destination: destination, + data: data, + state: SubkernelState::NotLoaded, + } + } +} + +static SUBKERNELS: Mutex> = Mutex::new(BTreeMap::new()); + +pub async fn add_subkernel(id: u32, destination: u8, kernel: Vec) { + SUBKERNELS + .async_lock() + .await + .insert(id, Subkernel::new(destination, kernel)); +} + +pub async fn upload( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, +) -> Result<(), Error> { + if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { + drtio::subkernel_upload( + aux_mutex, + routing_table, + timer, + id, + subkernel.destination, + &subkernel.data, + ) + .await?; + subkernel.state = SubkernelState::Uploaded; + Ok(()) + } else { + Err(Error::SubkernelNotFound) + } +} + +pub async fn load( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + run: bool, +) -> Result<(), Error> { + if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { + if subkernel.state != SubkernelState::Uploaded { + return Err(Error::IncorrectState); + } + drtio::subkernel_load(aux_mutex, routing_table, timer, id, subkernel.destination, run).await?; + if run { + subkernel.state = SubkernelState::Running; + } + Ok(()) + } else { + Err(Error::SubkernelNotFound) + } +} + +pub async fn clear_subkernels() { + SUBKERNELS.async_lock().await.clear(); + MESSAGE_QUEUE.async_lock().await.clear(); + CURRENT_MESSAGES.async_lock().await.clear(); +} + +pub async fn subkernel_finished(id: u32, with_exception: bool) { + // called upon receiving DRTIO SubkernelRunDone + // may be None if session ends and is cleared + if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { + subkernel.state = SubkernelState::Finished { + status: match with_exception { + true => FinishStatus::Exception, + false => FinishStatus::Ok, + }, + } + } +} + +pub async fn destination_changed( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + destination: u8, + up: bool, +) { + let mut locked_subkernels = SUBKERNELS.async_lock().await; + for (id, subkernel) in locked_subkernels.iter_mut() { + if subkernel.destination == destination { + if up { + match drtio::subkernel_upload(aux_mutex, routing_table, timer, *id, destination, &subkernel.data).await + { + Ok(_) => subkernel.state = SubkernelState::Uploaded, + Err(e) => error!("Error adding subkernel on destination {}: {}", destination, e), + } + } else { + subkernel.state = match subkernel.state { + SubkernelState::Running => SubkernelState::Finished { + status: FinishStatus::CommLost, + }, + _ => SubkernelState::NotLoaded, + } + } + } + } +} + +pub async fn get_finished_with_exception( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, +) -> Result, Error> { + let mut locked_subkernels = SUBKERNELS.async_lock().await; + for (id, subkernel) in locked_subkernels.iter_mut() { + match subkernel.state { + SubkernelState::Finished { + status: FinishStatus::Ok, + } => (), + SubkernelState::Finished { status } => { + subkernel.state = SubkernelState::Finished { + status: FinishStatus::Ok, + }; + return Ok(Some(SubkernelFinished { + id: *id, + status: status, + exception: if status == FinishStatus::Exception { + Some( + drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination) + .await?, + ) + } else { + None + }, + })); + } + _ => (), + } + } + Ok(None) +} + +pub async fn await_finish( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + timeout: u64, +) -> Result { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Running | SubkernelState::Finished { .. } => (), + _ => return Err(Error::IncorrectState), + } + let max_time = timer.get_time() + Milliseconds(timeout); + while timer.get_time() < max_time { + { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { .. } => break, + _ => (), + }; + } + task::r#yield().await; + } + if timer.get_time() >= max_time { + error!("Remote subkernel finish await timed out"); + return Err(Error::Timeout); + } + if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) { + match subkernel.state { + SubkernelState::Finished { status } => { + subkernel.state = SubkernelState::Uploaded; + Ok(SubkernelFinished { + id: id, + status: status, + exception: if status == FinishStatus::Exception { + Some( + drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination) + .await?, + ) + } else { + None + }, + }) + } + _ => Err(Error::IncorrectState), + } + } else { + Err(Error::SubkernelNotFound) + } +} + +struct Message { + from_id: u32, + pub tag: u8, + pub data: Vec, +} + +// FIFO queue of messages +static MESSAGE_QUEUE: Mutex> = Mutex::new(Vec::new()); +// currently under construction message(s) (can be from multiple sources) +static CURRENT_MESSAGES: Mutex> = Mutex::new(BTreeMap::new()); + +pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + // called when receiving a message from satellite + if SUBKERNELS.async_lock().await.get(&id).is_none() { + // do not add messages for non-existing or deleted subkernels + return; + } + let mut current_messages = CURRENT_MESSAGES.async_lock().await; + match current_messages.get_mut(&id) { + Some(message) => message.data.extend(&data[..length]), + None => { + current_messages.insert( + id, + Message { + from_id: id, + tag: data[0], + data: data[1..length].to_vec(), + }, + ); + } + }; + if last { + // when done, remove from working queue + MESSAGE_QUEUE + .async_lock() + .await + .push(current_messages.remove(&id).unwrap()); + } +} + +pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<(u8, Vec), Error> { + match SUBKERNELS.async_lock().await.get(&id).unwrap().state { + SubkernelState::Finished { + status: FinishStatus::CommLost, + } => return Err(Error::CommLost), + SubkernelState::Running | SubkernelState::Finished { .. } => (), + _ => return Err(Error::IncorrectState), + } + let max_time = timer.get_time() + Milliseconds(timeout); + while timer.get_time() < max_time { + { + let mut message_queue = MESSAGE_QUEUE.async_lock().await; + for i in 0..message_queue.len() { + let msg = &message_queue[i]; + if msg.from_id == id { + let message = message_queue.remove(i); + return Ok((message.tag, message.data)); + } + } + } + task::r#yield().await; + } + Err(Error::Timeout) +} + +pub async fn message_send<'a>( + aux_mutex: &Rc>, + routing_table: &RoutingTable, + timer: GlobalTimer, + id: u32, + message: Vec, +) -> Result<(), Error> { + let destination = SUBKERNELS.async_lock().await.get(&id).unwrap().destination; + // rpc data prepared by the kernel core already + Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?) +} diff --git a/src/satman/src/analyzer.rs b/src/satman/src/analyzer.rs index e4184873..3a7ac69c 100644 --- a/src/satman/src/analyzer.rs +++ b/src/satman/src/analyzer.rs @@ -1,6 +1,6 @@ use core::cmp::min; -use libboard_artiq::{drtioaux_proto::ANALYZER_MAX_SIZE, pl::csr}; +use libboard_artiq::{drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, pl::csr}; use libcortex_a9::cache; const BUFFER_SIZE: usize = 512 * 1024; @@ -100,10 +100,10 @@ impl Analyzer { } } - pub fn get_data(&mut self, data_slice: &mut [u8; ANALYZER_MAX_SIZE]) -> AnalyzerSliceMeta { + pub fn get_data(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> AnalyzerSliceMeta { let data = unsafe { &BUFFER.data[..] }; let i = (self.data_pointer + self.sent_bytes) % BUFFER_SIZE; - let len = min(ANALYZER_MAX_SIZE, self.data_len - self.sent_bytes); + let len = min(SAT_PAYLOAD_MAX_SIZE, self.data_len - self.sent_bytes); let last = self.sent_bytes + len == self.data_len; if i + len >= BUFFER_SIZE { diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 82f8b1c0..5228395a 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -27,7 +27,7 @@ use embedded_hal::blocking::delay::DelayUs; use libboard_artiq::io_expander; #[cfg(has_si5324)] use libboard_artiq::si5324; -use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::ANALYZER_MAX_SIZE, identifier_read, logger, pl::csr}; +use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, identifier_read, logger, pl::csr}; #[cfg(feature = "target_kasli_soc")] use libboard_zynq::error_led::ErrorLED; use libboard_zynq::{gic, i2c::I2c, mpcore, print, println, stdio, time::Milliseconds, timer::GlobalTimer}; @@ -450,7 +450,7 @@ fn process_aux_packet( destination: _destination, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let mut data_slice: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; + let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; let meta = analyzer.get_data(&mut data_slice); drtioaux::send( 0,