From cb79c1228497eb083088a395b79a3c788334c914 Mon Sep 17 00:00:00 2001 From: mwojcik Date: Wed, 6 Sep 2023 16:06:38 +0800 Subject: [PATCH] satellite: support subkernels --- src/Cargo.lock | 3 + src/libio/cursor.rs | 1 + src/libksupport/src/kernel/control.rs | 2 +- src/libksupport/src/kernel/mod.rs | 1 - src/libksupport/src/kernel/subkernel.rs | 2 +- src/libksupport/src/lib.rs | 17 +- src/libksupport/src/rpc.rs | 12 +- src/libksupport/src/rtio_acp.rs | 6 +- src/libksupport/src/rtio_csr.rs | 6 +- src/runtime/src/comms.rs | 10 +- src/runtime/src/rpc_async.rs | 2 +- src/runtime/src/rtio_mgt.rs | 3 +- src/satman/Cargo.toml | 6 +- src/satman/src/dma.rs | 4 + src/satman/src/main.rs | 189 ++++--- src/satman/src/subkernel.rs | 678 ++++++++++++++++++++++++ 16 files changed, 839 insertions(+), 103 deletions(-) create mode 100644 src/satman/src/subkernel.rs diff --git a/src/Cargo.lock b/src/Cargo.lock index e4d22839..3666b3eb 100644 --- a/src/Cargo.lock +++ b/src/Cargo.lock @@ -497,7 +497,10 @@ name = "satman" version = "0.0.0" dependencies = [ "build_zynq", + "core_io", + "cslice", "embedded-hal", + "io", "ksupport", "libasync", "libboard_artiq", diff --git a/src/libio/cursor.rs b/src/libio/cursor.rs index 5f695e28..e2390223 100644 --- a/src/libio/cursor.rs +++ b/src/libio/cursor.rs @@ -1,5 +1,6 @@ #[cfg(feature = "alloc")] use alloc::vec::Vec; + use core_io::{Error as IoError, Read, Write}; #[derive(Debug, Clone)] diff --git a/src/libksupport/src/kernel/control.rs b/src/libksupport/src/kernel/control.rs index 233c78ca..fcd35dcc 100644 --- a/src/libksupport/src/kernel/control.rs +++ b/src/libksupport/src/kernel/control.rs @@ -4,7 +4,7 @@ use libcortex_a9::sync_channel::{Receiver, Sender}; use libsupport_zynq::boot::Core1; use super::{Message, CHANNEL_0TO1, CHANNEL_1TO0, CHANNEL_SEM, INIT_LOCK}; -use crate::{irq::restart_core1}; +use crate::irq::restart_core1; pub struct Control { pub tx: Sender<'static, Message>, diff --git a/src/libksupport/src/kernel/mod.rs b/src/libksupport/src/kernel/mod.rs index 0d59fad4..ba082402 100644 --- a/src/libksupport/src/kernel/mod.rs +++ b/src/libksupport/src/kernel/mod.rs @@ -16,7 +16,6 @@ mod cache; #[cfg(has_drtio)] mod subkernel; - #[cfg(has_drtio)] #[derive(Debug, Clone)] pub enum SubkernelStatus { diff --git a/src/libksupport/src/kernel/subkernel.rs b/src/libksupport/src/kernel/subkernel.rs index e088ff22..f168a99d 100644 --- a/src/libksupport/src/kernel/subkernel.rs +++ b/src/libksupport/src/kernel/subkernel.rs @@ -2,7 +2,7 @@ use alloc::vec::Vec; use cslice::CSlice; -use super::{Message, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0, SubkernelStatus}; +use super::{Message, SubkernelStatus, KERNEL_CHANNEL_0TO1, KERNEL_CHANNEL_1TO0}; use crate::{artiq_raise, rpc::send_args}; pub extern "C" fn load_run(id: u32, run: bool) { diff --git a/src/libksupport/src/lib.rs b/src/libksupport/src/lib.rs index 891be7e2..8d35b49d 100644 --- a/src/libksupport/src/lib.rs +++ b/src/libksupport/src/lib.rs @@ -14,8 +14,11 @@ use io::{Cursor, ProtoRead}; use libasync::block_async; use libconfig::Config; use log::{error, warn}; +#[cfg(has_drtiosat)] +pub use pl::csr::drtiosat as rtio_core; +#[cfg(has_rtio_core)] +pub use pl::csr::rtio_core; use void::Void; -use rtio::rtio_core; pub mod eh_artiq; pub mod i2c; @@ -151,7 +154,11 @@ pub unsafe fn get_async_errors() -> u8 { fn wait_for_async_rtio_error() -> nb::Result<(), Void> { unsafe { - if rtio_core::async_error_read() != 0 { + #[cfg(has_rtio_core)] + let errors = rtio_core::async_error_read(); + #[cfg(has_drtiosat)] + let errors = rtio_core::protocol_error_read(); + if errors != 0 { Ok(()) } else { Err(nb::Error::WouldBlock) @@ -163,7 +170,10 @@ pub async fn report_async_rtio_errors() { loop { let _ = block_async!(wait_for_async_rtio_error()).await; unsafe { + #[cfg(has_rtio_core)] let errors = rtio_core::async_error_read(); + #[cfg(has_drtiosat)] + let errors = rtio_core::protocol_error_read(); if errors & ASYNC_ERROR_COLLISION != 0 { let channel = rtio_core::collision_channel_read(); error!( @@ -189,7 +199,10 @@ pub async fn report_async_rtio_errors() { ); } SEEN_ASYNC_ERRORS = errors; + #[cfg(has_rtio_core)] rtio_core::async_error_write(errors); + #[cfg(has_drtiosat)] + rtio_core::protocol_error_write(errors); } } } diff --git a/src/libksupport/src/rpc.rs b/src/libksupport/src/rpc.rs index 6a66f17a..f85fa50d 100644 --- a/src/libksupport/src/rpc.rs +++ b/src/libksupport/src/rpc.rs @@ -42,10 +42,10 @@ unsafe fn recv_elements( elt_tag: Tag, length: usize, storage: *mut (), - alloc: &F, + alloc: &mut F, ) -> Result<(), Error> where - F: Fn(usize) -> *mut (), + F: FnMut(usize) -> *mut (), R: Read + ?Sized, { match elt_tag { @@ -79,9 +79,9 @@ where Ok(()) } -unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), alloc: &F) -> Result<(), Error> +unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), alloc: &mut F) -> Result<(), Error> where - F: Fn(usize) -> *mut (), + F: FnMut(usize) -> *mut (), R: Read + ?Sized, { macro_rules! consume_value { @@ -175,9 +175,9 @@ where } } -pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), alloc: &F) -> Result<(), Error> +pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), alloc: &mut F) -> Result<(), Error> where - F: Fn(usize) -> *mut (), + F: FnMut(usize) -> *mut (), R: Read + ?Sized, { let mut it = TagIterator::new(tag_bytes); diff --git a/src/libksupport/src/rtio_acp.rs b/src/libksupport/src/rtio_acp.rs index 83858035..13e9fc7f 100644 --- a/src/libksupport/src/rtio_acp.rs +++ b/src/libksupport/src/rtio_acp.rs @@ -4,11 +4,7 @@ use cslice::CSlice; use libcortex_a9::asm; use vcell::VolatileCell; -use crate::{artiq_raise, pl::csr, resolve_channel_name}; -#[cfg(has_drtiosat)] -pub use crate::pl::csr::drtiosat as rtio_core; -#[cfg(has_rtio_core)] -pub use crate::pl::csr::rtio_core; +use crate::{artiq_raise, pl::csr, resolve_channel_name, rtio_core}; pub const RTIO_O_STATUS_WAIT: i32 = 1; pub const RTIO_O_STATUS_UNDERFLOW: i32 = 2; diff --git a/src/libksupport/src/rtio_csr.rs b/src/libksupport/src/rtio_csr.rs index 6fc0bee4..b3d1e172 100644 --- a/src/libksupport/src/rtio_csr.rs +++ b/src/libksupport/src/rtio_csr.rs @@ -2,11 +2,7 @@ use core::ptr::{read_volatile, write_volatile}; use cslice::CSlice; -use crate::{artiq_raise, pl::csr, resolve_channel_name}; -#[cfg(has_drtiosat)] -pub use crate::pl::csr::drtiosat as rtio_core; -#[cfg(has_rtio_core)] -pub use crate::pl::csr::rtio_core; +use crate::{artiq_raise, pl::csr, resolve_channel_name, rtio_core}; pub const RTIO_O_STATUS_WAIT: u8 = 1; pub const RTIO_O_STATUS_UNDERFLOW: u8 = 2; diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index df9e72ba..4c7974e5 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -6,9 +6,9 @@ use cslice::CSlice; use futures::{future::FutureExt, select_biased}; #[cfg(has_drtio)] use io::{Cursor, ProtoRead}; -use ksupport::{kernel, resolve_channel_name}; #[cfg(has_drtio)] use ksupport::rpc; +use ksupport::{kernel, resolve_channel_name}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; use libboard_artiq::drtio_routing; @@ -444,7 +444,11 @@ async fn handle_run_kernel( error!("error sending subkernel message: {:?}", e) } }; - control.borrow_mut().tx.async_send(kernel::Message::SubkernelMsgSent).await; + control + .borrow_mut() + .tx + .async_send(kernel::Message::SubkernelMsgSent) + .await; } #[cfg(has_drtio)] kernel::Message::SubkernelMsgRecvRequest { id, timeout } => { @@ -472,7 +476,7 @@ async fn handle_run_kernel( kernel::Message::RpcRecvRequest(slot) => slot, other => panic!("expected root value slot from core1, not {:?}", other), }; - rpc::recv_return(&mut reader, &tag, slot, &|size| { + rpc::recv_return(&mut reader, &tag, slot, &mut |size| { if size == 0 { 0 as *mut () } else { diff --git a/src/runtime/src/rpc_async.rs b/src/runtime/src/rpc_async.rs index 1e84260d..303c630f 100644 --- a/src/runtime/src/rpc_async.rs +++ b/src/runtime/src/rpc_async.rs @@ -5,7 +5,7 @@ use async_recursion::async_recursion; use byteorder::{ByteOrder, NativeEndian}; use cslice::CMutSlice; use ksupport::rpc::{tag::{Tag, TagIterator}, - *}; + *}; use libasync::smoltcp::TcpStream; use libboard_zynq::smoltcp; use log::trace; diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index b165dade..3b3f897c 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -10,7 +10,8 @@ pub mod drtio { use alloc::vec::Vec; use embedded_hal::blocking::delay::DelayMs; - use ksupport::{ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, SEEN_ASYNC_ERRORS, resolve_channel_name}; + use ksupport::{resolve_channel_name, ASYNC_ERROR_BUSY, ASYNC_ERROR_COLLISION, ASYNC_ERROR_SEQUENCE_ERROR, + SEEN_ASYNC_ERRORS}; use libasync::{delay, task}; use libboard_artiq::{drtioaux::Error, drtioaux_async, drtioaux_async::Packet, drtioaux_proto::MASTER_PAYLOAD_MAX_SIZE}; diff --git a/src/satman/Cargo.toml b/src/satman/Cargo.toml index 1c941818..36914d36 100644 --- a/src/satman/Cargo.toml +++ b/src/satman/Cargo.toml @@ -14,6 +14,8 @@ build_zynq = { path = "../libbuild_zynq" } [dependencies] log = { version = "0.4", default-features = false } +core_io = { version = "0.1", features = ["collections"] } +cslice = "0.3" embedded-hal = "0.2" libboard_zynq = { git = "https://git.m-labs.hk/M-Labs/zynq-rs.git", features = ["ipv6"]} @@ -25,4 +27,6 @@ libconfig = { git = "https://git.m-labs.hk/M-Labs/zynq-rs.git", features = ["fat libboard_artiq = { path = "../libboard_artiq" } unwind = { path = "../libunwind" } -libc = { path = "../libc" } \ No newline at end of file +libc = { path = "../libc" } +io = { path = "../libio", features = ["alloc"] } +ksupport = { path = "../libksupport" } diff --git a/src/satman/src/dma.rs b/src/satman/src/dma.rs index 0b9964ba..5955e3c3 100644 --- a/src/satman/src/dma.rs +++ b/src/satman/src/dma.rs @@ -170,4 +170,8 @@ impl Manager { } } } + + pub fn running(&self) -> bool { + self.state == ManagerState::Playback + } } diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 5228395a..1126e579 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -1,13 +1,15 @@ #![no_std] #![no_main] -#![feature(never_type, panic_info_message, asm, naked_functions)] -#![feature(alloc_error_handler)] +#![feature(alloc_error_handler, try_trait, never_type, panic_info_message)] #[macro_use] extern crate log; - +extern crate core_io; +extern crate cslice; extern crate embedded_hal; +extern crate io; +extern crate ksupport; extern crate libboard_artiq; extern crate libboard_zynq; extern crate libcortex_a9; @@ -18,8 +20,6 @@ extern crate unwind; extern crate alloc; -use core::sync::atomic::{AtomicBool, Ordering}; - use analyzer::Analyzer; use dma::Manager as DmaManager; use embedded_hal::blocking::delay::DelayUs; @@ -27,21 +27,22 @@ use embedded_hal::blocking::delay::DelayUs; use libboard_artiq::io_expander; #[cfg(has_si5324)] use libboard_artiq::si5324; -use libboard_artiq::{drtio_routing, drtioaux, drtioaux_proto::SAT_PAYLOAD_MAX_SIZE, identifier_read, logger, pl::csr}; +use libboard_artiq::{drtio_routing, drtioaux, + drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, + identifier_read, logger, + pl::csr}; #[cfg(feature = "target_kasli_soc")] use libboard_zynq::error_led::ErrorLED; -use libboard_zynq::{gic, i2c::I2c, mpcore, print, println, stdio, time::Milliseconds, timer::GlobalTimer}; -use libcortex_a9::{asm, interrupt_handler, - l2c::enable_l2_cache, - notify_spin_lock, - regs::{MPIDR, SP}, - spin_lock_yield}; -use libregister::{RegisterR, RegisterW}; +use libboard_zynq::{i2c::I2c, print, println, time::Milliseconds, timer::GlobalTimer}; +use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR}; +use libregister::RegisterR; use libsupport_zynq::ram; +use subkernel::Manager as KernelManager; mod analyzer; mod dma; mod repeater; +mod subkernel; fn drtiosat_reset(reset: bool) { unsafe { @@ -98,6 +99,7 @@ fn process_aux_packet( i2c: &mut I2c, dma_manager: &mut DmaManager, analyzer: &mut Analyzer, + kernel_manager: &mut KernelManager, ) -> Result<(), drtioaux::Error> { // In the code below, *_chan_sel_write takes an u8 if there are fewer than 256 channels, // and u16 otherwise; hence the `as _` conversion. @@ -487,10 +489,98 @@ fn process_aux_packet( timestamp, } => { forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); - let succeeded = dma_manager.playback(id, timestamp).is_ok(); + let succeeded = if !kernel_manager.running() { + dma_manager.playback(id, timestamp).is_ok() + } else { + false + }; drtioaux::send(0, &drtioaux::Packet::DmaPlaybackReply { succeeded: succeeded }) } + drtioaux::Packet::SubkernelAddDataRequest { + destination: _destination, + id, + last, + length, + data, + } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + let succeeded = kernel_manager.add(id, last, &data, length as usize).is_ok(); + drtioaux::send(0, &drtioaux::Packet::SubkernelAddDataReply { succeeded: succeeded }) + } + drtioaux::Packet::SubkernelLoadRunRequest { + destination: _destination, + id, + run, + } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + let mut succeeded = kernel_manager.load(id).is_ok(); + // allow preloading a kernel with delayed run + if run { + if dma_manager.running() { + // cannot run kernel while DDMA is running + succeeded = false; + } else { + succeeded |= kernel_manager.run(id).is_ok(); + } + } + drtioaux::send(0, &drtioaux::Packet::SubkernelLoadRunReply { succeeded: succeeded }) + } + drtioaux::Packet::SubkernelExceptionRequest { + destination: _destination, + } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + let mut data_slice: [u8; SAT_PAYLOAD_MAX_SIZE] = [0; SAT_PAYLOAD_MAX_SIZE]; + let meta = kernel_manager.exception_get_slice(&mut data_slice); + drtioaux::send( + 0, + &drtioaux::Packet::SubkernelException { + last: meta.last, + length: meta.len, + data: data_slice, + }, + ) + } + drtioaux::Packet::SubkernelMessage { + destination, + id: _id, + last, + length, + data, + } => { + forward!(_routing_table, destination, *_rank, _repeaters, &packet, timer); + kernel_manager.message_handle_incoming(last, length as usize, &data); + drtioaux::send( + 0, + &drtioaux::Packet::SubkernelMessageAck { + destination: destination, + }, + ) + } + drtioaux::Packet::SubkernelMessageAck { + destination: _destination, + } => { + forward!(_routing_table, _destination, *_rank, _repeaters, &packet, timer); + if kernel_manager.message_ack_slice() { + let mut data_slice: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + if let Some(meta) = kernel_manager.message_get_slice(&mut data_slice) { + drtioaux::send( + 0, + &drtioaux::Packet::SubkernelMessage { + destination: *_rank, + id: kernel_manager.get_current_id().unwrap(), + last: meta.last, + length: meta.len as u16, + data: data_slice, + }, + )?; + } else { + error!("Error receiving message slice"); + } + } + Ok(()) + } + _ => { warn!("received unexpected aux packet"); Ok(()) @@ -506,6 +596,7 @@ fn process_aux_packets( i2c: &mut I2c, dma_manager: &mut DmaManager, analyzer: &mut Analyzer, + kernel_manager: &mut KernelManager, ) { let result = drtioaux::recv(0).and_then(|packet| { if let Some(packet) = packet { @@ -518,6 +609,7 @@ fn process_aux_packets( i2c, dma_manager, analyzer, + kernel_manager, ) } else { Ok(()) @@ -626,8 +718,8 @@ pub extern "C" fn main_core0() -> i32 { ram::init_alloc_core0(); - let mut i2c = I2c::i2c0(); - i2c.init().expect("I2C initialization failed"); + ksupport::i2c::init(); + let mut i2c = unsafe { (ksupport::i2c::I2C_BUS).as_mut().unwrap() }; #[cfg(feature = "target_kasli_soc")] let (mut io_expander0, mut io_expander1); @@ -682,6 +774,8 @@ pub extern "C" fn main_core0() -> i32 { let mut hardware_tick_ts = 0; + let mut control = ksupport::kernel::Control::start(); + loop { while !drtiosat_link_rx_up() { drtiosat_process_errors(); @@ -709,12 +803,12 @@ pub extern "C" fn main_core0() -> i32 { si5324::siphaser::calibrate_skew(&mut timer).expect("failed to calibrate skew"); } - // DMA manager created here, so when link is dropped, all DMA traces + // Various managers created here, so when link is dropped, all DMA traces // are cleared out for a clean slate on subsequent connections, // without a manual intervention. let mut dma_manager = DmaManager::new(); - // same for RTIO Analyzer let mut analyzer = Analyzer::new(); + let mut kernel_manager = KernelManager::new(&mut control); drtioaux::reset(0); drtiosat_reset(false); @@ -730,6 +824,7 @@ pub extern "C" fn main_core0() -> i32 { &mut i2c, &mut dma_manager, &mut analyzer, + &mut kernel_manager, ); #[allow(unused_mut)] for mut rep in repeaters.iter_mut() { @@ -771,46 +866,8 @@ extern "C" { static mut __stack1_start: u32; } -interrupt_handler!(IRQ, irq, __irq_stack0_start, __irq_stack1_start, { - if MPIDR.read().cpu_id() == 1 { - let mpcore = mpcore::RegisterBlock::mpcore(); - let mut gic = gic::InterruptController::gic(mpcore); - let id = gic.get_interrupt_id(); - if id.0 == 0 { - gic.end_interrupt(id); - asm::exit_irq(); - SP.write(&mut __stack1_start as *mut _ as u32); - asm::enable_irq(); - CORE1_RESTART.store(false, Ordering::Relaxed); - notify_spin_lock(); - main_core1(); - } - stdio::drop_uart(); - } - loop {} -}); - static mut PANICKED: [bool; 2] = [false; 2]; -static CORE1_RESTART: AtomicBool = AtomicBool::new(false); - -pub fn restart_core1() { - let mut interrupt_controller = gic::InterruptController::gic(mpcore::RegisterBlock::mpcore()); - CORE1_RESTART.store(true, Ordering::Relaxed); - interrupt_controller.send_sgi(gic::InterruptId(0), gic::CPUCore::Core1.into()); - while CORE1_RESTART.load(Ordering::Relaxed) { - spin_lock_yield(); - } -} - -#[no_mangle] -pub fn main_core1() { - let mut interrupt_controller = gic::InterruptController::gic(mpcore::RegisterBlock::mpcore()); - interrupt_controller.enable_interrupts(); - - loop {} -} - #[no_mangle] pub extern "C" fn exception(_vect: u32, _regs: *const u32, pc: u32, ea: u32) { fn hexdump(addr: u32) { @@ -866,23 +923,3 @@ pub fn panic_fmt(info: &core::panic::PanicInfo) -> ! { loop {} } - -// linker symbols -extern "C" { - static __text_start: u32; - static __text_end: u32; - static __exidx_start: u32; - static __exidx_end: u32; -} - -#[no_mangle] -extern "C" fn dl_unwind_find_exidx(_pc: *const u32, len_ptr: *mut u32) -> *const u32 { - let length; - let start: *const u32; - unsafe { - length = (&__exidx_end as *const u32).offset_from(&__exidx_start) as u32; - start = &__exidx_start; - *len_ptr = length; - } - start -} diff --git a/src/satman/src/subkernel.rs b/src/satman/src/subkernel.rs new file mode 100644 index 00000000..44fddf79 --- /dev/null +++ b/src/satman/src/subkernel.rs @@ -0,0 +1,678 @@ +use alloc::{collections::{BTreeMap, VecDeque}, + format, + string::{String, ToString}, + vec::Vec}; +use core::{cmp::min, option::NoneError, slice, str}; + +use core_io::{Error as IoError, Write}; +use cslice::AsCSlice; +use io::{Cursor, ProtoRead, ProtoWrite}; +use ksupport::{eh_artiq, kernel, rpc}; +use libboard_artiq::{drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, + pl::csr}; +use libboard_zynq::{time::Milliseconds, timer::GlobalTimer}; +use libcortex_a9::sync_channel::Receiver; +use log::warn; + +#[derive(Debug, Clone, Copy, PartialEq)] +enum KernelState { + Absent, + Loaded, + Running, + MsgAwait(Milliseconds), + MsgSending, +} + +#[derive(Debug)] +pub enum Error { + Load(String), + KernelNotFound, + Unexpected(String), + NoMessage, + AwaitingMessage, + SubkernelIoError, + KernelException(Sliceable), +} + +impl From for Error { + fn from(_: NoneError) -> Error { + Error::KernelNotFound + } +} + +impl From for Error { + fn from(_value: IoError) -> Error { + Error::SubkernelIoError + } +} + +impl From<()> for Error { + fn from(_: ()) -> Error { + Error::NoMessage + } +} + +macro_rules! unexpected { + ($($arg:tt)*) => (return Err(Error::Unexpected(format!($($arg)*)))); +} + +/* represents data that has to be sent to Master */ +#[derive(Debug)] +pub struct Sliceable { + it: usize, + data: Vec, +} + +/* represents interkernel messages */ +struct Message { + tag: u8, + data: Vec, +} + +#[derive(PartialEq)] +enum OutMessageState { + NoMessage, + MessageReady, + MessageBeingSent, + MessageSent, + MessageAcknowledged, +} + +/* for dealing with incoming and outgoing interkernel messages */ +struct MessageManager { + out_message: Option, + out_state: OutMessageState, + in_queue: VecDeque, + in_buffer: Option, +} + +// Per-run state +struct Session { + id: u32, + kernel_state: KernelState, + last_exception: Option, + messages: MessageManager, +} + +impl Session { + pub fn new(id: u32) -> Session { + Session { + id: id, + kernel_state: KernelState::Absent, + last_exception: None, + messages: MessageManager::new(), + } + } + + fn running(&self) -> bool { + match self.kernel_state { + KernelState::Absent | KernelState::Loaded => false, + KernelState::Running | KernelState::MsgAwait { .. } | KernelState::MsgSending => true, + } + } +} + +#[derive(Debug)] +struct KernelLibrary { + library: Vec, + complete: bool, +} + +pub struct Manager<'a> { + kernels: BTreeMap, + session: Session, + control: &'a mut kernel::Control, + cache: BTreeMap>, +} + +pub struct SubkernelFinished { + pub id: u32, + pub with_exception: bool, +} + +pub struct SliceMeta { + pub len: u16, + pub last: bool, +} + +macro_rules! get_slice_fn { + ($name:tt, $size:expr) => { + pub fn $name(&mut self, data_slice: &mut [u8; $size]) -> SliceMeta { + if self.data.len() == 0 { + return SliceMeta { len: 0, last: true }; + } + let len = min($size, self.data.len() - self.it); + let last = self.it + len == self.data.len(); + + data_slice[..len].clone_from_slice(&self.data[self.it..self.it + len]); + self.it += len; + + SliceMeta { + len: len as u16, + last: last, + } + } + }; +} + +impl Sliceable { + pub fn new(data: Vec) -> Sliceable { + Sliceable { it: 0, data: data } + } + + get_slice_fn!(get_slice_sat, SAT_PAYLOAD_MAX_SIZE); + get_slice_fn!(get_slice_master, MASTER_PAYLOAD_MAX_SIZE); +} + +impl MessageManager { + pub fn new() -> MessageManager { + MessageManager { + out_message: None, + out_state: OutMessageState::NoMessage, + in_queue: VecDeque::new(), + in_buffer: None, + } + } + + pub fn handle_incoming(&mut self, last: bool, length: usize, data: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + // called when receiving a message from master + match self.in_buffer.as_mut() { + Some(message) => message.data.extend(&data[..length]), + None => { + self.in_buffer = Some(Message { + tag: data[0], + data: data[1..length].to_vec(), + }); + } + }; + if last { + // when done, remove from working queue + self.in_queue.push_back(self.in_buffer.take().unwrap()); + } + } + + pub fn is_outgoing_ready(&mut self) -> bool { + // called by main loop, to see if there's anything to send, will send it afterwards + match self.out_state { + OutMessageState::MessageReady => { + self.out_state = OutMessageState::MessageBeingSent; + true + } + _ => false, + } + } + + pub fn was_message_acknowledged(&mut self) -> bool { + match self.out_state { + OutMessageState::MessageAcknowledged => { + self.out_state = OutMessageState::NoMessage; + true + } + _ => false, + } + } + + pub fn get_outgoing_slice(&mut self, data_slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { + if self.out_state != OutMessageState::MessageBeingSent { + return None; + } + let meta = self.out_message.as_mut()?.get_slice_master(data_slice); + if meta.last { + // clear the message slot + self.out_message = None; + // notify kernel with a flag that message is sent + self.out_state = OutMessageState::MessageSent; + } + Some(meta) + } + + pub fn ack_slice(&mut self) -> bool { + // returns whether or not there's more to be sent + match self.out_state { + OutMessageState::MessageBeingSent => true, + OutMessageState::MessageSent => { + self.out_state = OutMessageState::MessageAcknowledged; + false + } + _ => { + warn!("received unsolicited SubkernelMessageAck"); + false + } + } + } + + pub fn accept_outgoing(&mut self, message: Vec) -> Result<(), Error> { + // skip service tag + self.out_message = Some(Sliceable::new(message[4..].to_vec())); + self.out_state = OutMessageState::MessageReady; + Ok(()) + } + + pub fn get_incoming(&mut self) -> Option { + self.in_queue.pop_front() + } +} + +impl<'a> Manager<'_> { + pub fn new(control: &mut kernel::Control) -> Manager { + Manager { + kernels: BTreeMap::new(), + session: Session::new(0), + control: control, + cache: BTreeMap::new(), + } + } + + pub fn add(&mut self, id: u32, last: bool, data: &[u8], data_len: usize) -> Result<(), Error> { + let kernel = match self.kernels.get_mut(&id) { + Some(kernel) => { + if kernel.complete { + // replace entry + self.kernels.remove(&id); + self.kernels.insert( + id, + KernelLibrary { + library: Vec::new(), + complete: false, + }, + ); + self.kernels.get_mut(&id)? + } else { + kernel + } + } + None => { + self.kernels.insert( + id, + KernelLibrary { + library: Vec::new(), + complete: false, + }, + ); + self.kernels.get_mut(&id)? + } + }; + kernel.library.extend(&data[0..data_len]); + + kernel.complete = last; + Ok(()) + } + + pub fn running(&self) -> bool { + self.session.running() + } + + pub fn get_current_id(&self) -> Option { + match self.running() { + true => Some(self.session.id), + false => None, + } + } + + pub fn run(&mut self, id: u32) -> Result<(), Error> { + info!("starting subkernel #{}", id); + if self.session.kernel_state != KernelState::Loaded || self.session.id != id { + self.load(id)?; + } + self.session.kernel_state = KernelState::Running; + unsafe { + csr::cri_con::selected_write(2); + } + + self.control.tx.send(kernel::Message::StartRequest); + Ok(()) + } + + pub fn message_handle_incoming(&mut self, last: bool, length: usize, slice: &[u8; MASTER_PAYLOAD_MAX_SIZE]) { + if !self.running() { + return; + } + self.session.messages.handle_incoming(last, length, slice); + } + + pub fn message_get_slice(&mut self, slice: &mut [u8; MASTER_PAYLOAD_MAX_SIZE]) -> Option { + if !self.running() { + return None; + } + self.session.messages.get_outgoing_slice(slice) + } + + pub fn message_ack_slice(&mut self) -> bool { + if !self.running() { + warn!("received unsolicited SubkernelMessageAck"); + return false; + } + self.session.messages.ack_slice() + } + + pub fn message_is_ready(&mut self) -> bool { + self.session.messages.is_outgoing_ready() + } + + pub fn load(&mut self, id: u32) -> Result<(), Error> { + if self.session.id == id && self.session.kernel_state == KernelState::Loaded { + return Ok(()); + } + if !self.kernels.get(&id)?.complete { + return Err(Error::KernelNotFound); + } + self.session = Session::new(id); + self.control.restart(); + + self.control + .tx + .send(kernel::Message::LoadRequest(self.kernels.get(&id)?.library.clone())); + let reply = self.control.rx.recv(); + match reply { + kernel::Message::LoadCompleted => Ok(()), + kernel::Message::LoadFailed => Err(Error::Load("kernel load failed".to_string())), + _ => Err(Error::Load(format!( + "unexpected kernel CPU reply to load request: {:?}", + reply + ))), + } + } + + pub fn exception_get_slice(&mut self, data_slice: &mut [u8; SAT_PAYLOAD_MAX_SIZE]) -> SliceMeta { + match self.session.last_exception.as_mut() { + Some(exception) => exception.get_slice_sat(data_slice), + None => SliceMeta { len: 0, last: true }, + } + } + + fn kernel_stop(&mut self) { + self.session.kernel_state = KernelState::Absent; + unsafe { + csr::cri_con::selected_write(0); + } + } + + fn runtime_exception(&mut self, cause: Error) { + let raw_exception: Vec = Vec::new(); + let mut writer = Cursor::new(raw_exception); + match write_exception( + &mut writer, + &[Some(eh_artiq::Exception { + id: 11, // SubkernelError, defined in ksupport + message: format!("in subkernel id {}: {:?}", self.session.id, cause).as_c_slice(), + param: [0, 0, 0], + file: file!().as_c_slice(), + line: line!(), + column: column!(), + function: format!("subkernel id {}", self.session.id).as_c_slice(), + })], + &[eh_artiq::StackPointerBacktrace { + stack_pointer: 0, + initial_backtrace_size: 0, + current_backtrace_size: 0, + }], + &[], + 0, + ) { + Ok(_) => self.session.last_exception = Some(Sliceable::new(writer.into_inner())), + Err(_) => error!("Error writing exception data"), + } + self.kernel_stop(); + } + + pub fn process_kern_requests(&mut self, rank: u8, timer: GlobalTimer) -> Option { + if !self.running() { + return None; + } + + match self.process_external_messages(timer) { + Ok(()) => (), + Err(Error::AwaitingMessage) => return None, // kernel still waiting, do not process kernel messages + Err(Error::KernelException(exception)) => { + self.session.last_exception = Some(exception); + return Some(SubkernelFinished { + id: self.session.id, + with_exception: true, + }); + } + Err(e) => { + error!("Error while running processing external messages: {:?}", e); + self.runtime_exception(e); + return Some(SubkernelFinished { + id: self.session.id, + with_exception: true, + }); + } + } + + match self.process_kern_message(rank, timer) { + Ok(true) => Some(SubkernelFinished { + id: self.session.id, + with_exception: false, + }), + Ok(false) | Err(Error::NoMessage) => None, + Err(Error::KernelException(exception)) => { + self.session.last_exception = Some(exception); + return Some(SubkernelFinished { + id: self.session.id, + with_exception: true, + }); + } + Err(e) => { + error!("Error while running kernel: {:?}", e); + self.runtime_exception(e); + Some(SubkernelFinished { + id: self.session.id, + with_exception: true, + }) + } + } + } + + fn process_kern_message(&mut self, rank: u8, timer: GlobalTimer) -> Result { + let reply = self.control.rx.try_recv()?; + match reply { + kernel::Message::KernelFinished(_async_errors) => { + self.kernel_stop(); + return Ok(true); + } + kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors) => { + error!("exception in kernel"); + for exception in exceptions { + error!("{:?}", exception.unwrap()); + } + error!("stack pointers: {:?}", stack_pointers); + error!("backtrace: {:?}", backtrace); + let buf: Vec = Vec::new(); + let mut writer = Cursor::new(buf); + match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) { + Ok(()) => (), + Err(_) => error!("Error writing exception data"), + } + self.kernel_stop(); + return Err(Error::KernelException(Sliceable::new(writer.into_inner()))); + } + kernel::Message::CachePutRequest(key, value) => { + self.cache.insert(key, value); + } + kernel::Message::CacheGetRequest(key) => { + const DEFAULT: Vec = Vec::new(); + let value = self.cache.get(&key).unwrap_or(&DEFAULT).clone(); + self.control.tx.send(kernel::Message::CacheGetReply(value)); + } + kernel::Message::SubkernelMsgSend { id: _, data } => { + self.session.messages.accept_outgoing(data)?; + self.session.kernel_state = KernelState::MsgSending; + } + kernel::Message::SubkernelMsgRecvRequest { id: _, timeout } => { + let max_time = timer.get_time() + Milliseconds(timeout); + self.session.kernel_state = KernelState::MsgAwait(max_time); + } + kernel::Message::UpDestinationsRequest(destination) => { + self.control + .tx + .send(kernel::Message::UpDestinationsReply(destination == (rank as i32))); + } + _ => { + unexpected!("unexpected message from core1 while kernel was running: {:?}", reply); + } + } + Ok(false) + } + + fn process_external_messages(&mut self, timer: GlobalTimer) -> Result<(), Error> { + match self.session.kernel_state { + KernelState::MsgAwait(timeout) => { + if timer.get_time() > timeout { + self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { + status: kernel::SubkernelStatus::Timeout, + }); + self.session.kernel_state = KernelState::Running; + return Ok(()); + } + if let Some(message) = self.session.messages.get_incoming() { + self.control.tx.send(kernel::Message::SubkernelMsgRecvReply { + status: kernel::SubkernelStatus::NoError, + }); + self.session.kernel_state = KernelState::Running; + self.pass_message_to_kernel(&message, timer) + } else { + Err(Error::AwaitingMessage) + } + } + KernelState::MsgSending => { + if self.session.messages.was_message_acknowledged() { + self.session.kernel_state = KernelState::Running; + self.control.tx.send(kernel::Message::SubkernelMsgSent); + Ok(()) + } else { + Err(Error::AwaitingMessage) + } + } + _ => Ok(()), + } + } + + fn pass_message_to_kernel(&mut self, message: &Message, timer: GlobalTimer) -> Result<(), Error> { + let mut reader = Cursor::new(&message.data); + let mut tag: [u8; 1] = [message.tag]; + loop { + let slot = match recv_w_timeout(&mut self.control.rx, timer, 100)? { + kernel::Message::RpcRecvRequest(slot) => slot, + other => unexpected!("expected root value slot from core1, not {:?}", other), + }; + let mut exception: Option = None; + let mut unexpected: Option = None; + rpc::recv_return(&mut reader, &tag, slot, &mut |size| { + if size == 0 { + 0 as *mut () + } else { + self.control.tx.send(kernel::Message::RpcRecvReply(Ok(size))); + match recv_w_timeout(&mut self.control.rx, timer, 100) { + Ok(kernel::Message::RpcRecvRequest(slot)) => slot, + Ok(kernel::Message::KernelException(exceptions, stack_pointers, backtrace, async_errors)) => { + let buf: Vec = Vec::new(); + let mut writer = Cursor::new(buf); + match write_exception(&mut writer, exceptions, stack_pointers, backtrace, async_errors) { + Ok(()) => { + exception = Some(Sliceable::new(writer.into_inner())); + } + Err(_) => { + unexpected = Some("Error writing exception data".to_string()); + } + }; + 0 as *mut () + } + other => { + unexpected = Some(format!("expected nested value slot from kernel CPU, not {:?}", other)); + 0 as *mut () + } + } + } + })?; + if let Some(exception) = exception { + self.kernel_stop(); + return Err(Error::KernelException(exception)); + } else if let Some(unexpected) = unexpected { + self.kernel_stop(); + unexpected!("{}", unexpected); + } + self.control.tx.send(kernel::Message::RpcRecvReply(Ok(0))); + match reader.read_u8() { + Ok(0) | Err(_) => break, // reached the end of data, we're done + Ok(t) => { + tag[0] = t; + } // update the tag for next read + } + } + Ok(()) + } +} + +fn write_exception( + writer: &mut W, + exceptions: &[Option], + stack_pointers: &[eh_artiq::StackPointerBacktrace], + backtrace: &[(usize, usize)], + async_errors: u8, +) -> Result<(), Error> +where + W: Write + ?Sized, +{ + /* header */ + writer.write_bytes(&[0x5a, 0x5a, 0x5a, 0x5a, /*Reply::KernelException*/ 9])?; + writer.write_u32(exceptions.len() as u32)?; + for exception in exceptions.iter() { + let exception = exception.as_ref().unwrap(); + writer.write_u32(exception.id)?; + + if exception.message.len() == usize::MAX { + // exception with host string + writer.write_u32(u32::MAX)?; + writer.write_u32(exception.message.as_ptr() as u32)?; + } else { + let msg = + str::from_utf8(unsafe { slice::from_raw_parts(exception.message.as_ptr(), exception.message.len()) }) + .unwrap() + .replace( + "{rtio_channel_info:0}", + &format!( + "0x{:04x}:{}", + exception.param[0], + ksupport::resolve_channel_name(exception.param[0] as u32) + ), + ); + writer.write_string(&msg)?; + } + writer.write_u64(exception.param[0] as u64)?; + writer.write_u64(exception.param[1] as u64)?; + writer.write_u64(exception.param[2] as u64)?; + writer.write_bytes(exception.file.as_ref())?; + writer.write_u32(exception.line)?; + writer.write_u32(exception.column)?; + writer.write_bytes(exception.function.as_ref())?; + } + + for sp in stack_pointers.iter() { + writer.write_u32(sp.stack_pointer as u32)?; + writer.write_u32(sp.initial_backtrace_size as u32)?; + writer.write_u32(sp.current_backtrace_size as u32)?; + } + writer.write_u32(backtrace.len() as u32)?; + for &(addr, sp) in backtrace { + writer.write_u32(addr as u32)?; + writer.write_u32(sp as u32)?; + } + writer.write_u8(async_errors as u8)?; + Ok(()) +} + +fn recv_w_timeout( + rx: &mut Receiver<'_, kernel::Message>, + timer: GlobalTimer, + timeout: u64, +) -> Result { + let max_time = timer.get_time() + Milliseconds(timeout); + while timer.get_time() < max_time { + match rx.try_recv() { + Err(_) => (), + Ok(message) => return Ok(message), + } + } + Err(Error::NoMessage) +}