satellite: support subkernels

pull/267/head
mwojcik 2023-09-06 16:06:38 +08:00
parent 623cc7b79e
commit cb79c12284
16 changed files with 839 additions and 103 deletions

3
src/Cargo.lock generated
View File

@ -497,7 +497,10 @@ name = "satman"
version = "0.0.0"
dependencies = [
"build_zynq",
"core_io",
"cslice",
"embedded-hal",
"io",
"ksupport",
"libasync",
"libboard_artiq",

View File

@ -1,5 +1,6 @@
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core_io::{Error as IoError, Read, Write};
#[derive(Debug, Clone)]

View File

@ -4,7 +4,7 @@ use libcortex_a9::sync_channel::{Receiver, Sender};
use libsupport_zynq::boot::Core1;
use super::{Message, CHANNEL_0TO1, CHANNEL_1TO0, CHANNEL_SEM, INIT_LOCK};
use crate::{irq::restart_core1};
use crate::irq::restart_core1;
pub struct Control {
pub tx: Sender<'static, Message>,

View File

@ -16,7 +16,6 @@ mod cache;
#[cfg(has_drtio)]
mod subkernel;
#[cfg(has_drtio)]
#[derive(Debug, Clone)]
pub enum SubkernelStatus {

View File

@ -2,7 +2,7 @@ use alloc::vec::Vec;
use cslice::CSlice;
use super::{Message, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, SubkernelStatus};
use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0};
use crate::{artiq_raise, rpc::send_args};
pub extern "C" fn load_run(id: u32, run: bool) {

View File

@ -14,8 +14,11 @@ use io::{Cursor, ProtoRead};
use libasync::block_async;
use libconfig::Config;
use log::{error, warn};
#[cfg(has_drtiosat)]
pub use pl::csr::drtiosat as rtio_core;
#[cfg(has_rtio_core)]
pub use pl::csr::rtio_core;
use void::Void;
use rtio::rtio_core;
pub mod eh_artiq;
pub mod i2c;
@ -151,7 +154,11 @@ pub unsafe fn get_async_errors() -> u8 {
fn wait_for_async_rtio_error() -> nb::Result<(), Void> {
unsafe {
if rtio_core::async_error_read() != 0 {
#[cfg(has_rtio_core)]
let errors = rtio_core::async_error_read();
#[cfg(has_drtiosat)]
let errors = rtio_core::protocol_error_read();
if errors != 0 {
Ok(())
} else {
Err(nb::Error::WouldBlock)
@ -163,7 +170,10 @@ pub async fn report_async_rtio_errors() {
loop {
let _ = block_async!(wait_for_async_rtio_error()).await;
unsafe {
#[cfg(has_rtio_core)]
let errors = rtio_core::async_error_read();
#[cfg(has_drtiosat)]
let errors = rtio_core::protocol_error_read();
if errors & ASYNC_ERROR_COLLISION != 0 {
let channel = rtio_core::collision_channel_read();
error!(
@ -189,7 +199,10 @@ pub async fn report_async_rtio_errors() {
);
}
SEEN_ASYNC_ERRORS = errors;
#[cfg(has_rtio_core)]
rtio_core::async_error_write(errors);
#[cfg(has_drtiosat)]
rtio_core::protocol_error_write(errors);
}
}
}

View File

@ -42,10 +42,10 @@ unsafe fn recv_elements<F, R>(
elt_tag: Tag,
length: usize,
storage: *mut (),
alloc: &F,
alloc: &mut F,
) -> Result<(), Error>
where
F: Fn(usize) -> *mut (),
F: FnMut(usize) -> *mut (),
R: Read + ?Sized,
{
match elt_tag {
@ -79,9 +79,9 @@ where
Ok(())
}
unsafe fn recv_value<F, R>(reader: &mut R, tag: Tag, data: &mut *mut (), alloc: &F) -> Result<(), Error>
unsafe fn recv_value<F, R>(reader: &mut R, tag: Tag, data: &mut *mut (), alloc: &mut F) -> Result<(), Error>
where
F: Fn(usize) -> *mut (),
F: FnMut(usize) -> *mut (),
R: Read + ?Sized,
{
macro_rules! consume_value {
@ -175,9 +175,9 @@ where
}
}
pub fn recv_return<F, R>(reader: &mut R, tag_bytes: &[u8], data: *mut (), alloc: &F) -> Result<(), Error>
pub fn recv_return<F, R>(reader: &mut R, tag_bytes: &[u8], data: *mut (), alloc: &mut F) -> Result<(), Error>
where
F: Fn(usize) -> *mut (),
F: FnMut(usize) -> *mut (),
R: Read + ?Sized,
{
let mut it = TagIterator::new(tag_bytes);

View File

@ -4,11 +4,7 @@ use cslice::CSlice;
use libcortex_a9::asm;
use vcell::VolatileCell;
use crate::{artiq_raise, pl::csr, resolve_channel_name};
#[cfg(has_drtiosat)]
pub use crate::pl::csr::drtiosat as rtio_core;
#[cfg(has_rtio_core)]
pub use crate::pl::csr::rtio_core;
use crate::{artiq_raise, pl::csr, resolve_channel_name, rtio_core};
pub const RTIO_O_STATUS_WAIT: i32 = 1;
pub const RTIO_O_STATUS_UNDERFLOW: i32 = 2;

View File

@ -2,11 +2,7 @@ use core::ptr::{read_volatile, write_volatile};
use cslice::CSlice;
use crate::{artiq_raise, pl::csr, resolve_channel_name};
#[cfg(has_drtiosat)]
pub use crate::pl::csr::drtiosat as rtio_core;
#[cfg(has_rtio_core)]
pub use crate::pl::csr::rtio_core;
use crate::{artiq_raise, pl::csr, resolve_channel_name, rtio_core};
pub const RTIO_O_STATUS_WAIT: u8 = 1;
pub const RTIO_O_STATUS_UNDERFLOW: u8 = 2;

View File

@ -6,9 +6,9 @@ use cslice::CSlice;
use futures::{future::FutureExt, select_biased};
#[cfg(has_drtio)]
use io::{Cursor, ProtoRead};
use ksupport::{kernel, resolve_channel_name};
#[cfg(has_drtio)]
use ksupport::rpc;
use ksupport::{kernel, resolve_channel_name};
use libasync::{smoltcp::{Sockets, TcpStream},
task};
use libboard_artiq::drtio_routing;
@ -444,7 +444,11 @@ async fn handle_run_kernel(
error!("error sending subkernel message: {:?}", e)
}
};
control.borrow_mut().tx.async_send(kernel::Message::SubkernelMsgSent).await;
control
.borrow_mut()
.tx
.async_send(kernel::Message::SubkernelMsgSent)
.await;
}
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgRecvRequest { id, timeout } => {
@ -472,7 +476,7 @@ async fn handle_run_kernel(
kernel::Message::RpcRecvRequest(slot) => slot,
other => panic!("expected root value slot from core1, not {:?}", other),
};
rpc::recv_return(&mut reader, &tag, slot, &|size| {
rpc::recv_return(&mut reader, &tag, slot, &mut |size| {
if size == 0 {
0 as *mut ()
} else {

View File

@ -5,7 +5,7 @@ use async_recursion::async_recursion;
use byteorder::{ByteOrder, NativeEndian};
use cslice::CMutSlice;
use ksupport::rpc::{tag::{Tag, TagIterator},
*};
*};
use libasync::smoltcp::TcpStream;
use libboard_zynq::smoltcp;
use log::trace;

View File

@ -10,7 +10,8 @@ pub mod drtio {
use alloc::vec::Vec;
use embedded_hal::blocking::delay::DelayMs;
use ksupport::{ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS, resolve_channel_name};
use ksupport::{resolve_channel_name, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR,
SEEN_ASYNC_ERRORS};
use libasync::{delay, task};
use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet,
drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE};

View File

@ -14,6 +14,8 @@ build_zynq = { path = "../libbuild_zynq" }
[dependencies]
log = { version = "0.4", default-features = false }
core_io = { version = "0.1", features = ["collections"] }
cslice = "0.3"
embedded-hal = "0.2"
libboard_zynq = { git = "https://git.m-labs.hk/M-Labs/zynq-rs.git", features = ["ipv6"]}
@ -25,4 +27,6 @@ libconfig = { git = "https://git.m-labs.hk/M-Labs/zynq-rs.git", features = ["fat
libboard_artiq = { path = "../libboard_artiq" }
unwind = { path = "../libunwind" }
libc = { path = "../libc" }
libc = { path = "../libc" }
io = { path = "../libio", features = ["alloc"] }
ksupport = { path = "../libksupport" }

View File

@ -170,4 +170,8 @@ impl Manager {
}
}
}
pub fn running(&self) -> bool {
self.state == ManagerState::Playback
}
}

View File

@ -1,13 +1,15 @@
#![no_std]
#![no_main]
#![feature(never_type, panic_info_message, asm, naked_functions)]
#![feature(alloc_error_handler)]
#![feature(alloc_error_handler, try_trait, never_type, panic_info_message)]
#[macro_use]
extern crate log;
extern crate core_io;
extern crate cslice;
extern crate embedded_hal;
extern crate io;
extern crate ksupport;
extern crate libboard_artiq;
extern crate libboard_zynq;
extern crate libcortex_a9;
@ -18,8 +20,6 @@ extern crate unwind;
extern crate alloc;
use core::sync::atomic::{AtomicBool, Ordering};
use analyzer::Analyzer;
use dma::Manager as DmaManager;
use embedded_hal::blocking::delay::DelayUs;
@ -27,21 +27,22 @@ use embedded_hal::blocking::delay::DelayUs;
use libboard_artiq::io_expander;
#[cfg(has_si5324)]
use libboard_artiq::si5324;
use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, identifier_read, logger, pl::csr};
use libboard_artiq::{drtio_routing, drtioaux,
drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
identifier_read, logger,
pl::csr};
#[cfg(feature = "target_kasli_soc")]
use libboard_zynq::error_led::ErrorLED;
use libboard_zynq::{gic, i2c::I2c, mpcore, print, println, stdio, time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::{asm, interrupt_handler,
l2c::enable_l2_cache,
notify_spin_lock,
regs::{MPIDR, SP},
spin_lock_yield};
use libregister::{RegisterR, RegisterW};
use libboard_zynq::{i2c::I2c, print, println, time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR};
use libregister::RegisterR;
use libsupport_zynq::ram;
use subkernel::Manager as KernelManager;
mod analyzer;
mod dma;
mod repeater;
mod subkernel;
fn drtiosat_reset(reset: bool) {
unsafe {
@ -98,6 +99,7 @@ fn process_aux_packet(
i2c: &mut I2c,
dma_manager: &mut DmaManager,
analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager,
) -> Result<(), drtioaux::Error> {
// In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels,
// and u16 otherwise; hence the `as _` conversion.
@ -487,10 +489,98 @@ fn process_aux_packet(
timestamp,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let succeeded = dma_manager.playback(id, timestamp).is_ok();
let succeeded = if !kernel_manager.running() {
dma_manager.playback(id, timestamp).is_ok()
} else {
false
};
drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded })
}
drtioaux::Packet::SubkernelAddDataRequest {
destination: _destination,
id,
last,
length,
data,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let succeeded = kernel_manager.add(id, last, &data, length as usize).is_ok();
drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded })
}
drtioaux::Packet::SubkernelLoadRunRequest {
destination: _destination,
id,
run,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let mut succeeded = kernel_manager.load(id).is_ok();
// allow preloading a kernel with delayed run
if run {
if dma_manager.running() {
// cannot run kernel while DDMA is running
succeeded = false;
} else {
succeeded |= kernel_manager.run(id).is_ok();
}
}
drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded })
}
drtioaux::Packet::SubkernelExceptionRequest {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE];
let meta = kernel_manager.exception_get_slice(&mut data_slice);
drtioaux::send(
0,
&drtioaux::Packet::SubkernelException {
last: meta.last,
length: meta.len,
data: data_slice,
},
)
}
drtioaux::Packet::SubkernelMessage {
destination,
id: _id,
last,
length,
data,
} => {
forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer);
kernel_manager.message_handle_incoming(last, length as usize, &data);
drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessageAck {
destination: destination,
},
)
}
drtioaux::Packet::SubkernelMessageAck {
destination: _destination,
} => {
forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer);
if kernel_manager.message_ack_slice() {
let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) {
drtioaux::send(
0,
&drtioaux::Packet::SubkernelMessage {
destination: *_rank,
id: kernel_manager.get_current_id().unwrap(),
last: meta.last,
length: meta.len as u16,
data: data_slice,
},
)?;
} else {
error!("Error receiving message slice");
}
}
Ok(())
}
_ => {
warn!("received unexpected aux packet");
Ok(())
@ -506,6 +596,7 @@ fn process_aux_packets(
i2c: &mut I2c,
dma_manager: &mut DmaManager,
analyzer: &mut Analyzer,
kernel_manager: &mut KernelManager,
) {
let result = drtioaux::recv(0).and_then(|packet| {
if let Some(packet) = packet {
@ -518,6 +609,7 @@ fn process_aux_packets(
i2c,
dma_manager,
analyzer,
kernel_manager,
)
} else {
Ok(())
@ -626,8 +718,8 @@ pub extern "C" fn main_core0() -> i32 {
ram::init_alloc_core0();
let mut i2c = I2c::i2c0();
i2c.init().expect("I2C initialization failed");
ksupport::i2c::init();
let mut i2c = unsafe { (ksupport::i2c::I2C_BUS).as_mut().unwrap() };
#[cfg(feature = "target_kasli_soc")]
let (mut io_expander0, mut io_expander1);
@ -682,6 +774,8 @@ pub extern "C" fn main_core0() -> i32 {
let mut hardware_tick_ts = 0;
let mut control = ksupport::kernel::Control::start();
loop {
while !drtiosat_link_rx_up() {
drtiosat_process_errors();
@ -709,12 +803,12 @@ pub extern "C" fn main_core0() -> i32 {
si5324::siphaser::calibrate_skew(&mut timer).expect("failed to calibrate skew");
}
// DMA manager created here, so when link is dropped, all DMA traces
// Various managers created here, so when link is dropped, all DMA traces
// are cleared out for a clean slate on subsequent connections,
// without a manual intervention.
let mut dma_manager = DmaManager::new();
// same for RTIO Analyzer
let mut analyzer = Analyzer::new();
let mut kernel_manager = KernelManager::new(&mut control);
drtioaux::reset(0);
drtiosat_reset(false);
@ -730,6 +824,7 @@ pub extern "C" fn main_core0() -> i32 {
&mut i2c,
&mut dma_manager,
&mut analyzer,
&mut kernel_manager,
);
#[allow(unused_mut)]
for mut rep in repeaters.iter_mut() {
@ -771,46 +866,8 @@ extern "C" {
static mut __stack1_start: u32;
}
interrupt_handler!(IRQ, irq, __irq_stack0_start, __irq_stack1_start, {
if MPIDR.read().cpu_id() == 1 {
let mpcore = mpcore::RegisterBlock::mpcore();
let mut gic = gic::InterruptController::gic(mpcore);
let id = gic.get_interrupt_id();
if id.0 == 0 {
gic.end_interrupt(id);
asm::exit_irq();
SP.write(&mut __stack1_start as *mut _ as u32);
asm::enable_irq();
CORE1_RESTART.store(false, Ordering::Relaxed);
notify_spin_lock();
main_core1();
}
stdio::drop_uart();
}
loop {}
});
static mut PANICKED: [bool; 2] = [false; 2];
static CORE1_RESTART: AtomicBool = AtomicBool::new(false);
pub fn restart_core1() {
let mut interrupt_controller = gic::InterruptController::gic(mpcore::RegisterBlock::mpcore());
CORE1_RESTART.store(true, Ordering::Relaxed);
interrupt_controller.send_sgi(gic::InterruptId(0), gic::CPUCore::Core1.into());
while CORE1_RESTART.load(Ordering::Relaxed) {
spin_lock_yield();
}
}
#[no_mangle]
pub fn main_core1() {
let mut interrupt_controller = gic::InterruptController::gic(mpcore::RegisterBlock::mpcore());
interrupt_controller.enable_interrupts();
loop {}
}
#[no_mangle]
pub extern "C" fn exception(_vect: u32, _regs: *const u32, pc: u32, ea: u32) {
fn hexdump(addr: u32) {
@ -866,23 +923,3 @@ pub fn panic_fmt(info: &core::panic::PanicInfo) -> ! {
loop {}
}
// linker symbols
extern "C" {
static __text_start: u32;
static __text_end: u32;
static __exidx_start: u32;
static __exidx_end: u32;
}
#[no_mangle]
extern "C" fn dl_unwind_find_exidx(_pc: *const u32, len_ptr: *mut u32) -> *const u32 {
let length;
let start: *const u32;
unsafe {
length = (&__exidx_end as *const u32).offset_from(&__exidx_start) as u32;
start = &__exidx_start;
*len_ptr = length;
}
start
}

678
src/satman/src/subkernel.rs Normal file
View File

@ -0,0 +1,678 @@
use alloc::{collections::{BTreeMap, VecDeque},
format,
string::{String, ToString},
vec::Vec};
use core::{cmp::min, option::NoneError, slice, str};
use core_io::{Error as IoError, Write};
use cslice::AsCSlice;
use io::{Cursor, ProtoRead, ProtoWrite};
use ksupport::{eh_artiq, kernel, rpc};
use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE},
pl::csr};
use libboard_zynq::{time::Milliseconds, timer::GlobalTimer};
use libcortex_a9::sync_channel::Receiver;
use log::warn;
#[derive(Debug, Clone, Copy, PartialEq)]
enum KernelState {
Absent,
Loaded,
Running,
MsgAwait(Milliseconds),
MsgSending,
}
#[derive(Debug)]
pub enum Error {
Load(String),
KernelNotFound,
Unexpected(String),
NoMessage,
AwaitingMessage,
SubkernelIoError,
KernelException(Sliceable),
}
impl From<NoneError> for Error {
fn from(_: NoneError) -> Error {
Error::KernelNotFound
}
}
impl From<IoError> for Error {
fn from(_value: IoError) -> Error {
Error::SubkernelIoError
}
}
impl From<()> for Error {
fn from(_: ()) -> Error {
Error::NoMessage
}
}
macro_rules! unexpected {
($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*))));
}
/* represents data that has to be sent to Master */
#[derive(Debug)]
pub struct Sliceable {
it: usize,
data: Vec<u8>,
}
/* represents interkernel messages */
struct Message {
tag: u8,
data: Vec<u8>,
}
#[derive(PartialEq)]
enum OutMessageState {
NoMessage,
MessageReady,
MessageBeingSent,
MessageSent,
MessageAcknowledged,
}
/* for dealing with incoming and outgoing interkernel messages */
struct MessageManager {
out_message: Option<Sliceable>,
out_state: OutMessageState,
in_queue: VecDeque<Message>,
in_buffer: Option<Message>,
}
// Per-run state
struct Session {
id: u32,
kernel_state: KernelState,
last_exception: Option<Sliceable>,
messages: MessageManager,
}
impl Session {
pub fn new(id: u32) -> Session {
Session {
id: id,
kernel_state: KernelState::Absent,
last_exception: None,
messages: MessageManager::new(),
}
}
fn running(&self) -> bool {
match self.kernel_state {
KernelState::Absent | KernelState::Loaded => false,
KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true,
}
}
}
#[derive(Debug)]
struct KernelLibrary {
library: Vec<u8>,
complete: bool,
}
pub struct Manager<'a> {
kernels: BTreeMap<u32, KernelLibrary>,
session: Session,
control: &'a mut kernel::Control,
cache: BTreeMap<String, Vec<i32>>,
}
pub struct SubkernelFinished {
pub id: u32,
pub with_exception: bool,
}
pub struct SliceMeta {
pub len: u16,
pub last: bool,
}
macro_rules! get_slice_fn {
($name:tt, $size:expr) => {
pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta {
if self.data.len() == 0 {
return SliceMeta { len: 0, last: true };
}
let len = min($size, self.data.len() - self.it);
let last = self.it + len == self.data.len();
data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]);
self.it += len;
SliceMeta {
len: len as u16,
last: last,
}
}
};
}
impl Sliceable {
pub fn new(data: Vec<u8>) -> Sliceable {
Sliceable { it: 0, data: data }
}
get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE);
get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE);
}
impl MessageManager {
pub fn new() -> MessageManager {
MessageManager {
out_message: None,
out_state: OutMessageState::NoMessage,
in_queue: VecDeque::new(),
in_buffer: None,
}
}
pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
// called when receiving a message from master
match self.in_buffer.as_mut() {
Some(message) => message.data.extend(&data[..length]),
None => {
self.in_buffer = Some(Message {
tag: data[0],
data: data[1..length].to_vec(),
});
}
};
if last {
// when done, remove from working queue
self.in_queue.push_back(self.in_buffer.take().unwrap());
}
}
pub fn is_outgoing_ready(&mut self) -> bool {
// called by main loop, to see if there's anything to send, will send it afterwards
match self.out_state {
OutMessageState::MessageReady => {
self.out_state = OutMessageState::MessageBeingSent;
true
}
_ => false,
}
}
pub fn was_message_acknowledged(&mut self) -> bool {
match self.out_state {
OutMessageState::MessageAcknowledged => {
self.out_state = OutMessageState::NoMessage;
true
}
_ => false,
}
}
pub fn get_outgoing_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
if self.out_state != OutMessageState::MessageBeingSent {
return None;
}
let meta = self.out_message.as_mut()?.get_slice_master(data_slice);
if meta.last {
// clear the message slot
self.out_message = None;
// notify kernel with a flag that message is sent
self.out_state = OutMessageState::MessageSent;
}
Some(meta)
}
pub fn ack_slice(&mut self) -> bool {
// returns whether or not there's more to be sent
match self.out_state {
OutMessageState::MessageBeingSent => true,
OutMessageState::MessageSent => {
self.out_state = OutMessageState::MessageAcknowledged;
false
}
_ => {
warn!("received unsolicited SubkernelMessageAck");
false
}
}
}
pub fn accept_outgoing(&mut self, message: Vec<u8>) -> Result<(), Error> {
// skip service tag
self.out_message = Some(Sliceable::new(message[4..].to_vec()));
self.out_state = OutMessageState::MessageReady;
Ok(())
}
pub fn get_incoming(&mut self) -> Option<Message> {
self.in_queue.pop_front()
}
}
impl<'a> Manager<'_> {
pub fn new(control: &mut kernel::Control) -> Manager {
Manager {
kernels: BTreeMap::new(),
session: Session::new(0),
control: control,
cache: BTreeMap::new(),
}
}
pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> {
let kernel = match self.kernels.get_mut(&id) {
Some(kernel) => {
if kernel.complete {
// replace entry
self.kernels.remove(&id);
self.kernels.insert(
id,
KernelLibrary {
library: Vec::new(),
complete: false,
},
);
self.kernels.get_mut(&id)?
} else {
kernel
}
}
None => {
self.kernels.insert(
id,
KernelLibrary {
library: Vec::new(),
complete: false,
},
);
self.kernels.get_mut(&id)?
}
};
kernel.library.extend(&data[0..data_len]);
kernel.complete = last;
Ok(())
}
pub fn running(&self) -> bool {
self.session.running()
}
pub fn get_current_id(&self) -> Option<u32> {
match self.running() {
true => Some(self.session.id),
false => None,
}
}
pub fn run(&mut self, id: u32) -> Result<(), Error> {
info!("starting subkernel #{}", id);
if self.session.kernel_state != KernelState::Loaded || self.session.id != id {
self.load(id)?;
}
self.session.kernel_state = KernelState::Running;
unsafe {
csr::cri_con::selected_write(2);
}
self.control.tx.send(kernel::Message::StartRequest);
Ok(())
}
pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) {
if !self.running() {
return;
}
self.session.messages.handle_incoming(last, length, slice);
}
pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option<SliceMeta> {
if !self.running() {
return None;
}
self.session.messages.get_outgoing_slice(slice)
}
pub fn message_ack_slice(&mut self) -> bool {
if !self.running() {
warn!("received unsolicited SubkernelMessageAck");
return false;
}
self.session.messages.ack_slice()
}
pub fn message_is_ready(&mut self) -> bool {
self.session.messages.is_outgoing_ready()
}
pub fn load(&mut self, id: u32) -> Result<(), Error> {
if self.session.id == id && self.session.kernel_state == KernelState::Loaded {
return Ok(());
}
if !self.kernels.get(&id)?.complete {
return Err(Error::KernelNotFound);
}
self.session = Session::new(id);
self.control.restart();
self.control
.tx
.send(kernel::Message::LoadRequest(self.kernels.get(&id)?.library.clone()));
let reply = self.control.rx.recv();
match reply {
kernel::Message::LoadCompleted => Ok(()),
kernel::Message::LoadFailed => Err(Error::Load("kernel load failed".to_string())),
_ => Err(Error::Load(format!(
"unexpected kernel CPU reply to load request: {:?}",
reply
))),
}
}
pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta {
match self.session.last_exception.as_mut() {
Some(exception) => exception.get_slice_sat(data_slice),
None => SliceMeta { len: 0, last: true },
}
}
fn kernel_stop(&mut self) {
self.session.kernel_state = KernelState::Absent;
unsafe {
csr::cri_con::selected_write(0);
}
}
fn runtime_exception(&mut self, cause: Error) {
let raw_exception: Vec<u8> = Vec::new();
let mut writer = Cursor::new(raw_exception);
match write_exception(
&mut writer,
&[Some(eh_artiq::Exception {
id: 11, // SubkernelError, defined in ksupport
message: format!("in subkernel id {}: {:?}", self.session.id, cause).as_c_slice(),
param: [0, 0, 0],
file: file!().as_c_slice(),
line: line!(),
column: column!(),
function: format!("subkernel id {}", self.session.id).as_c_slice(),
})],
&[eh_artiq::StackPointerBacktrace {
stack_pointer: 0,
initial_backtrace_size: 0,
current_backtrace_size: 0,
}],
&[],
0,
) {
Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())),
Err(_) => error!("Error writing exception data"),
}
self.kernel_stop();
}
pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) -> Option<SubkernelFinished> {
if !self.running() {
return None;
}
match self.process_external_messages(timer) {
Ok(()) => (),
Err(Error::AwaitingMessage) => return None, // kernel still waiting, do not process kernel messages
Err(Error::KernelException(exception)) => {
self.session.last_exception = Some(exception);
return Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
});
}
Err(e) => {
error!("Error while running processing external messages: {:?}", e);
self.runtime_exception(e);
return Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
});
}
}
match self.process_kern_message(rank, timer) {
Ok(true) => Some(SubkernelFinished {
id: self.session.id,
with_exception: false,
}),
Ok(false) | Err(Error::NoMessage) => None,
Err(Error::KernelException(exception)) => {
self.session.last_exception = Some(exception);
return Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
});
}
Err(e) => {
error!("Error while running kernel: {:?}", e);
self.runtime_exception(e);
Some(SubkernelFinished {
id: self.session.id,
with_exception: true,
})
}
}
}
fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result<bool, Error> {
let reply = self.control.rx.try_recv()?;
match reply {
kernel::Message::KernelFinished(_async_errors) => {
self.kernel_stop();
return Ok(true);
}
kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => {
error!("exception in kernel");
for exception in exceptions {
error!("{:?}", exception.unwrap());
}
error!("stack pointers: {:?}", stack_pointers);
error!("backtrace: {:?}", backtrace);
let buf: Vec<u8> = Vec::new();
let mut writer = Cursor::new(buf);
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
Ok(()) => (),
Err(_) => error!("Error writing exception data"),
}
self.kernel_stop();
return Err(Error::KernelException(Sliceable::new(writer.into_inner())));
}
kernel::Message::CachePutRequest(key, value) => {
self.cache.insert(key, value);
}
kernel::Message::CacheGetRequest(key) => {
const DEFAULT: Vec<i32> = Vec::new();
let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone();
self.control.tx.send(kernel::Message::CacheGetReply(value));
}
kernel::Message::SubkernelMsgSend { id: _, data } => {
self.session.messages.accept_outgoing(data)?;
self.session.kernel_state = KernelState::MsgSending;
}
kernel::Message::SubkernelMsgRecvRequest { id: _, timeout } => {
let max_time = timer.get_time() + Milliseconds(timeout);
self.session.kernel_state = KernelState::MsgAwait(max_time);
}
kernel::Message::UpDestinationsRequest(destination) => {
self.control
.tx
.send(kernel::Message::UpDestinationsReply(destination == (rank as i32)));
}
_ => {
unexpected!("unexpected message from core1 while kernel was running: {:?}", reply);
}
}
Ok(false)
}
fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> {
match self.session.kernel_state {
KernelState::MsgAwait(timeout) => {
if timer.get_time() > timeout {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::Timeout,
});
self.session.kernel_state = KernelState::Running;
return Ok(());
}
if let Some(message) = self.session.messages.get_incoming() {
self.control.tx.send(kernel::Message::SubkernelMsgRecvReply {
status: kernel::SubkernelStatus::NoError,
});
self.session.kernel_state = KernelState::Running;
self.pass_message_to_kernel(&message, timer)
} else {
Err(Error::AwaitingMessage)
}
}
KernelState::MsgSending => {
if self.session.messages.was_message_acknowledged() {
self.session.kernel_state = KernelState::Running;
self.control.tx.send(kernel::Message::SubkernelMsgSent);
Ok(())
} else {
Err(Error::AwaitingMessage)
}
}
_ => Ok(()),
}
}
fn pass_message_to_kernel(&mut self, message: &Message, timer: GlobalTimer) -> Result<(), Error> {
let mut reader = Cursor::new(&message.data);
let mut tag: [u8; 1] = [message.tag];
loop {
let slot = match recv_w_timeout(&mut self.control.rx, timer, 100)? {
kernel::Message::RpcRecvRequest(slot) => slot,
other => unexpected!("expected root value slot from core1, not {:?}", other),
};
let mut exception: Option<Sliceable> = None;
let mut unexpected: Option<String> = None;
rpc::recv_return(&mut reader, &tag, slot, &mut |size| {
if size == 0 {
0 as *mut ()
} else {
self.control.tx.send(kernel::Message::RpcRecvReply(Ok(size)));
match recv_w_timeout(&mut self.control.rx, timer, 100) {
Ok(kernel::Message::RpcRecvRequest(slot)) => slot,
Ok(kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors)) => {
let buf: Vec<u8> = Vec::new();
let mut writer = Cursor::new(buf);
match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) {
Ok(()) => {
exception = Some(Sliceable::new(writer.into_inner()));
}
Err(_) => {
unexpected = Some("Error writing exception data".to_string());
}
};
0 as *mut ()
}
other => {
unexpected = Some(format!("expected nested value slot from kernel CPU, not {:?}", other));
0 as *mut ()
}
}
}
})?;
if let Some(exception) = exception {
self.kernel_stop();
return Err(Error::KernelException(exception));
} else if let Some(unexpected) = unexpected {
self.kernel_stop();
unexpected!("{}", unexpected);
}
self.control.tx.send(kernel::Message::RpcRecvReply(Ok(0)));
match reader.read_u8() {
Ok(0) | Err(_) => break, // reached the end of data, we're done
Ok(t) => {
tag[0] = t;
} // update the tag for next read
}
}
Ok(())
}
}
fn write_exception<W>(
writer: &mut W,
exceptions: &[Option<eh_artiq::Exception>],
stack_pointers: &[eh_artiq::StackPointerBacktrace],
backtrace: &[(usize, usize)],
async_errors: u8,
) -> Result<(), Error>
where
W: Write + ?Sized,
{
/* header */
writer.write_bytes(&[0x5a, 0x5a, 0x5a, 0x5a, /*Reply::KernelException*/ 9])?;
writer.write_u32(exceptions.len() as u32)?;
for exception in exceptions.iter() {
let exception = exception.as_ref().unwrap();
writer.write_u32(exception.id)?;
if exception.message.len() == usize::MAX {
// exception with host string
writer.write_u32(u32::MAX)?;
writer.write_u32(exception.message.as_ptr() as u32)?;
} else {
let msg =
str::from_utf8(unsafe { slice::from_raw_parts(exception.message.as_ptr(), exception.message.len()) })
.unwrap()
.replace(
"{rtio_channel_info:0}",
&format!(
"0x{:04x}:{}",
exception.param[0],
ksupport::resolve_channel_name(exception.param[0] as u32)
),
);
writer.write_string(&msg)?;
}
writer.write_u64(exception.param[0] as u64)?;
writer.write_u64(exception.param[1] as u64)?;
writer.write_u64(exception.param[2] as u64)?;
writer.write_bytes(exception.file.as_ref())?;
writer.write_u32(exception.line)?;
writer.write_u32(exception.column)?;
writer.write_bytes(exception.function.as_ref())?;
}
for sp in stack_pointers.iter() {
writer.write_u32(sp.stack_pointer as u32)?;
writer.write_u32(sp.initial_backtrace_size as u32)?;
writer.write_u32(sp.current_backtrace_size as u32)?;
}
writer.write_u32(backtrace.len() as u32)?;
for &(addr, sp) in backtrace {
writer.write_u32(addr as u32)?;
writer.write_u32(sp as u32)?;
}
writer.write_u8(async_errors as u8)?;
Ok(())
}
fn recv_w_timeout(
rx: &mut Receiver<'_, kernel::Message>,
timer: GlobalTimer,
timeout: u64,
) -> Result<kernel::Message, Error> {
let max_time = timer.get_time() + Milliseconds(timeout);
while timer.get_time() < max_time {
match rx.try_recv() {
Err(_) => (),
Ok(message) => return Ok(message),
}
}
Err(Error::NoMessage)
}