subkernel: port master support

This commit is contained in:
mwojcik 2023-08-31 17:36:01 +08:00
parent cded04e2d6
commit 6785ca2c85
12 changed files with 1226 additions and 55 deletions

View File

@ -1,8 +1,11 @@
use core_io::{Error as IoError, Read, Write}; use core_io::{Error as IoError, Read, Write};
use io::proto::{ProtoRead, ProtoWrite}; use io::proto::{ProtoRead, ProtoWrite};
pub const DMA_TRACE_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*trace ID*/4 - /*last*/1 -/*length*/2; // maximum size of arbitrary payloads
pub const ANALYZER_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2; // used by satellite -> master analyzer, subkernel exceptions
pub const SAT_PAYLOAD_MAX_SIZE: usize = /*max size*/512 - /*CRC*/4 - /*packet ID*/1 - /*last*/1 - /*length*/2;
// used by DDMA, subkernel program data (need to provide extra ID and destination)
pub const MASTER_PAYLOAD_MAX_SIZE: usize = SAT_PAYLOAD_MAX_SIZE - /*destination*/1 - /*ID*/4;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -150,7 +153,7 @@ pub enum Packet {
AnalyzerData { AnalyzerData {
last: bool, last: bool,
length: u16, length: u16,
data: [u8; ANALYZER_MAX_SIZE], data: [u8; SAT_PAYLOAD_MAX_SIZE],
}, },
DmaAddTraceRequest { DmaAddTraceRequest {
@ -158,7 +161,7 @@ pub enum Packet {
id: u32, id: u32,
last: bool, last: bool,
length: u16, length: u16,
trace: [u8; DMA_TRACE_MAX_SIZE], trace: [u8; MASTER_PAYLOAD_MAX_SIZE],
}, },
DmaAddTraceReply { DmaAddTraceReply {
succeeded: bool, succeeded: bool,
@ -185,6 +188,53 @@ pub enum Packet {
channel: u32, channel: u32,
timestamp: u64, timestamp: u64,
}, },
SubkernelAddDataRequest {
destination: u8,
id: u32,
last: bool,
length: u16,
data: [u8; MASTER_PAYLOAD_MAX_SIZE],
},
SubkernelAddDataReply {
succeeded: bool,
},
SubkernelLoadRunRequest {
destination: u8,
id: u32,
run: bool,
},
SubkernelLoadRunReply {
succeeded: bool,
},
SubkernelStopRequest {
destination: u8,
},
SubkernelStopReply {
succeeded: bool,
},
SubkernelFinished {
id: u32,
with_exception: bool,
},
SubkernelExceptionRequest {
destination: u8,
},
SubkernelException {
last: bool,
length: u16,
data: [u8; SAT_PAYLOAD_MAX_SIZE],
},
SubkernelMessage {
destination: u8,
id: u32,
last: bool,
length: u16,
data: [u8; MASTER_PAYLOAD_MAX_SIZE],
},
SubkernelMessageAck {
destination: u8,
},
} }
impl Packet { impl Packet {
@ -329,7 +379,7 @@ impl Packet {
0xa3 => { 0xa3 => {
let last = reader.read_bool()?; let last = reader.read_bool()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut data: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?; reader.read_exact(&mut data[0..length as usize])?;
Packet::AnalyzerData { Packet::AnalyzerData {
last: last, last: last,
@ -343,7 +393,7 @@ impl Packet {
let id = reader.read_u32()?; let id = reader.read_u32()?;
let last = reader.read_bool()?; let last = reader.read_bool()?;
let length = reader.read_u16()?; let length = reader.read_u16()?;
let mut trace: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE]; let mut trace: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut trace[0..length as usize])?; reader.read_exact(&mut trace[0..length as usize])?;
Packet::DmaAddTraceRequest { Packet::DmaAddTraceRequest {
destination: destination, destination: destination,
@ -379,6 +429,75 @@ impl Packet {
timestamp: reader.read_u64()?, timestamp: reader.read_u64()?,
}, },
0xc0 => {
let destination = reader.read_u8()?;
let id = reader.read_u32()?;
let last = reader.read_bool()?;
let length = reader.read_u16()?;
let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?;
Packet::SubkernelAddDataRequest {
destination: destination,
id: id,
last: last,
length: length as u16,
data: data,
}
}
0xc1 => Packet::SubkernelAddDataReply {
succeeded: reader.read_bool()?,
},
0xc4 => Packet::SubkernelLoadRunRequest {
destination: reader.read_u8()?,
id: reader.read_u32()?,
run: reader.read_bool()?,
},
0xc5 => Packet::SubkernelLoadRunReply {
succeeded: reader.read_bool()?,
},
0xc6 => Packet::SubkernelStopRequest {
destination: reader.read_u8()?,
},
0xc7 => Packet::SubkernelStopReply {
succeeded: reader.read_bool()?,
},
0xc8 => Packet::SubkernelFinished {
id: reader.read_u32()?,
with_exception: reader.read_bool()?,
},
0xc9 => Packet::SubkernelExceptionRequest {
destination: reader.read_u8()?,
},
0xca => {
let last = reader.read_bool()?;
let length = reader.read_u16()?;
let mut data: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?;
Packet::SubkernelException {
last: last,
length: length,
data: data,
}
}
0xcb => {
let destination = reader.read_u8()?;
let id = reader.read_u32()?;
let last = reader.read_bool()?;
let length = reader.read_u16()?;
let mut data: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
reader.read_exact(&mut data[0..length as usize])?;
Packet::SubkernelMessage {
destination: destination,
id: id,
last: last,
length: length as u16,
data: data,
}
}
0xcc => Packet::SubkernelMessageAck {
destination: reader.read_u8()?,
},
ty => return Err(Error::UnknownPacket(ty)), ty => return Err(Error::UnknownPacket(ty)),
}) })
} }
@ -648,6 +767,76 @@ impl Packet {
writer.write_u32(channel)?; writer.write_u32(channel)?;
writer.write_u64(timestamp)?; writer.write_u64(timestamp)?;
} }
Packet::SubkernelAddDataRequest {
destination,
id,
last,
data,
length,
} => {
writer.write_u8(0xc0)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_bool(last)?;
writer.write_u16(length)?;
writer.write_all(&data[0..length as usize])?;
}
Packet::SubkernelAddDataReply { succeeded } => {
writer.write_u8(0xc1)?;
writer.write_bool(succeeded)?;
}
Packet::SubkernelLoadRunRequest { destination, id, run } => {
writer.write_u8(0xc4)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_bool(run)?;
}
Packet::SubkernelLoadRunReply { succeeded } => {
writer.write_u8(0xc5)?;
writer.write_bool(succeeded)?;
}
Packet::SubkernelStopRequest { destination } => {
writer.write_u8(0xc6)?;
writer.write_u8(destination)?;
}
Packet::SubkernelStopReply { succeeded } => {
writer.write_u8(0xc7)?;
writer.write_bool(succeeded)?;
}
Packet::SubkernelFinished { id, with_exception } => {
writer.write_u8(0xc8)?;
writer.write_u32(id)?;
writer.write_bool(with_exception)?;
}
Packet::SubkernelExceptionRequest { destination } => {
writer.write_u8(0xc9)?;
writer.write_u8(destination)?;
}
Packet::SubkernelException { last, length, data } => {
writer.write_u8(0xca)?;
writer.write_bool(last)?;
writer.write_u16(length)?;
writer.write_all(&data[0..length as usize])?;
}
Packet::SubkernelMessage {
destination,
id,
last,
data,
length,
} => {
writer.write_u8(0xcb)?;
writer.write_u8(destination)?;
writer.write_u32(id)?;
writer.write_bool(last)?;
writer.write_u16(length)?;
writer.write_all(&data[0..length as usize])?;
}
Packet::SubkernelMessageAck { destination } => {
writer.write_u8(0xcc)?;
writer.write_u8(destination)?;
}
} }
Ok(()) Ok(())
} }

View File

@ -1,8 +1,11 @@
use alloc::{collections::BTreeMap, rc::Rc, string::String, vec, vec::Vec}; use alloc::{collections::BTreeMap, rc::Rc, string::String, vec, vec::Vec};
use core::{cell::RefCell, fmt, slice, str}; use core::{cell::RefCell, fmt, slice, str};
use core_io::Error as IoError;
use cslice::CSlice; use cslice::CSlice;
use futures::{future::FutureExt, select_biased}; use futures::{future::FutureExt, select_biased};
#[cfg(has_drtio)]
use io::{Cursor, ProtoRead};
use libasync::{smoltcp::{Sockets, TcpStream}, use libasync::{smoltcp::{Sockets, TcpStream},
task}; task};
use libboard_artiq::drtio_routing; use libboard_artiq::drtio_routing;
@ -28,13 +31,18 @@ use crate::{analyzer, kernel, mgmt, moninj,
proto_async::*, proto_async::*,
rpc, rtio_dma, rpc, rtio_dma,
rtio_mgt::{self, resolve_channel_name}}; rtio_mgt::{self, resolve_channel_name}};
#[cfg(has_drtio)]
use crate::{subkernel, subkernel::Error as SubkernelError};
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error { pub enum Error {
NetworkError(smoltcp::Error), NetworkError(smoltcp::Error),
IoError,
UnexpectedPattern, UnexpectedPattern,
UnrecognizedPacket, UnrecognizedPacket,
BufferExhausted, BufferExhausted,
#[cfg(has_drtio)]
SubkernelError(subkernel::Error),
} }
pub type Result<T> = core::result::Result<T, Error>; pub type Result<T> = core::result::Result<T, Error>;
@ -43,9 +51,12 @@ impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self { match self {
Error::NetworkError(error) => write!(f, "network error: {}", error), Error::NetworkError(error) => write!(f, "network error: {}", error),
Error::IoError => write!(f, "io error"),
Error::UnexpectedPattern => write!(f, "unexpected pattern"), Error::UnexpectedPattern => write!(f, "unexpected pattern"),
Error::UnrecognizedPacket => write!(f, "unrecognized packet"), Error::UnrecognizedPacket => write!(f, "unrecognized packet"),
Error::BufferExhausted => write!(f, "buffer exhausted"), Error::BufferExhausted => write!(f, "buffer exhausted"),
#[cfg(has_drtio)]
Error::SubkernelError(error) => write!(f, "subkernel error: {:?}", error),
} }
} }
} }
@ -56,6 +67,19 @@ impl From<smoltcp::Error> for Error {
} }
} }
impl From<IoError> for Error {
fn from(_error: IoError) -> Self {
Error::IoError
}
}
#[cfg(has_drtio)]
impl From<subkernel::Error> for Error {
fn from(error: subkernel::Error) -> Self {
Error::SubkernelError(error)
}
}
#[derive(Debug, FromPrimitive, ToPrimitive)] #[derive(Debug, FromPrimitive, ToPrimitive)]
enum Request { enum Request {
SystemInfo = 3, SystemInfo = 3,
@ -63,6 +87,7 @@ enum Request {
RunKernel = 6, RunKernel = 6,
RPCReply = 7, RPCReply = 7,
RPCException = 8, RPCException = 8,
UploadSubkernel = 9,
} }
#[derive(Debug, FromPrimitive, ToPrimitive)] #[derive(Debug, FromPrimitive, ToPrimitive)]
@ -162,6 +187,22 @@ async fn handle_run_kernel(
) -> Result<()> { ) -> Result<()> {
control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await;
loop { loop {
#[cfg(has_drtio)]
while let Some(subkernel_finished) =
subkernel::get_finished_with_exception(aux_mutex, routing_table, timer).await?
{
if subkernel_finished.status == subkernel::FinishStatus::CommLost {
error!(
"Communication with satellite lost while subkernel {} was running",
subkernel_finished.id
);
}
if let Some(exception) = subkernel_finished.exception {
if let Some(stream) = stream {
write_chunk(stream, &exception).await?;
}
}
}
let reply = control.borrow_mut().rx.async_recv().await; let reply = control.borrow_mut().rx.async_recv().await;
match reply { match reply {
kernel::Message::RpcSend { is_async, data } => { kernel::Message::RpcSend { is_async, data } => {
@ -365,6 +406,117 @@ async fn handle_run_kernel(
control.borrow_mut().tx.async_send(reply).await; control.borrow_mut().tx.async_send(reply).await;
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
kernel::Message::SubkernelLoadRunRequest { id, run } => {
let succeeded = match subkernel::load(aux_mutex, routing_table, timer, id, run).await {
Ok(()) => true,
Err(e) => {
error!("Error loading subkernel: {:?}", e);
false
}
};
control
.borrow_mut()
.tx
.async_send(kernel::Message::SubkernelLoadRunReply { succeeded: succeeded })
.await;
}
#[cfg(has_drtio)]
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let res = subkernel::await_finish(aux_mutex, routing_table, timer, id, timeout).await;
let status = match res {
Ok(ref res) => {
if res.status == subkernel::FinishStatus::CommLost {
kernel::SubkernelStatus::CommLost
} else if let Some(exception) = &res.exception {
error!("Exception in subkernel");
match stream {
None => (),
Some(stream) => {
write_chunk(stream, exception).await?;
}
}
// will not be called after exception is served
kernel::SubkernelStatus::OtherError
} else {
kernel::SubkernelStatus::NoError
}
}
Err(SubkernelError::Timeout) => kernel::SubkernelStatus::Timeout,
Err(SubkernelError::IncorrectState) => kernel::SubkernelStatus::IncorrectState,
Err(_) => kernel::SubkernelStatus::OtherError,
};
control
.borrow_mut()
.tx
.async_send(kernel::Message::SubkernelAwaitFinishReply { status: status })
.await;
}
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend { id, data } => {
let res = subkernel::message_send(aux_mutex, routing_table, timer, id, data).await;
match res {
Ok(_) => (),
Err(e) => {
error!("error sending subkernel message: {:?}", e)
}
};
}
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgRecvRequest { id, timeout } => {
let message_received = subkernel::message_await(id, timeout, timer).await;
let status = match message_received {
Ok(_) => kernel::SubkernelStatus::NoError,
Err(SubkernelError::Timeout) => kernel::SubkernelStatus::Timeout,
Err(SubkernelError::IncorrectState) => kernel::SubkernelStatus::IncorrectState,
Err(SubkernelError::CommLost) => kernel::SubkernelStatus::CommLost,
Err(_) => kernel::SubkernelStatus::OtherError,
};
control
.borrow_mut()
.tx
.async_send(kernel::Message::SubkernelMsgRecvReply { status: status })
.await;
if let Ok((tag, data)) = message_received {
// receive code almost identical to RPC recv, except we are not reading from a stream
let mut reader = Cursor::new(data);
let mut tag: [u8; 1] = [tag];
loop {
// kernel has to consume all arguments in the whole message
let slot = match fast_recv(&mut control.borrow_mut().rx).await {
kernel::Message::RpcRecvRequest(slot) => slot,
other => panic!("expected root value slot from core1, not {:?}", other),
};
rpc::recv_return_cursor(&mut reader, &tag, slot, &|size| {
let control = control.clone();
async move {
if size == 0 {
0 as *mut ()
} else {
let mut control = control.borrow_mut();
fast_send(&mut control.tx, kernel::Message::RpcRecvReply(Ok(size))).await;
match fast_recv(&mut control.rx).await {
kernel::Message::RpcRecvRequest(slot) => slot,
other => {
panic!("expected nested value slot from kernel CPU, not {:?}", other)
}
}
}
}
})
.await?;
control
.borrow_mut()
.tx
.async_send(kernel::Message::RpcRecvReply(Ok(0)))
.await;
match reader.read_u8() {
Ok(0) | Err(_) => break, // reached the end of data, we're done
Ok(t) => tag[0] = t, // update the tag for next read
};
}
}
}
#[cfg(has_drtio)]
kernel::Message::UpDestinationsRequest(destination) => { kernel::Message::UpDestinationsRequest(destination) => {
let result = _up_destinations.borrow()[destination as usize]; let result = _up_destinations.borrow()[destination as usize];
control control
@ -434,9 +586,13 @@ async fn handle_connection(
return Err(Error::UnexpectedPattern); return Err(Error::UnexpectedPattern);
} }
stream.send_slice("e".as_bytes()).await?; stream.send_slice("e".as_bytes()).await?;
#[cfg(has_drtio)]
subkernel::clear_subkernels().await;
loop { loop {
let request = read_request(stream, true).await?; let request = read_request(stream, true).await?;
if request.is_none() { if request.is_none() {
#[cfg(has_drtio)]
subkernel::clear_subkernels().await;
return Ok(()); return Ok(());
} }
let request = request.unwrap(); let request = request.unwrap();
@ -460,6 +616,29 @@ async fn handle_connection(
) )
.await?; .await?;
} }
Request::UploadSubkernel => {
#[cfg(has_drtio)]
{
let id = read_i32(stream).await? as u32;
let destination = read_i8(stream).await? as u8;
let buffer = read_bytes(stream, 1024 * 1024).await?;
subkernel::add_subkernel(id, destination, buffer).await;
match subkernel::upload(aux_mutex, routing_table, timer, id).await {
Ok(_) => write_header(stream, Reply::LoadCompleted).await?,
Err(_) => {
write_header(stream, Reply::LoadFailed).await?;
write_chunk(stream, b"subkernel failed to load").await?;
return Err(Error::UnexpectedPattern);
}
}
}
#[cfg(not(has_drtio))]
{
write_header(stream, Reply::LoadFailed).await?;
write_chunk(stream, b"No DRTIO on this system, subkernels are not supported").await?;
return Err(Error::UnexpectedPattern);
}
}
_ => { _ => {
error!("unexpected request from host: {:?}", request); error!("unexpected request from host: {:?}", request);
return Err(Error::UnrecognizedPacket); return Err(Error::UnrecognizedPacket);

View File

@ -422,7 +422,7 @@ extern "C" fn stop_fn(
} }
// Must be kept in sync with preallocate_runtime_exception_names() in artiq/language/embedding_map.py // Must be kept in sync with preallocate_runtime_exception_names() in artiq/language/embedding_map.py
static EXCEPTION_ID_LOOKUP: [(&str, u32); 11] = [ static EXCEPTION_ID_LOOKUP: [(&str, u32); 12] = [
("RuntimeError", 0), ("RuntimeError", 0),
("RTIOUnderflow", 1), ("RTIOUnderflow", 1),
("RTIOOverflow", 2), ("RTIOOverflow", 2),
@ -434,6 +434,7 @@ static EXCEPTION_ID_LOOKUP: [(&str, u32); 11] = [
("ZeroDivisionError", 8), ("ZeroDivisionError", 8),
("IndexError", 9), ("IndexError", 9),
("UnwrapNoneError", 10), ("UnwrapNoneError", 10),
("SubkernelError", 11),
]; ];
pub fn get_exception_id(name: &str) -> u32 { pub fn get_exception_id(name: &str) -> u32 {

View File

@ -9,6 +9,8 @@ use super::{cache,
core1::rtio_get_destination_status, core1::rtio_get_destination_status,
dma, dma,
rpc::{rpc_recv, rpc_send, rpc_send_async}}; rpc::{rpc_recv, rpc_send, rpc_send_async}};
#[cfg(has_drtio)]
use super::subkernel;
use crate::{eh_artiq, i2c, rtio}; use crate::{eh_artiq, i2c, rtio};
extern "C" { extern "C" {
@ -114,6 +116,16 @@ pub fn resolve(required: &[u8]) -> Option<u32> {
api!(i2c_read = i2c::read), api!(i2c_read = i2c::read),
api!(i2c_switch_select = i2c::switch_select), api!(i2c_switch_select = i2c::switch_select),
// subkernel
#[cfg(has_drtio)]
api!(subkernel_load_run = subkernel::load_run),
#[cfg(has_drtio)]
api!(subkernel_await_finish = subkernel::await_finish),
#[cfg(has_drtio)]
api!(subkernel_send_message = subkernel::send_message),
#[cfg(has_drtio)]
api!(subkernel_await_message = subkernel::await_message),
// Double-precision floating-point arithmetic helper functions // Double-precision floating-point arithmetic helper functions
// RTABI chapter 4.1.2, Table 2 // RTABI chapter 4.1.2, Table 2
api!(__aeabi_dadd), api!(__aeabi_dadd),

View File

@ -13,6 +13,8 @@ mod dma;
mod rpc; mod rpc;
pub use dma::DmaRecorder; pub use dma::DmaRecorder;
mod cache; mod cache;
#[cfg(has_drtio)]
mod subkernel;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct RPCException { pub struct RPCException {
@ -25,6 +27,16 @@ pub struct RPCException {
pub function: u32, pub function: u32,
} }
#[cfg(has_drtio)]
#[derive(Debug, Clone)]
pub enum SubkernelStatus {
NoError,
Timeout,
IncorrectState,
CommLost,
OtherError,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum Message { pub enum Message {
LoadRequest(Vec<u8>), LoadRequest(Vec<u8>),
@ -72,6 +84,39 @@ pub enum Message {
UpDestinationsRequest(i32), UpDestinationsRequest(i32),
#[cfg(has_drtio)] #[cfg(has_drtio)]
UpDestinationsReply(bool), UpDestinationsReply(bool),
#[cfg(has_drtio)]
SubkernelLoadRunRequest {
id: u32,
run: bool,
},
#[cfg(has_drtio)]
SubkernelLoadRunReply {
succeeded: bool,
},
#[cfg(has_drtio)]
SubkernelAwaitFinishRequest {
id: u32,
timeout: u64,
},
#[cfg(has_drtio)]
SubkernelAwaitFinishReply {
status: SubkernelStatus,
},
#[cfg(has_drtio)]
SubkernelMsgSend {
id: u32,
data: Vec<u8>,
},
#[cfg(has_drtio)]
SubkernelMsgRecvRequest {
id: u32,
timeout: u64,
},
#[cfg(has_drtio)]
SubkernelMsgRecvReply {
status: SubkernelStatus,
},
} }
static CHANNEL_0TO1: Mutex<Option<sync_channel::Sender<'static, Message>>> = Mutex::new(None); static CHANNEL_0TO1: Mutex<Option<sync_channel::Sender<'static, Message>>> = Mutex::new(None);

View File

@ -0,0 +1,94 @@
use alloc::vec::Vec;
use cslice::CSlice;
use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0};
use crate::{artiq_raise, rpc::send_args};
pub extern "C" fn load_run(id: u32, run: bool) {
unsafe {
KERNEL_CHANNEL_1TO0
.as_mut()
.unwrap()
.send(Message::SubkernelLoadRunRequest { id: id, run: run });
}
match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() {
Message::SubkernelLoadRunReply { succeeded: true } => (),
Message::SubkernelLoadRunReply { succeeded: false } => {
artiq_raise!("SubkernelError", "Error loading or running the subkernel")
}
_ => panic!("Expected SubkernelLoadRunReply after SubkernelLoadRunRequest!"),
}
}
pub extern "C" fn await_finish(id: u32, timeout: u64) {
unsafe {
KERNEL_CHANNEL_1TO0
.as_mut()
.unwrap()
.send(Message::SubkernelAwaitFinishRequest {
id: id,
timeout: timeout,
});
}
match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() {
Message::SubkernelAwaitFinishReply {
status: SubkernelStatus::NoError,
} => (),
Message::SubkernelAwaitFinishReply {
status: SubkernelStatus::IncorrectState,
} => artiq_raise!("SubkernelError", "Subkernel not running"),
Message::SubkernelAwaitFinishReply {
status: SubkernelStatus::Timeout,
} => artiq_raise!("SubkernelError", "Subkernel timed out"),
Message::SubkernelAwaitFinishReply {
status: SubkernelStatus::CommLost,
} => artiq_raise!("SubkernelError", "Lost communication with satellite"),
Message::SubkernelAwaitFinishReply {
status: SubkernelStatus::OtherError,
} => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"),
_ => panic!("expected SubkernelAwaitFinishReply after SubkernelAwaitFinishRequest"),
}
}
pub extern "C" fn send_message(id: u32, tag: &CSlice<u8>, data: *const *const ()) {
let mut buffer = Vec::<u8>::new();
send_args(&mut buffer, 0, tag.as_ref(), data).expect("RPC encoding failed");
unsafe {
KERNEL_CHANNEL_1TO0.as_mut().unwrap().send(Message::SubkernelMsgSend {
id: id,
data: buffer[4..].to_vec(),
});
}
}
pub extern "C" fn await_message(id: u32, timeout: u64) {
unsafe {
KERNEL_CHANNEL_1TO0
.as_mut()
.unwrap()
.send(Message::SubkernelMsgRecvRequest {
id: id,
timeout: timeout,
});
}
match unsafe { KERNEL_CHANNEL_0TO1.as_mut().unwrap() }.recv() {
Message::SubkernelMsgRecvReply {
status: SubkernelStatus::NoError,
} => (),
Message::SubkernelMsgRecvReply {
status: SubkernelStatus::IncorrectState,
} => artiq_raise!("SubkernelError", "Subkernel not running"),
Message::SubkernelMsgRecvReply {
status: SubkernelStatus::Timeout,
} => artiq_raise!("SubkernelError", "Subkernel timed out"),
Message::SubkernelMsgRecvReply {
status: SubkernelStatus::CommLost,
} => artiq_raise!("SubkernelError", "Lost communication with satellite"),
Message::SubkernelMsgRecvReply {
status: SubkernelStatus::OtherError,
} => artiq_raise!("SubkernelError", "An error occurred during subkernel operation"),
_ => panic!("expected SubkernelMsgRecvReply after SubkernelMsgRecvRequest"),
}
// RpcRecvRequest should be called after this to receive message data
}

View File

@ -51,6 +51,8 @@ mod rtio;
mod rtio_clocking; mod rtio_clocking;
mod rtio_dma; mod rtio_dma;
mod rtio_mgt; mod rtio_mgt;
#[cfg(has_drtio)]
mod subkernel;
static mut SEEN_ASYNC_ERRORS: u8 = 0; static mut SEEN_ASYNC_ERRORS: u8 = 0;

View File

@ -1,11 +1,15 @@
use alloc::boxed::Box; use alloc::boxed::Box;
#[cfg(has_drtio)]
use alloc::vec::Vec;
use core::{future::Future, str}; use core::{future::Future, str};
use async_recursion::async_recursion; use async_recursion::async_recursion;
use byteorder::{ByteOrder, NativeEndian}; use byteorder::{ByteOrder, NativeEndian};
use core_io::{Error, Write}; use core_io::{Error, Write};
use cslice::{CMutSlice, CSlice}; use cslice::{CMutSlice, CSlice};
use io::proto::ProtoWrite; #[cfg(has_drtio)]
use io::{Cursor, ProtoRead};
use io::ProtoWrite;
use libasync::smoltcp::TcpStream; use libasync::smoltcp::TcpStream;
use libboard_zynq::smoltcp; use libboard_zynq::smoltcp;
use log::trace; use log::trace;
@ -224,6 +228,173 @@ where
Ok(()) Ok(())
} }
// versions for Cursor rather than TcpStream
// they will be made into sync for satellite subkernels later
#[cfg(has_drtio)]
#[async_recursion(?Send)]
async unsafe fn recv_elements_cursor<F>(
cursor: &mut Cursor<Vec<u8>>,
elt_tag: Tag<'async_recursion>,
length: usize,
storage: *mut (),
alloc: &(impl Fn(usize) -> F + 'async_recursion),
) -> Result<(), Error>
where
F: Future<Output = *mut ()>,
{
match elt_tag {
Tag::Bool => {
let dest = core::slice::from_raw_parts_mut(storage as *mut u8, length);
cursor.read_exact(dest)?;
}
Tag::Int32 => {
let ptr = storage as *mut u32;
let dest = core::slice::from_raw_parts_mut(ptr as *mut u8, length * 4);
cursor.read_exact(dest)?;
drop(dest);
let dest = core::slice::from_raw_parts_mut(ptr, length);
NativeEndian::from_slice_u32(dest);
}
Tag::Int64 | Tag::Float64 => {
let ptr = storage as *mut u64;
let dest = core::slice::from_raw_parts_mut(ptr as *mut u8, length * 8);
cursor.read_exact(dest)?;
drop(dest);
let dest = core::slice::from_raw_parts_mut(ptr, length);
NativeEndian::from_slice_u64(dest);
}
_ => {
let mut data = storage;
for _ in 0..length {
recv_value_cursor(cursor, elt_tag, &mut data, alloc).await?
}
}
}
Ok(())
}
#[cfg(has_drtio)]
#[async_recursion(?Send)]
async unsafe fn recv_value_cursor<F>(
cursor: &mut Cursor<Vec<u8>>,
tag: Tag<'async_recursion>,
data: &mut *mut (),
alloc: &(impl Fn(usize) -> F + 'async_recursion),
) -> Result<(), Error>
where
F: Future<Output = *mut ()>,
{
macro_rules! consume_value {
($ty:ty, | $ptr:ident | $map:expr) => {{
let $ptr = align_ptr_mut::<$ty>(*data);
*data = $ptr.offset(1) as *mut ();
$map
}};
}
match tag {
Tag::None => Ok(()),
Tag::Bool => consume_value!(i8, |ptr| {
*ptr = cursor.read_u8()? as i8;
Ok(())
}),
Tag::Int32 => consume_value!(i32, |ptr| {
*ptr = cursor.read_u32()? as i32;
Ok(())
}),
Tag::Int64 | Tag::Float64 => consume_value!(i64, |ptr| {
*ptr = cursor.read_u64()? as i64;
Ok(())
}),
Tag::String | Tag::Bytes | Tag::ByteArray => {
consume_value!(CMutSlice<u8>, |ptr| {
let length = cursor.read_u32()? as usize;
*ptr = CMutSlice::new(alloc(length).await as *mut u8, length);
cursor.read_exact((*ptr).as_mut())?;
Ok(())
})
}
Tag::Tuple(it, arity) => {
let alignment = tag.alignment();
*data = round_up_mut(*data, alignment);
let mut it = it.clone();
for _ in 0..arity {
let tag = it.next().expect("truncated tag");
recv_value_cursor(cursor, tag, data, alloc).await?
}
*data = round_up_mut(*data, alignment);
Ok(())
}
Tag::List(it) => {
#[repr(C)]
struct List {
elements: *mut (),
length: usize,
}
consume_value!(*mut List, |ptr_to_list| {
let tag = it.clone().next().expect("truncated tag");
let length = cursor.read_u32()? as usize;
let list_size = 4 + 4;
let storage_offset = round_up(list_size, tag.alignment());
let storage_size = tag.size() * length;
let allocation = alloc(storage_offset + storage_size).await as *mut u8;
*ptr_to_list = allocation as *mut List;
let storage = allocation.offset(storage_offset as isize) as *mut ();
(**ptr_to_list).length = length;
(**ptr_to_list).elements = storage;
recv_elements_cursor(cursor, tag, length, storage, alloc).await
})
}
Tag::Array(it, num_dims) => {
consume_value!(*mut (), |buffer| {
let mut total_len: usize = 1;
for _ in 0..num_dims {
let len = cursor.read_u32()? as usize;
total_len *= len;
consume_value!(usize, |ptr| *ptr = len)
}
let elt_tag = it.clone().next().expect("truncated tag");
*buffer = alloc(elt_tag.size() * total_len).await;
recv_elements_cursor(cursor, elt_tag, total_len, *buffer, alloc).await
})
}
Tag::Range(it) => {
*data = round_up_mut(*data, tag.alignment());
let tag = it.clone().next().expect("truncated tag");
recv_value_cursor(cursor, tag, data, alloc).await?;
recv_value_cursor(cursor, tag, data, alloc).await?;
recv_value_cursor(cursor, tag, data, alloc).await?;
Ok(())
}
Tag::Keyword(_) => unreachable!(),
Tag::Object => unreachable!(),
}
}
#[cfg(has_drtio)]
pub async fn recv_return_cursor<F>(
cursor: &mut Cursor<Vec<u8>>,
tag_bytes: &[u8],
data: *mut (),
alloc: &impl Fn(usize) -> F,
) -> Result<(), Error>
where
F: Future<Output = *mut ()>,
{
let mut it = TagIterator::new(tag_bytes);
trace!("recv ...->{}", it);
let tag = it.next().expect("truncated tag");
let mut data = data;
unsafe { recv_value_cursor(cursor, tag, &mut data, alloc).await? };
Ok(())
}
unsafe fn send_elements<W>(writer: &mut W, elt_tag: Tag, length: usize, data: *const ()) -> Result<(), Error> unsafe fn send_elements<W>(writer: &mut W, elt_tag: Tag, length: usize, data: *const ()) -> Result<(), Error>
where W: Write + ?Sized { where W: Write + ?Sized {
writer.write_u8(elt_tag.as_u8())?; writer.write_u8(elt_tag.as_u8())?;

View File

@ -16,12 +16,13 @@ pub mod drtio {
use embedded_hal::blocking::delay::DelayMs; use embedded_hal::blocking::delay::DelayMs;
use libasync::{delay, task}; use libasync::{delay, task};
use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, drtioaux_proto::DMA_TRACE_MAX_SIZE}; use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet,
drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE};
use libboard_zynq::time::Milliseconds; use libboard_zynq::time::Milliseconds;
use log::{error, info, warn}; use log::{error, info, warn};
use super::*; use super::*;
use crate::{analyzer::remote_analyzer::RemoteBuffer, rtio_dma::remote_dma, ASYNC_ERROR_BUSY, use crate::{analyzer::remote_analyzer::RemoteBuffer, rtio_dma::remote_dma, subkernel, ASYNC_ERROR_BUSY,
ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS}; ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS};
pub fn startup( pub fn startup(
@ -44,7 +45,7 @@ pub mod drtio {
unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 } unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 }
} }
async fn process_async_packets(packet: Packet) -> Option<Packet> { async fn process_async_packets(linkno: u8, packet: Packet) -> Option<Packet> {
// returns None if an async packet has been consumed // returns None if an async packet has been consumed
match packet { match packet {
Packet::DmaPlaybackStatus { Packet::DmaPlaybackStatus {
@ -57,6 +58,24 @@ pub mod drtio {
remote_dma::playback_done(id, destination, error, channel, timestamp).await; remote_dma::playback_done(id, destination, error, channel, timestamp).await;
None None
} }
Packet::SubkernelFinished { id, with_exception } => {
subkernel::subkernel_finished(id, with_exception).await;
None
}
Packet::SubkernelMessage {
id,
destination: from,
last,
length,
data,
} => {
subkernel::message_handle_incoming(id, last, length as usize, &data).await;
// acknowledge receiving part of the message
drtioaux_async::send(linkno, &Packet::SubkernelMessageAck { destination: from })
.await
.unwrap();
None
}
other => Some(other), other => Some(other),
} }
} }
@ -193,7 +212,7 @@ pub mod drtio {
let _lock = aux_mutex.async_lock().await; let _lock = aux_mutex.async_lock().await;
match drtioaux_async::recv(linkno).await { match drtioaux_async::recv(linkno).await {
Ok(Some(packet)) => { Ok(Some(packet)) => {
if let Some(packet) = process_async_packets(packet).await { if let Some(packet) = process_async_packets(linkno, packet).await {
warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet); warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet);
} }
} }
@ -286,6 +305,14 @@ pub mod drtio {
false, false,
) )
.await; .await;
subkernel::destination_changed(
aux_mutex,
routing_table,
timer,
destination,
false,
)
.await;
} }
Ok(Packet::DestinationOkReply) => (), Ok(Packet::DestinationOkReply) => (),
Ok(Packet::DestinationSequenceErrorReply { channel }) => { Ok(Packet::DestinationSequenceErrorReply { channel }) => {
@ -328,6 +355,7 @@ pub mod drtio {
} else { } else {
destination_set_up(routing_table, up_destinations, destination, false).await; destination_set_up(routing_table, up_destinations, destination, false).await;
remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await; remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, false).await;
subkernel::destination_changed(aux_mutex, routing_table, timer, destination, false).await;
} }
} else { } else {
if up_links[linkno as usize] { if up_links[linkno as usize] {
@ -347,6 +375,8 @@ pub mod drtio {
init_buffer_space(destination as u8, linkno).await; init_buffer_space(destination as u8, linkno).await;
remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, true) remote_dma::destination_changed(aux_mutex, routing_table, timer, destination, true)
.await; .await;
subkernel::destination_changed(aux_mutex, routing_table, timer, destination, true)
.await;
} }
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet), Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e), Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
@ -433,6 +463,36 @@ pub mod drtio {
} }
} }
async fn partition_data<PacketF, HandlerF>(
linkno: u8,
aux_mutex: &Rc<Mutex<bool>>,
timer: GlobalTimer,
data: &[u8],
packet_f: PacketF,
reply_handler_f: HandlerF,
) -> Result<(), &'static str>
where
PacketF: Fn(&[u8; MASTER_PAYLOAD_MAX_SIZE], bool, usize) -> Packet,
HandlerF: Fn(&Packet) -> Result<(), &'static str>,
{
let mut i = 0;
while i < data.len() {
let mut slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let len: usize = if i + MASTER_PAYLOAD_MAX_SIZE < data.len() {
MASTER_PAYLOAD_MAX_SIZE
} else {
data.len() - i
} as usize;
let last = i + len == data.len();
slice[..len].clone_from_slice(&data[i..i + len]);
i += len;
let packet = packet_f(&slice, last, len);
let reply = aux_transact(aux_mutex, linkno, &packet, timer).await?;
reply_handler_f(&reply)?;
}
Ok(())
}
pub async fn ddma_upload_trace( pub async fn ddma_upload_trace(
aux_mutex: &Rc<Mutex<bool>>, aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable, routing_table: &drtio_routing::RoutingTable,
@ -442,44 +502,25 @@ pub mod drtio {
trace: &Vec<u8>, trace: &Vec<u8>,
) -> Result<(), &'static str> { ) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1; let linkno = routing_table.0[destination as usize][0] - 1;
let mut i = 0; partition_data(
while i < trace.len() {
let mut trace_slice: [u8; DMA_TRACE_MAX_SIZE] = [0; DMA_TRACE_MAX_SIZE];
let len: usize = if i + DMA_TRACE_MAX_SIZE < trace.len() {
DMA_TRACE_MAX_SIZE
} else {
trace.len() - i
} as usize;
let last = i + len == trace.len();
trace_slice[..len].clone_from_slice(&trace[i..i + len]);
i += len;
let reply = aux_transact(
aux_mutex,
linkno, linkno,
&Packet::DmaAddTraceRequest { aux_mutex,
timer,
trace,
|slice, last, len| Packet::DmaAddTraceRequest {
id: id, id: id,
destination: destination, destination: destination,
last: last, last: last,
length: len as u16, length: len as u16,
trace: trace_slice, trace: *slice,
},
|reply| match reply {
Packet::DmaAddTraceReply { succeeded: true } => Ok(()),
Packet::DmaAddTraceReply { succeeded: false } => Err("error adding trace on satellite"),
_ => Err("adding DMA trace failed, unexpected aux packet"),
}, },
timer,
) )
.await; .await
match reply {
Ok(Packet::DmaAddTraceReply { succeeded: true }) => (),
Ok(Packet::DmaAddTraceReply { succeeded: false }) => {
return Err("error adding trace on satellite");
}
Ok(_) => {
return Err("adding DMA trace failed, unexpected aux packet");
}
Err(_) => {
return Err("adding DMA trace failed, aux error");
}
}
}
Ok(())
} }
pub async fn ddma_send_erase( pub async fn ddma_send_erase(
@ -608,6 +649,122 @@ pub mod drtio {
} }
Ok(remote_buffers) Ok(remote_buffers)
} }
pub async fn subkernel_upload(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
id: u32,
destination: u8,
data: &Vec<u8>,
) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
partition_data(
linkno,
aux_mutex,
timer,
data,
|slice, last, len| Packet::SubkernelAddDataRequest {
id: id,
destination: destination,
last: last,
length: len as u16,
data: *slice,
},
|reply| match reply {
Packet::SubkernelAddDataReply { succeeded: true } => Ok(()),
Packet::SubkernelAddDataReply { succeeded: false } => Err("error adding subkernel on satellite"),
_ => Err("adding subkernel failed, unexpected aux packet"),
},
)
.await
}
pub async fn subkernel_load(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
id: u32,
destination: u8,
run: bool,
) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
let reply = aux_transact(
aux_mutex,
linkno,
&Packet::SubkernelLoadRunRequest {
id: id,
destination: destination,
run: run,
},
timer,
)
.await?;
match reply {
Packet::SubkernelLoadRunReply { succeeded: true } => return Ok(()),
Packet::SubkernelLoadRunReply { succeeded: false } => return Err("error on subkernel run request"),
_ => return Err("received unexpected aux packet during subkernel run"),
}
}
pub async fn subkernel_retrieve_exception(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
destination: u8,
) -> Result<Vec<u8>, &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
let mut remote_data: Vec<u8> = Vec::new();
loop {
let reply = aux_transact(
aux_mutex,
linkno,
&Packet::SubkernelExceptionRequest {
destination: destination,
},
timer,
)
.await?;
match reply {
Packet::SubkernelException { last, length, data } => {
remote_data.extend(&data[0..length as usize]);
if last {
return Ok(remote_data);
}
}
_ => return Err("received unexpected aux packet during subkernel exception request"),
}
}
}
pub async fn subkernel_send_message(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &drtio_routing::RoutingTable,
timer: GlobalTimer,
id: u32,
destination: u8,
message: &[u8],
) -> Result<(), &'static str> {
let linkno = routing_table.0[destination as usize][0] - 1;
partition_data(
linkno,
aux_mutex,
timer,
message,
|slice, last, len| Packet::SubkernelMessage {
destination: destination,
id: id,
last: last,
length: len as u16,
data: *slice,
},
|reply| match reply {
Packet::SubkernelMessageAck { .. } => Ok(()),
_ => Err("sending message to subkernel failed, unexpected aux packet"),
},
)
.await
}
} }
fn read_device_map(cfg: &Config) -> BTreeMap<u32, String> { fn read_device_map(cfg: &Config) -> BTreeMap<u32, String> {

View File

@ -0,0 +1,321 @@
use alloc::{collections::BTreeMap, rc::Rc, vec::Vec};
use libasync::task;
use libboard_artiq::{drtio_routing::RoutingTable, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::mutex::Mutex;
use log::error;
use crate::rtio_mgt::drtio;
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum FinishStatus {
Ok,
CommLost,
Exception,
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum SubkernelState {
NotLoaded,
Uploaded,
Running,
Finished { status: FinishStatus },
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Error {
Timeout,
IncorrectState,
SubkernelNotFound,
CommLost,
DrtioError(&'static str),
}
impl From<&'static str> for Error {
fn from(value: &'static str) -> Error {
Error::DrtioError(value)
}
}
pub struct SubkernelFinished {
pub id: u32,
pub status: FinishStatus,
pub exception: Option<Vec<u8>>,
}
struct Subkernel {
pub destination: u8,
pub data: Vec<u8>,
pub state: SubkernelState,
}
impl Subkernel {
pub fn new(destination: u8, data: Vec<u8>) -> Self {
Subkernel {
destination: destination,
data: data,
state: SubkernelState::NotLoaded,
}
}
}
static SUBKERNELS: Mutex<BTreeMap<u32, Subkernel>> = Mutex::new(BTreeMap::new());
pub async fn add_subkernel(id: u32, destination: u8, kernel: Vec<u8>) {
SUBKERNELS
.async_lock()
.await
.insert(id, Subkernel::new(destination, kernel));
}
pub async fn upload(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
) -> Result<(), Error> {
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
drtio::subkernel_upload(
aux_mutex,
routing_table,
timer,
id,
subkernel.destination,
&subkernel.data,
)
.await?;
subkernel.state = SubkernelState::Uploaded;
Ok(())
} else {
Err(Error::SubkernelNotFound)
}
}
pub async fn load(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
run: bool,
) -> Result<(), Error> {
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
if subkernel.state != SubkernelState::Uploaded {
return Err(Error::IncorrectState);
}
drtio::subkernel_load(aux_mutex, routing_table, timer, id, subkernel.destination, run).await?;
if run {
subkernel.state = SubkernelState::Running;
}
Ok(())
} else {
Err(Error::SubkernelNotFound)
}
}
pub async fn clear_subkernels() {
SUBKERNELS.async_lock().await.clear();
MESSAGE_QUEUE.async_lock().await.clear();
CURRENT_MESSAGES.async_lock().await.clear();
}
pub async fn subkernel_finished(id: u32, with_exception: bool) {
// called upon receiving DRTIO SubkernelRunDone
// may be None if session ends and is cleared
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
subkernel.state = SubkernelState::Finished {
status: match with_exception {
true => FinishStatus::Exception,
false => FinishStatus::Ok,
},
}
}
}
pub async fn destination_changed(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
destination: u8,
up: bool,
) {
let mut locked_subkernels = SUBKERNELS.async_lock().await;
for (id, subkernel) in locked_subkernels.iter_mut() {
if subkernel.destination == destination {
if up {
match drtio::subkernel_upload(aux_mutex, routing_table, timer, *id, destination, &subkernel.data).await
{
Ok(_) => subkernel.state = SubkernelState::Uploaded,
Err(e) => error!("Error adding subkernel on destination {}: {}", destination, e),
}
} else {
subkernel.state = match subkernel.state {
SubkernelState::Running => SubkernelState::Finished {
status: FinishStatus::CommLost,
},
_ => SubkernelState::NotLoaded,
}
}
}
}
}
pub async fn get_finished_with_exception(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
) -> Result<Option<SubkernelFinished>, Error> {
let mut locked_subkernels = SUBKERNELS.async_lock().await;
for (id, subkernel) in locked_subkernels.iter_mut() {
match subkernel.state {
SubkernelState::Finished {
status: FinishStatus::Ok,
} => (),
SubkernelState::Finished { status } => {
subkernel.state = SubkernelState::Finished {
status: FinishStatus::Ok,
};
return Ok(Some(SubkernelFinished {
id: *id,
status: status,
exception: if status == FinishStatus::Exception {
Some(
drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination)
.await?,
)
} else {
None
},
}));
}
_ => (),
}
}
Ok(None)
}
pub async fn await_finish(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
timeout: u64,
) -> Result<SubkernelFinished, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
}
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
{
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished { .. } => break,
_ => (),
};
}
task::r#yield().await;
}
if timer.get_time() >= max_time {
error!("Remote subkernel finish await timed out");
return Err(Error::Timeout);
}
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
match subkernel.state {
SubkernelState::Finished { status } => {
subkernel.state = SubkernelState::Uploaded;
Ok(SubkernelFinished {
id: id,
status: status,
exception: if status == FinishStatus::Exception {
Some(
drtio::subkernel_retrieve_exception(aux_mutex, routing_table, timer, subkernel.destination)
.await?,
)
} else {
None
},
})
}
_ => Err(Error::IncorrectState),
}
} else {
Err(Error::SubkernelNotFound)
}
}
struct Message {
from_id: u32,
pub tag: u8,
pub data: Vec<u8>,
}
// FIFO queue of messages
static MESSAGE_QUEUE: Mutex<Vec<Message>> = Mutex::new(Vec::new());
// currently under construction message(s) (can be from multiple sources)
static CURRENT_MESSAGES: Mutex<BTreeMap<u32, Message>> = Mutex::new(BTreeMap::new());
pub async fn message_handle_incoming(id: u32, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from satellite
if SUBKERNELS.async_lock().await.get(&id).is_none() {
// do not add messages for non-existing or deleted subkernels
return;
}
let mut current_messages = CURRENT_MESSAGES.async_lock().await;
match current_messages.get_mut(&id) {
Some(message) => message.data.extend(&data[..length]),
None => {
current_messages.insert(
id,
Message {
from_id: id,
tag: data[0],
data: data[1..length].to_vec(),
},
);
}
};
if last {
// when done, remove from working queue
MESSAGE_QUEUE
.async_lock()
.await
.push(current_messages.remove(&id).unwrap());
}
}
pub async fn message_await(id: u32, timeout: u64, timer: GlobalTimer) -> Result<(u8, Vec<u8>), Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Finished {
status: FinishStatus::CommLost,
} => return Err(Error::CommLost),
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
}
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
{
let mut message_queue = MESSAGE_QUEUE.async_lock().await;
for i in 0..message_queue.len() {
let msg = &message_queue[i];
if msg.from_id == id {
let message = message_queue.remove(i);
return Ok((message.tag, message.data));
}
}
}
task::r#yield().await;
}
Err(Error::Timeout)
}
pub async fn message_send<'a>(
aux_mutex: &Rc<Mutex<bool>>,
routing_table: &RoutingTable,
timer: GlobalTimer,
id: u32,
message: Vec<u8>,
) -> Result<(), Error> {
let destination = SUBKERNELS.async_lock().await.get(&id).unwrap().destination;
// rpc data prepared by the kernel core already
Ok(drtio::subkernel_send_message(aux_mutex, routing_table, timer, id, destination, &message).await?)
}

View File

@ -1,6 +1,6 @@
use core::cmp::min; use core::cmp::min;
use libboard_artiq::{drtioaux_proto::ANALYZER_MAX_SIZE, pl::csr}; use libboard_artiq::{drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, pl::csr};
use libcortex_a9::cache; use libcortex_a9::cache;
const BUFFER_SIZE: usize = 512 * 1024; const BUFFER_SIZE: usize = 512 * 1024;
@ -100,10 +100,10 @@ impl Analyzer {
} }
} }
pub fn get_data(&mut self, data_slice: &mut [u8; ANALYZER_MAX_SIZE]) -> AnalyzerSliceMeta { pub fn get_data(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> AnalyzerSliceMeta {
let data = unsafe { &BUFFER.data[..] }; let data = unsafe { &BUFFER.data[..] };
let i = (self.data_pointer + self.sent_bytes) % BUFFER_SIZE; let i = (self.data_pointer + self.sent_bytes) % BUFFER_SIZE;
let len = min(ANALYZER_MAX_SIZE, self.data_len - self.sent_bytes); let len = min(SAT_PAYLOAD_MAX_SIZE, self.data_len - self.sent_bytes);
let last = self.sent_bytes + len == self.data_len; let last = self.sent_bytes + len == self.data_len;
if i + len >= BUFFER_SIZE { if i + len >= BUFFER_SIZE {

View File

@ -27,7 +27,7 @@ use embedded_hal::blocking::delay::DelayUs;
use libboard_artiq::io_expander; use libboard_artiq::io_expander;
#[cfg(has_si5324)] #[cfg(has_si5324)]
use libboard_artiq::si5324; use libboard_artiq::si5324;
use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::ANALYZER_MAX_SIZE, identifier_read, logger, pl::csr}; use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, identifier_read, logger, pl::csr};
#[cfg(feature = "target_kasli_soc")] #[cfg(feature = "target_kasli_soc")]
use libboard_zynq::error_led::ErrorLED; use libboard_zynq::error_led::ErrorLED;
use libboard_zynq::{gic, i2c::I2c, mpcore, print, println, stdio, time::Milliseconds, timer::GlobalTimer}; use libboard_zynq::{gic, i2c::I2c, mpcore, print, println, stdio, time::Milliseconds, timer::GlobalTimer};
@ -450,7 +450,7 @@ fn process_aux_packet(
destination: _destination, destination: _destination,
} => { } => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let mut data_slice: [u8; ANALYZER_MAX_SIZE] = [0; ANALYZER_MAX_SIZE]; let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = analyzer.get_data(&mut data_slice); let meta = analyzer.get_data(&mut data_slice);
drtioaux::send( drtioaux::send(
0, 0,