runtime: make routing table static OnceLock

This commit was merged in pull request #425.
This commit is contained in:
2025-08-01 17:08:27 +08:00
parent 78080eae2b
commit 59266fd141
7 changed files with 203 additions and 384 deletions

View File

@@ -56,7 +56,6 @@ pub mod remote_analyzer {
}
pub async fn get_data(
routing_table: &drtio_routing::RoutingTable,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) -> Result<RemoteBuffer, drtio::Error> {
// gets data from satellites and returns consolidated data
@@ -65,7 +64,7 @@ pub mod remote_analyzer {
let mut remote_sent_bytes = 0;
let mut remote_total_bytes = 0;
let data_vec = match drtio::analyzer_query(routing_table, up_destinations).await {
let data_vec = match drtio::analyzer_query(up_destinations).await {
Ok(data_vec) => data_vec,
Err(e) => return Err(e),
};
@@ -106,7 +105,6 @@ async fn write_header(stream: &mut TcpStream, header: &Header) -> Result<(), Err
async fn handle_connection(
stream: &mut TcpStream,
_routing_table: &drtio_routing::RoutingTable,
_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) -> Result<(), Error> {
info!("received connection");
@@ -131,7 +129,7 @@ async fn handle_connection(
}
#[cfg(has_drtio)]
let remote = remote_analyzer::get_data(_routing_table, _up_destinations).await;
let remote = remote_analyzer::get_data(_up_destinations).await;
#[cfg(has_drtio)]
let (header, remote_data) = match remote {
Ok(remote) => (
@@ -182,19 +180,14 @@ async fn handle_connection(
Ok(())
}
pub fn start(
routing_table: &Rc<RefCell<drtio_routing::RoutingTable>>,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
let routing_table = routing_table.clone();
pub fn start(up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
let up_destinations = up_destinations.clone();
task::spawn(async move {
loop {
arm();
let mut stream = TcpStream::accept(1382, 2048, 2048).await.unwrap();
disarm();
let routing_table = routing_table.borrow();
let _ = handle_connection(&mut stream, &routing_table, &up_destinations)
let _ = handle_connection(&mut stream, &up_destinations)
.await
.map_err(|e| warn!("connection terminated: {:?}", e));
let _ = stream.flush().await;

View File

@@ -14,7 +14,7 @@ use ksupport::rpc;
use ksupport::{kernel, resolve_channel_name};
use libasync::{smoltcp::{Sockets, TcpStream},
task};
use libboard_artiq::drtio_routing;
use libboard_artiq::drtio_routing::{self, RoutingTable};
#[cfg(has_drtio)]
use libboard_artiq::drtioaux::Packet;
#[cfg(feature = "target_kasli_soc")]
@@ -27,6 +27,7 @@ use libboard_zynq::{self as zynq,
timer};
use libconfig::{self, net_settings};
use libcortex_a9::{mutex::Mutex,
once_lock::OnceLock,
semaphore::Semaphore,
sync_channel::{Receiver, Sender}};
use log::{error, info, warn};
@@ -118,6 +119,8 @@ static CACHE_STORE: Mutex<BTreeMap<String, Vec<i32>>> = Mutex::new(BTreeMap::new
pub static RESTART_IDLE: Semaphore = Semaphore::new(1, 1);
pub static ROUTING_TABLE: OnceLock<RoutingTable> = OnceLock::new();
async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> {
stream
.send_slice(&[0x5a, 0x5a, 0x5a, 0x5a, reply.to_u8().unwrap()])
@@ -194,7 +197,6 @@ async fn handle_run_kernel(
stream: Option<&TcpStream>,
control: &Rc<RefCell<kernel::Control>>,
_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
routing_table: &drtio_routing::RoutingTable,
) -> Result<()> {
control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await;
loop {
@@ -357,13 +359,13 @@ async fn handle_run_kernel(
.await;
}
kernel::Message::DmaPutRequest(recorder) => {
let _id = rtio_dma::put_record(routing_table, recorder).await;
let _id = rtio_dma::put_record(recorder).await;
#[cfg(has_drtio)]
rtio_dma::remote_dma::upload_traces(routing_table, _id).await;
rtio_dma::remote_dma::upload_traces(_id).await;
}
kernel::Message::DmaEraseRequest(name) => {
// prevent possible OOM when we have large DMA record replacement.
rtio_dma::erase(name, routing_table).await;
rtio_dma::erase(name).await;
}
kernel::Message::DmaGetRequest(name) => {
let result = rtio_dma::retrieve(name).await;
@@ -375,7 +377,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::DmaStartRemoteRequest { id, timestamp } => {
rtio_dma::remote_dma::playback(routing_table, id as u32, timestamp as u64).await;
rtio_dma::remote_dma::playback(id as u32, timestamp as u64).await;
}
#[cfg(has_drtio)]
kernel::Message::DmaAwaitRemoteRequest(id) => {
@@ -405,7 +407,7 @@ async fn handle_run_kernel(
| kernel::Message::I2cRestartRequest(busno)
| kernel::Message::I2cStopRequest(busno)
| kernel::Message::I2cSwitchSelectRequest { busno, .. } => {
let result = rtio_mgt::drtio::i2c_send_basic(routing_table, &reply, busno).await;
let result = rtio_mgt::drtio::i2c_send_basic(&reply, busno).await;
let reply = match result {
Ok(succeeded) => kernel::Message::I2cBasicReply(succeeded),
Err(_) => kernel::Message::I2cBasicReply(false),
@@ -414,7 +416,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::I2cWriteRequest { busno, data } => {
let result = rtio_mgt::drtio::i2c_send_write(routing_table, busno, data).await;
let result = rtio_mgt::drtio::i2c_send_write(busno, data).await;
let reply = match result {
Ok((succeeded, ack)) => kernel::Message::I2cWriteReply { succeeded, ack },
Err(_) => kernel::Message::I2cWriteReply {
@@ -426,7 +428,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::I2cReadRequest { busno, ack } => {
let result = rtio_mgt::drtio::i2c_send_read(routing_table, busno, ack).await;
let result = rtio_mgt::drtio::i2c_send_read(busno, ack).await;
let reply = match result {
Ok((succeeded, data)) => kernel::Message::I2cReadReply { succeeded, data },
Err(_) => kernel::Message::I2cReadReply {
@@ -443,7 +445,7 @@ async fn handle_run_kernel(
run,
timestamp,
} => {
let succeeded = match subkernel::load(routing_table, id, run, timestamp).await {
let succeeded = match subkernel::load(id, run, timestamp).await {
Ok(()) => true,
Err(e) => {
error!("Error loading subkernel: {:?}", e);
@@ -458,7 +460,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::SubkernelAwaitFinishRequest { id, timeout } => {
let res = subkernel::await_finish(routing_table, id, timeout).await;
let res = subkernel::await_finish(id, timeout).await;
let response = match res {
Ok(res) => {
if res.status == subkernel::FinishStatus::CommLost {
@@ -479,7 +481,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::SubkernelMsgSend { id, destination, data } => {
let res = subkernel::message_send(routing_table, id, destination.unwrap(), data).await;
let res = subkernel::message_send(id, destination.unwrap(), data).await;
match res {
Ok(_) => (),
Err(e) => {
@@ -504,9 +506,7 @@ async fn handle_run_kernel(
Err(SubkernelError::CommLost) => kernel::Message::SubkernelError(kernel::SubkernelStatus::CommLost),
Err(SubkernelError::SubkernelException) => {
// just retrieve the exception
let status = subkernel::await_finish(routing_table, id as u32, timeout)
.await
.unwrap();
let status = subkernel::await_finish(id as u32, timeout).await.unwrap();
kernel::Message::SubkernelError(kernel::SubkernelStatus::Exception(status.exception.unwrap()))
}
Err(_) => kernel::Message::SubkernelError(kernel::SubkernelStatus::OtherError),
@@ -562,7 +562,7 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::RtioInitRequest => {
rtio_mgt::drtio::reset(routing_table).await;
rtio_mgt::drtio::reset().await;
control.borrow_mut().tx.async_send(kernel::Message::RtioInitReply).await;
}
#[cfg(has_drtio)]
@@ -571,11 +571,10 @@ async fn handle_run_kernel(
address,
length,
} => {
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = loop {
let result = rtio_mgt::drtio::aux_transact(
linkno,
routing_table,
&Packet::CXPReadRequest {
destination,
address,
@@ -612,11 +611,10 @@ async fn handle_run_kernel(
address,
value,
} => {
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = loop {
let drtioaux_packet = rtio_mgt::drtio::aux_transact(
linkno,
routing_table,
&Packet::CXPWrite32Request {
destination,
address,
@@ -653,10 +651,9 @@ async fn handle_run_kernel(
x1,
y1,
} => {
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let drtioaux_packet = rtio_mgt::drtio::aux_transact(
linkno,
routing_table,
&Packet::CXPROIViewerSetupRequest {
destination,
x0,
@@ -682,14 +679,10 @@ async fn handle_run_kernel(
}
#[cfg(has_drtio)]
kernel::Message::CXPROIViewerDataRequest { destination } => {
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = loop {
let drtioaux_packet = rtio_mgt::drtio::aux_transact(
linkno,
routing_table,
&Packet::CXPROIViewerDataRequest { destination },
)
.await;
let drtioaux_packet =
rtio_mgt::drtio::aux_transact(linkno, &Packet::CXPROIViewerDataRequest { destination }).await;
match drtioaux_packet {
Ok(Packet::CXPWaitReply) => {}
@@ -731,7 +724,6 @@ async fn handle_flash_kernel(
buffer: &Vec<u8>,
control: &Rc<RefCell<kernel::Control>>,
_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
_routing_table: &drtio_routing::RoutingTable,
) -> Result<()> {
if buffer[0] == elf::ELFMAG0 && buffer[1] == elf::ELFMAG1 && buffer[2] == elf::ELFMAG2 && buffer[3] == elf::ELFMAG3
{
@@ -757,7 +749,7 @@ async fn handle_flash_kernel(
if up {
let subkernel_lib = entry.data().to_vec();
subkernel::add_subkernel(sid, dest, subkernel_lib).await;
match subkernel::upload(_routing_table, sid).await {
match subkernel::upload(sid).await {
Ok(_) => (),
Err(_) => return Err(Error::UnexpectedPattern),
}
@@ -818,7 +810,6 @@ async fn handle_connection(
stream: &mut TcpStream,
control: Rc<RefCell<kernel::Control>>,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
routing_table: &drtio_routing::RoutingTable,
) -> Result<()> {
stream.set_ack_delay(None);
@@ -846,7 +837,7 @@ async fn handle_connection(
load_kernel(&buffer, &control, Some(stream)).await?;
}
Request::RunKernel => {
handle_run_kernel(Some(stream), &control, &up_destinations, routing_table).await?;
handle_run_kernel(Some(stream), &control, &up_destinations).await?;
}
Request::UploadSubkernel => {
#[cfg(has_drtio)]
@@ -855,7 +846,7 @@ async fn handle_connection(
let destination = read_i8(stream).await? as u8;
let buffer = read_bytes(stream, 1024 * 1024).await?;
subkernel::add_subkernel(id, destination, buffer).await;
match subkernel::upload(routing_table, id).await {
match subkernel::upload(id).await {
Ok(_) => write_header(stream, Reply::LoadCompleted).await?,
Err(_) => {
write_header(stream, Reply::LoadFailed).await?;
@@ -921,33 +912,34 @@ pub fn main() {
Sockets::init(32);
#[cfg(has_drtio)]
let drtio_routing_table = Rc::new(RefCell::new(drtio_routing::config_routing_table(pl::csr::DRTIO.len())));
let res = ROUTING_TABLE.set(drtio_routing::config_routing_table(pl::csr::DRTIO.len()));
#[cfg(not(has_drtio))]
let drtio_routing_table = Rc::new(RefCell::new(drtio_routing::RoutingTable::default_empty()));
let res = ROUTING_TABLE.set(drtio_routing::RoutingTable::default_empty());
res.expect("routing_table can only be initialized once");
let up_destinations = Rc::new(RefCell::new([false; drtio_routing::DEST_COUNT]));
#[cfg(has_drtio_routing)]
drtio_routing::interconnect_disable_all();
rtio_mgt::startup(&drtio_routing_table, &up_destinations);
rtio_mgt::startup(&up_destinations);
ksupport::setup_device_map();
analyzer::start(&drtio_routing_table, &up_destinations);
moninj::start(&drtio_routing_table);
analyzer::start(&up_destinations);
moninj::start();
let control: Rc<RefCell<kernel::Control>> = Rc::new(RefCell::new(kernel::Control::start()));
if let Ok(buffer) = libconfig::read("startup_kernel") {
info!("Loading startup kernel...");
let routing_table = drtio_routing_table.borrow();
if let Ok(()) = task::block_on(handle_flash_kernel(&buffer, &control, &up_destinations, &routing_table)) {
if let Ok(()) = task::block_on(handle_flash_kernel(&buffer, &control, &up_destinations)) {
info!("Starting startup kernel...");
let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations, &routing_table));
let _ = task::block_on(handle_run_kernel(None, &control, &up_destinations));
info!("Startup kernel finished!");
} else {
error!("Error loading startup kernel!");
}
}
mgmt::start(Some(drtio_routing_table.clone()));
mgmt::start();
task::spawn(async move {
let connection = Rc::new(Semaphore::new(1, 1));
@@ -981,17 +973,15 @@ pub fn main() {
let terminate = terminate.clone();
let can_restart_idle = can_restart_idle.clone();
let up_destinations = up_destinations.clone();
let routing_table = drtio_routing_table.clone();
// we make sure the value of terminate is 0 before we start
let _ = terminate.try_wait();
let _ = can_restart_idle.try_wait();
task::spawn(async move {
let routing_table = routing_table.borrow();
select_biased! {
_ = (async {
if let Some(stream) = &mut maybe_stream {
let _ = handle_connection(stream, control.clone(), &up_destinations, &routing_table)
let _ = handle_connection(stream, control.clone(), &up_destinations)
.await
.map_err(|e| warn!("connection terminated: {}", e));
}
@@ -1000,10 +990,10 @@ pub fn main() {
Some(buffer) => {
loop {
info!("loading idle kernel");
match handle_flash_kernel(&buffer, &control, &up_destinations, &routing_table).await {
match handle_flash_kernel(&buffer, &control, &up_destinations).await {
Ok(_) => {
info!("running idle kernel");
match handle_run_kernel(None, &control, &up_destinations, &routing_table).await {
match handle_run_kernel(None, &control, &up_destinations).await {
Ok(_) => info!("idle kernel finished"),
Err(_) => warn!("idle kernel running error")
}
@@ -1086,7 +1076,7 @@ pub fn soft_panic_main() -> ! {
Sockets::init(32);
mgmt::start(None);
mgmt::start();
// getting eth settings disables the LED as it resets GPIO
// need to re-enable it here

View File

@@ -5,8 +5,7 @@ use byteorder::{ByteOrder, NativeEndian};
use crc::crc32;
use futures::{future::poll_fn, task::Poll};
use libasync::{smoltcp::TcpStream, task};
use libboard_artiq::{drtio_routing::RoutingTable,
logger::{BufferLogger, LogBufferRef}};
use libboard_artiq::logger::{BufferLogger, LogBufferRef};
use libboard_zynq::smoltcp;
use libconfig;
use log::{self, debug, error, info, warn};
@@ -15,7 +14,8 @@ use num_traits::FromPrimitive;
#[cfg(has_drtio)]
use crate::rtio_mgt::drtio;
use crate::{comms::RESTART_IDLE, proto_async::*};
use crate::{comms::{RESTART_IDLE, ROUTING_TABLE},
proto_async::*};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
@@ -141,17 +141,11 @@ mod remote_coremgmt {
use super::*;
pub async fn get_log(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
) -> Result<()> {
pub async fn get_log(stream: &mut TcpStream, linkno: u8, destination: u8) -> Result<()> {
let mut buffer = Vec::new();
loop {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtGetLogRequest {
destination,
clear: false,
@@ -182,13 +176,8 @@ mod remote_coremgmt {
}
}
pub async fn clear_log(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
) -> Result<()> {
let reply = drtio::aux_transact(linkno, routing_table, &Packet::CoreMgmtClearLogRequest { destination }).await;
pub async fn clear_log(stream: &mut TcpStream, linkno: u8, destination: u8) -> Result<()> {
let reply = drtio::aux_transact(linkno, &Packet::CoreMgmtClearLogRequest { destination }).await;
match reply {
Ok(Packet::CoreMgmtReply { succeeded: true }) => {
@@ -210,7 +199,6 @@ mod remote_coremgmt {
pub async fn pull_log(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
pull_id: &Rc<RefCell<u32>>,
@@ -231,7 +219,6 @@ mod remote_coremgmt {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtGetLogRequest {
destination,
clear: true,
@@ -264,14 +251,12 @@ mod remote_coremgmt {
pub async fn set_log_filter(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
level: log::LevelFilter,
) -> Result<()> {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtSetLogLevelRequest {
destination,
log_level: level as u8,
@@ -299,14 +284,12 @@ mod remote_coremgmt {
pub async fn set_uart_log_filter(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
level: log::LevelFilter,
) -> Result<()> {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtSetUartLogLevelRequest {
destination,
log_level: level as u8,
@@ -332,20 +315,13 @@ mod remote_coremgmt {
}
}
pub async fn config_read(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
key: &String,
) -> Result<()> {
pub async fn config_read(stream: &mut TcpStream, linkno: u8, destination: u8, key: &String) -> Result<()> {
let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let len = key.len();
config_key[..len].clone_from_slice(key.as_bytes());
let mut reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtConfigReadRequest {
destination: destination,
length: len as u16,
@@ -368,7 +344,6 @@ mod remote_coremgmt {
reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtConfigReadContinue {
destination: destination,
},
@@ -391,7 +366,6 @@ mod remote_coremgmt {
pub async fn config_write(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
key: &String,
@@ -403,7 +377,6 @@ mod remote_coremgmt {
match drtio::partition_data(
linkno,
routing_table,
&message,
|slice, status, len: usize| Packet::CoreMgmtConfigWriteRequest {
destination: destination,
@@ -433,20 +406,13 @@ mod remote_coremgmt {
}
}
pub async fn config_remove(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
key: &String,
) -> Result<()> {
pub async fn config_remove(stream: &mut TcpStream, linkno: u8, destination: u8, key: &String) -> Result<()> {
let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE];
let len = key.len();
config_key[..len].clone_from_slice(key.as_bytes());
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtConfigRemoveRequest {
destination: destination,
length: len as u16,
@@ -473,15 +439,9 @@ mod remote_coremgmt {
}
}
pub async fn config_erase(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
) -> Result<()> {
pub async fn config_erase(stream: &mut TcpStream, linkno: u8, destination: u8) -> Result<()> {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtConfigEraseRequest {
destination: destination,
},
@@ -506,15 +466,9 @@ mod remote_coremgmt {
}
}
pub async fn reboot(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
) -> Result<()> {
pub async fn reboot(stream: &mut TcpStream, linkno: u8, destination: u8) -> Result<()> {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtRebootRequest {
destination: destination,
},
@@ -539,15 +493,9 @@ mod remote_coremgmt {
}
}
pub async fn debug_allocator(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
) -> Result<()> {
pub async fn debug_allocator(stream: &mut TcpStream, linkno: u8, destination: u8) -> Result<()> {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtAllocatorDebugRequest {
destination: destination,
},
@@ -570,18 +518,11 @@ mod remote_coremgmt {
}
}
pub async fn image_write(
stream: &mut TcpStream,
routing_table: &RoutingTable,
linkno: u8,
destination: u8,
image: Vec<u8>,
) -> Result<()> {
pub async fn image_write(stream: &mut TcpStream, linkno: u8, destination: u8, image: Vec<u8>) -> Result<()> {
let mut image = &image[..];
let alloc_reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtFlashRequest {
destination: destination,
payload_length: image.len() as u32,
@@ -610,7 +551,6 @@ mod remote_coremgmt {
let reply = drtio::aux_transact(
linkno,
routing_table,
&Packet::CoreMgmtFlashAddDataRequest {
destination: destination,
last: last,
@@ -804,13 +744,12 @@ mod local_coremgmt {
#[cfg(has_drtio)]
macro_rules! process {
($stream: ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
($stream: ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
if $destination == 0 {
local_coremgmt::$func($stream, $($param, )*).await
} else if let Some(ref routing_table) = $routing_table {
let routing_table = routing_table.borrow();
} else if let Some(routing_table) = ROUTING_TABLE.get() {
let linkno = routing_table.0[$destination as usize][0] - 1 as u8;
remote_coremgmt::$func($stream, &routing_table, linkno, $destination, $($param, )*).await
remote_coremgmt::$func($stream, linkno, $destination, $($param, )*).await
} else {
error!("coremgmt-over-drtio not supported for panicked device, please reboot");
write_i8($stream, Reply::Error as i8).await?;
@@ -821,16 +760,12 @@ macro_rules! process {
#[cfg(not(has_drtio))]
macro_rules! process {
($stream: ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
($stream: ident, $destination:expr, $func:ident $(, $param:expr)*) => {{
local_coremgmt::$func($stream, $($param, )*).await
}}
}
async fn handle_connection(
stream: &mut TcpStream,
pull_id: Rc<RefCell<u32>>,
_routing_table: Option<Rc<RefCell<RoutingTable>>>,
) -> Result<()> {
async fn handle_connection(stream: &mut TcpStream, pull_id: Rc<RefCell<u32>>) -> Result<()> {
if !expect(&stream, b"ARTIQ management\n").await? {
return Err(Error::UnexpectedPattern);
}
@@ -845,20 +780,20 @@ async fn handle_connection(
}
let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?;
match msg {
Request::GetLog => process!(stream, _routing_table, _destination, get_log),
Request::ClearLog => process!(stream, _routing_table, _destination, clear_log),
Request::PullLog => process!(stream, _routing_table, _destination, pull_log, &pull_id),
Request::GetLog => process!(stream, _destination, get_log),
Request::ClearLog => process!(stream, _destination, clear_log),
Request::PullLog => process!(stream, _destination, pull_log, &pull_id),
Request::SetLogFilter => {
let lvl = read_log_level_filter(stream).await?;
process!(stream, _routing_table, _destination, set_log_filter, lvl)
process!(stream, _destination, set_log_filter, lvl)
}
Request::SetUartLogFilter => {
let lvl = read_log_level_filter(stream).await?;
process!(stream, _routing_table, _destination, set_uart_log_filter, lvl)
process!(stream, _destination, set_uart_log_filter, lvl)
}
Request::ConfigRead => {
let key = read_key(stream).await?;
process!(stream, _routing_table, _destination, config_read, &key)
process!(stream, _destination, config_read, &key)
}
Request::ConfigWrite => {
let key = read_key(stream).await?;
@@ -869,20 +804,20 @@ async fn handle_connection(
buffer.set_len(len);
}
read_chunk(stream, &mut buffer).await?;
process!(stream, _routing_table, _destination, config_write, &key, buffer)
process!(stream, _destination, config_write, &key, buffer)
}
Request::ConfigRemove => {
let key = read_key(stream).await?;
process!(stream, _routing_table, _destination, config_remove, &key)
process!(stream, _destination, config_remove, &key)
}
Request::Reboot => {
process!(stream, _routing_table, _destination, reboot)
process!(stream, _destination, reboot)
}
Request::ConfigErase => {
process!(stream, _routing_table, _destination, config_erase)
process!(stream, _destination, config_erase)
}
Request::DebugAllocator => {
process!(stream, _routing_table, _destination, debug_allocator)
process!(stream, _destination, debug_allocator)
}
Request::Flash => {
let len = read_i32(stream).await?;
@@ -895,22 +830,21 @@ async fn handle_connection(
buffer.set_len(len as usize);
}
read_chunk(stream, &mut buffer).await?;
process!(stream, _routing_table, _destination, image_write, buffer)
process!(stream, _destination, image_write, buffer)
}
}?;
}
}
pub fn start(routing_table: Option<Rc<RefCell<RoutingTable>>>) {
pub fn start() {
task::spawn(async move {
let pull_id = Rc::new(RefCell::new(0u32));
loop {
let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap();
let pull_id = pull_id.clone();
let routing_table = routing_table.clone();
task::spawn(async move {
info!("received connection");
let _ = handle_connection(&mut stream, pull_id, routing_table)
let _ = handle_connection(&mut stream, pull_id)
.await
.map_err(|e| warn!("connection terminated: {:?}", e));
let _ = stream.flush().await;

View File

@@ -1,15 +1,14 @@
use alloc::{collections::BTreeMap, rc::Rc};
use core::{cell::RefCell, fmt};
use alloc::collections::BTreeMap;
use core::fmt;
use futures::{FutureExt, pin_mut, select_biased};
use libasync::{smoltcp::TcpStream, task};
use libboard_artiq::drtio_routing;
use libboard_zynq::{smoltcp, timer};
use log::{debug, info, warn};
use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::{FromPrimitive, ToPrimitive};
use crate::proto_async::*;
use crate::{comms::ROUTING_TABLE, proto_async::*};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Error {
@@ -59,16 +58,9 @@ mod remote_moninj {
use crate::rtio_mgt::{drtio,
drtio::{AUX_MUTEX, Error as DrtioError}};
pub async fn read_probe(
routing_table: &drtio_routing::RoutingTable,
linkno: u8,
destination: u8,
channel: i32,
probe: i8,
) -> i64 {
pub async fn read_probe(linkno: u8, destination: u8, channel: i32, probe: i8) -> i64 {
let reply = drtio::aux_transact(
linkno,
routing_table,
&drtioaux_async::Packet::MonitorRequest {
destination: destination,
channel: channel as _,
@@ -87,14 +79,7 @@ mod remote_moninj {
0
}
pub async fn inject(
_routing_table: &drtio_routing::RoutingTable,
linkno: u8,
destination: u8,
channel: i32,
overrd: i8,
value: i8,
) {
pub async fn inject(linkno: u8, destination: u8, channel: i32, overrd: i8, value: i8) {
let _lock = AUX_MUTEX.async_lock().await;
drtioaux_async::send(
linkno,
@@ -109,16 +94,9 @@ mod remote_moninj {
.unwrap();
}
pub async fn read_injection_status(
routing_table: &drtio_routing::RoutingTable,
linkno: u8,
destination: u8,
channel: i32,
overrd: i8,
) -> i8 {
pub async fn read_injection_status(linkno: u8, destination: u8, channel: i32, overrd: i8) -> i8 {
let reply = drtio::aux_transact(
linkno,
routing_table,
&drtioaux_async::Packet::InjectionStatusRequest {
destination: destination,
channel: channel as _,
@@ -169,28 +147,28 @@ mod local_moninj {
#[cfg(has_drtio)]
macro_rules! dispatch {
($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{
($channel:expr, $func:ident $(, $param:expr)*) => {{
let destination = ($channel >> 16) as u8;
let channel = $channel;
let hop = $routing_table.0[destination as usize][0];
let hop = ROUTING_TABLE.get().unwrap().0[destination as usize][0];
if hop == 0 {
local_moninj::$func(channel.into(), $($param, )*)
} else {
let linkno = hop - 1 as u8;
remote_moninj::$func($routing_table, linkno, destination, channel, $($param, )*).await
remote_moninj::$func(linkno, destination, channel, $($param, )*).await
}
}}
}
#[cfg(not(has_drtio))]
macro_rules! dispatch {
($routing_table:ident, $channel:expr, $func:ident $(, $param:expr)*) => {{
($channel:expr, $func:ident $(, $param:expr)*) => {{
let channel = $channel as u16;
local_moninj::$func(channel.into(), $($param, )*)
}}
}
async fn handle_connection(stream: &TcpStream, _routing_table: &drtio_routing::RoutingTable) -> Result<()> {
async fn handle_connection(stream: &TcpStream) -> Result<()> {
if !expect(&stream, b"ARTIQ moninj\n").await? {
return Err(Error::UnexpectedPattern);
}
@@ -238,13 +216,13 @@ async fn handle_connection(stream: &TcpStream, _routing_table: &drtio_routing::R
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = read_i8(&stream).await?;
dispatch!(_routing_table, channel, inject, overrd, value);
dispatch!(channel, inject, overrd, value);
debug!("INJECT channel {}, overrd {}, value {}", channel, overrd, value);
},
HostMessage::GetInjectionStatus => {
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = dispatch!(_routing_table, channel, read_injection_status, overrd);
let value = dispatch!(channel, read_injection_status, overrd);
write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?;
write_i32(&stream, channel).await?;
write_i8(&stream, overrd).await?;
@@ -254,7 +232,7 @@ async fn handle_connection(stream: &TcpStream, _routing_table: &drtio_routing::R
},
_ = timeout_f => {
for (&(channel, probe), previous) in probe_watch_list.iter_mut() {
let current = dispatch!(_routing_table, channel, read_probe, probe);
let current = dispatch!(channel, read_probe, probe);
if previous.is_none() || previous.unwrap() != current {
write_i8(&stream, DeviceMessage::MonitorStatus.to_i8().unwrap()).await?;
write_i32(&stream, channel).await?;
@@ -264,7 +242,7 @@ async fn handle_connection(stream: &TcpStream, _routing_table: &drtio_routing::R
}
}
for (&(channel, overrd), previous) in inject_watch_list.iter_mut() {
let current = dispatch!(_routing_table, channel, read_injection_status, overrd);
let current = dispatch!(channel, read_injection_status, overrd);
if previous.is_none() || previous.unwrap() != current {
write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?;
write_i32(&stream, channel).await?;
@@ -279,16 +257,13 @@ async fn handle_connection(stream: &TcpStream, _routing_table: &drtio_routing::R
}
}
pub fn start(routing_table: &Rc<RefCell<drtio_routing::RoutingTable>>) {
let routing_table = routing_table.clone();
pub fn start() {
task::spawn(async move {
loop {
let routing_table = routing_table.clone();
let stream = TcpStream::accept(1383, 2048, 2048).await.unwrap();
task::spawn(async move {
info!("received connection");
let routing_table = routing_table.borrow();
let result = handle_connection(&stream, &routing_table).await;
let result = handle_connection(&stream).await;
match result {
Err(Error::NetworkError(smoltcp::Error::Finished)) => info!("peer closed connection"),
Err(error) => warn!("connection terminated: {}", error),

View File

@@ -5,7 +5,6 @@ use core::mem;
use ksupport::kernel::DmaRecorder;
#[cfg(has_drtio)]
use libasync::task;
use libboard_artiq::drtio_routing::RoutingTable;
use libcortex_a9::{cache::dcci_slice, mutex::Mutex};
const ALIGNMENT: usize = 16 * 8;
@@ -105,11 +104,11 @@ pub mod remote_dma {
Ok(playback_state)
}
pub async fn upload_traces(&mut self, routing_table: &RoutingTable) {
pub async fn upload_traces(&mut self) {
let mut lock = self.traces.async_lock().await;
let trace_iter = lock.iter_mut();
for (destination, trace) in trace_iter {
match drtio::ddma_upload_trace(routing_table, self.id, *destination, trace.get_trace()).await {
match drtio::ddma_upload_trace(self.id, *destination, trace.get_trace()).await {
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e),
}
@@ -117,11 +116,11 @@ pub mod remote_dma {
*(self.done_count.async_lock().await) = 0;
}
pub async fn erase(&mut self, routing_table: &RoutingTable) {
pub async fn erase(&mut self) {
let lock = self.traces.async_lock().await;
let trace_iter = lock.keys();
for destination in trace_iter {
match drtio::ddma_send_erase(routing_table, self.id, *destination).await {
match drtio::ddma_send_erase(self.id, *destination).await {
Ok(_) => (),
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e),
}
@@ -139,7 +138,7 @@ pub mod remote_dma {
*(self.done_count.async_lock().await) += 1;
}
pub async fn playback(&self, routing_table: &RoutingTable, timestamp: u64) {
pub async fn playback(&self, timestamp: u64) {
let mut dest_list: Vec<u8> = Vec::new();
{
let lock = self.traces.async_lock().await;
@@ -155,18 +154,18 @@ pub mod remote_dma {
// mutex lock must be dropped before sending a playback request to avoid a deadlock,
// if PlaybackStatus is sent from another satellite and the state must be updated.
for destination in dest_list {
match drtio::ddma_send_playback(routing_table, self.id, destination, timestamp).await {
match drtio::ddma_send_playback(self.id, destination, timestamp).await {
Ok(_) => (),
Err(e) => error!("Error during remote DMA playback: {}", e),
}
}
}
pub async fn destination_changed(&mut self, routing_table: &RoutingTable, destination: u8, up: bool) {
pub async fn destination_changed(&mut self, destination: u8, up: bool) {
// update state of the destination, resend traces if it's up
if let Some(trace) = self.traces.async_lock().await.get_mut(&destination) {
if up {
match drtio::ddma_upload_trace(routing_table, self.id, destination, trace.get_trace()).await {
match drtio::ddma_upload_trace(self.id, destination, trace.get_trace()).await {
Ok(_) => trace.state = RemoteState::Loaded,
Err(e) => error!("Error adding DMA trace on destination {}: {}", destination, e),
}
@@ -192,22 +191,22 @@ pub mod remote_dma {
trace_set.await_done(timeout).await
}
pub async fn erase(routing_table: &RoutingTable, id: u32) {
pub async fn erase(id: u32) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.erase(routing_table).await;
trace_set.erase().await;
unsafe {
TRACES.remove(&id);
}
}
pub async fn upload_traces(routing_table: &RoutingTable, id: u32) {
pub async fn upload_traces(id: u32) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.upload_traces(routing_table).await;
trace_set.upload_traces().await;
}
pub async fn playback(routing_table: &RoutingTable, id: u32, timestamp: u64) {
pub async fn playback(id: u32, timestamp: u64) {
let trace_set = unsafe { TRACES.get_mut(&id).unwrap() };
trace_set.playback(routing_table, timestamp).await;
trace_set.playback(timestamp).await;
}
pub async fn playback_done(id: u32, destination: u8, error: u8, channel: u32, timestamp: u64) {
@@ -215,10 +214,10 @@ pub mod remote_dma {
trace_set.playback_done(destination, error, channel, timestamp).await;
}
pub async fn destination_changed(routing_table: &RoutingTable, destination: u8, up: bool) {
pub async fn destination_changed(destination: u8, up: bool) {
let trace_iter = unsafe { TRACES.values_mut() };
for trace_set in trace_iter {
trace_set.destination_changed(routing_table, destination, up).await;
trace_set.destination_changed(destination, up).await;
}
}
@@ -228,7 +227,7 @@ pub mod remote_dma {
}
}
pub async fn put_record(_routing_table: &RoutingTable, mut recorder: DmaRecorder) -> u32 {
pub async fn put_record(mut recorder: DmaRecorder) -> u32 {
#[cfg(has_drtio)]
let mut remote_traces: BTreeMap<u8, Vec<u8>> = BTreeMap::new();
@@ -277,7 +276,7 @@ pub async fn put_record(_routing_table: &RoutingTable, mut recorder: DmaRecorder
#[cfg(has_drtio)]
{
if let Some((old_id, _v, _d)) = _old_record {
remote_dma::erase(_routing_table, old_id).await;
remote_dma::erase(old_id).await;
}
remote_dma::add_traces(ptr, remote_traces);
}
@@ -285,11 +284,11 @@ pub async fn put_record(_routing_table: &RoutingTable, mut recorder: DmaRecorder
ptr
}
pub async fn erase(name: String, _routing_table: &RoutingTable) {
pub async fn erase(name: String) {
let _entry = DMA_RECORD_STORE.lock().remove(&name);
#[cfg(has_drtio)]
if let Some((id, _v, _d)) = _entry {
remote_dma::erase(_routing_table, id).await;
remote_dma::erase(id).await;
}
}

View File

@@ -1,7 +1,7 @@
use alloc::rc::Rc;
use core::cell::RefCell;
use libboard_artiq::{drtio_routing, drtio_routing::RoutingTable, pl::csr};
use libboard_artiq::{drtio_routing, pl::csr};
use libconfig;
use libcortex_a9::mutex::Mutex;
use log::{info, warn};
@@ -24,7 +24,7 @@ pub mod drtio {
use log::{error, info, warn};
use super::*;
use crate::{analyzer::remote_analyzer::RemoteBuffer, rtio_dma::remote_dma, subkernel};
use crate::{analyzer::remote_analyzer::RemoteBuffer, comms::ROUTING_TABLE, rtio_dma::remote_dma, subkernel};
#[cfg(has_drtio_eem)]
const DRTIO_EEM_LINKNOS: core::ops::Range<usize> =
@@ -67,15 +67,10 @@ pub mod drtio {
}
}
pub fn startup(
routing_table: &Rc<RefCell<RoutingTable>>,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
let routing_table = routing_table.clone();
pub fn startup(up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
let up_destinations = up_destinations.clone();
task::spawn(async move {
let routing_table = routing_table.borrow();
link_task(&routing_table, &up_destinations).await;
link_task(&up_destinations).await;
});
}
@@ -94,9 +89,9 @@ pub mod drtio {
unsafe { (csr::DRTIO[linkno].rx_up_read)() == 1 }
}
fn get_master_destination(routing_table: &RoutingTable) -> u8 {
fn get_master_destination() -> u8 {
for i in 0..drtio_routing::DEST_COUNT {
if routing_table.0[i][0] == 0 {
if ROUTING_TABLE.get().unwrap().0[i][0] == 0 {
return i as u8;
}
}
@@ -104,8 +99,8 @@ pub mod drtio {
0
}
async fn route_packet(linkno: u8, routing_table: &RoutingTable, packet: Packet, destination: u8) {
let dest_link = routing_table.0[destination as usize][0] - 1;
async fn route_packet(linkno: u8, packet: Packet, destination: u8) {
let dest_link = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
if dest_link == linkno {
warn!(
"[LINK#{}] Re-routed packet would return to the same link, dropping: {:?}",
@@ -116,8 +111,8 @@ pub mod drtio {
}
}
async fn process_async_packets(linkno: u8, routing_table: &RoutingTable, packet: Packet) -> Option<Packet> {
let master_destination = get_master_destination(routing_table);
async fn process_async_packets(linkno: u8, packet: Packet) -> Option<Packet> {
let master_destination = get_master_destination();
match packet {
Packet::DmaPlaybackStatus {
id,
@@ -130,7 +125,7 @@ pub mod drtio {
if destination == master_destination {
remote_dma::playback_done(id, source, error, channel, timestamp).await;
} else {
route_packet(linkno, routing_table, packet, destination).await;
route_packet(linkno, packet, destination).await;
}
None
}
@@ -143,7 +138,7 @@ pub mod drtio {
if destination == master_destination {
subkernel::subkernel_finished(id, with_exception, exception_src).await;
} else {
route_packet(linkno, routing_table, packet, destination).await;
route_packet(linkno, packet, destination).await;
}
None
}
@@ -162,7 +157,7 @@ pub mod drtio {
.await
.unwrap();
} else {
route_packet(linkno, routing_table, packet, destination).await;
route_packet(linkno, packet, destination).await;
}
None
}
@@ -181,7 +176,7 @@ pub mod drtio {
if destination == master_destination {
Some(packet)
} else {
route_packet(linkno, routing_table, packet, destination).await;
route_packet(linkno, packet, destination).await;
None
}
}
@@ -200,7 +195,7 @@ pub mod drtio {
}
}
pub async fn aux_transact(linkno: u8, routing_table: &RoutingTable, request: &Packet) -> Result<Packet, Error> {
pub async fn aux_transact(linkno: u8, request: &Packet) -> Result<Packet, Error> {
if !link_rx_up(linkno).await {
return Err(Error::LinkDown);
}
@@ -208,7 +203,7 @@ pub mod drtio {
drtioaux_async::send(linkno, request).await.unwrap();
loop {
let packet = recv_aux_timeout(linkno, 200).await?;
if let Some(packet) = process_async_packets(linkno, routing_table, packet).await {
if let Some(packet) = process_async_packets(linkno, packet).await {
return Ok(packet);
}
}
@@ -221,7 +216,7 @@ pub mod drtio {
}
}
async fn ping_remote(linkno: u8, routing_table: &RoutingTable) -> u32 {
async fn ping_remote(linkno: u8) -> u32 {
let mut count = 0;
loop {
if !link_rx_up(linkno).await {
@@ -231,7 +226,7 @@ pub mod drtio {
if count > 100 {
return 0;
}
let reply = aux_transact(linkno, routing_table, &Packet::EchoRequest).await;
let reply = aux_transact(linkno, &Packet::EchoRequest).await;
match reply {
Ok(Packet::EchoReply) => {
// make sure receive buffer is drained
@@ -260,14 +255,13 @@ pub mod drtio {
}
}
async fn load_routing_table(linkno: u8, routing_table: &RoutingTable) -> Result<(), Error> {
async fn load_routing_table(linkno: u8) -> Result<(), Error> {
for i in 0..drtio_routing::DEST_COUNT {
let reply = aux_transact(
linkno,
routing_table,
&Packet::RoutingSetPath {
destination: i as u8,
hops: routing_table.0[i],
hops: ROUTING_TABLE.get().unwrap().0[i],
},
)
.await?;
@@ -278,8 +272,8 @@ pub mod drtio {
Ok(())
}
async fn set_rank(linkno: u8, rank: u8, routing_table: &RoutingTable) -> Result<(), Error> {
let reply = aux_transact(linkno, routing_table, &Packet::RoutingSetRank { rank: rank }).await?;
async fn set_rank(linkno: u8, rank: u8) -> Result<(), Error> {
let reply = aux_transact(linkno, &Packet::RoutingSetRank { rank: rank }).await?;
match reply {
Packet::RoutingAck => Ok(()),
_ => Err(Error::UnexpectedReply),
@@ -302,11 +296,11 @@ pub mod drtio {
}
}
async fn process_unsolicited_aux(linkno: u8, routing_table: &RoutingTable) {
async fn process_unsolicited_aux(linkno: u8) {
let _lock = AUX_MUTEX.async_lock().await;
match drtioaux_async::recv(linkno).await {
Ok(Some(packet)) => {
if let Some(packet) = process_async_packets(linkno, routing_table, packet).await {
if let Some(packet) = process_async_packets(linkno, packet).await {
warn!("[LINK#{}] unsolicited aux packet: {:?}", linkno, packet);
}
}
@@ -337,7 +331,6 @@ pub mod drtio {
}
async fn destination_set_up(
routing_table: &RoutingTable,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
destination: u8,
up: bool,
@@ -345,7 +338,7 @@ pub mod drtio {
let mut up_destinations = up_destinations.borrow_mut();
up_destinations[destination as usize] = up;
if up {
drtio_routing::interconnect_enable(routing_table, 0, destination);
drtio_routing::interconnect_enable(ROUTING_TABLE.get().unwrap(), 0, destination);
info!("[DEST#{}] destination is up", destination);
} else {
drtio_routing::interconnect_disable(destination);
@@ -358,13 +351,9 @@ pub mod drtio {
up_destinations[destination as usize]
}
async fn destination_survey(
routing_table: &RoutingTable,
up_links: &[bool],
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
async fn destination_survey(up_links: &[bool], up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
for destination in 0..drtio_routing::DEST_COUNT {
let hop = routing_table.0[destination][0];
let hop = ROUTING_TABLE.get().unwrap().0[destination][0];
let destination = destination as u8;
if hop > 0 && hop as usize <= csr::DRTIO.len() {
@@ -373,7 +362,6 @@ pub mod drtio {
if up_links[linkno as usize] {
let reply = aux_transact(
linkno,
routing_table,
&Packet::DestinationStatusRequest {
destination: destination,
},
@@ -381,9 +369,9 @@ pub mod drtio {
.await;
match reply {
Ok(Packet::DestinationDownReply) => {
destination_set_up(routing_table, up_destinations, destination, false).await;
remote_dma::destination_changed(routing_table, destination, false).await;
subkernel::destination_changed(routing_table, destination, false).await;
destination_set_up(up_destinations, destination, false).await;
remote_dma::destination_changed(destination, false).await;
subkernel::destination_changed(destination, false).await;
}
Ok(Packet::DestinationOkReply) => (),
Ok(Packet::DestinationSequenceErrorReply { channel }) => {
@@ -420,15 +408,14 @@ pub mod drtio {
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
}
} else {
destination_set_up(routing_table, up_destinations, destination, false).await;
remote_dma::destination_changed(routing_table, destination, false).await;
subkernel::destination_changed(routing_table, destination, false).await;
destination_set_up(up_destinations, destination, false).await;
remote_dma::destination_changed(destination, false).await;
subkernel::destination_changed(destination, false).await;
}
} else {
if up_links[linkno as usize] {
let reply = aux_transact(
linkno,
routing_table,
&Packet::DestinationStatusRequest {
destination: destination,
},
@@ -437,10 +424,10 @@ pub mod drtio {
match reply {
Ok(Packet::DestinationDownReply) => (),
Ok(Packet::DestinationOkReply) => {
destination_set_up(routing_table, up_destinations, destination, true).await;
destination_set_up(up_destinations, destination, true).await;
init_buffer_space(destination as u8, linkno).await;
remote_dma::destination_changed(routing_table, destination, true).await;
subkernel::destination_changed(routing_table, destination, true).await;
remote_dma::destination_changed(destination, true).await;
subkernel::destination_changed(destination, true).await;
}
Ok(packet) => error!("[DEST#{}] received unexpected aux packet: {:?}", destination, packet),
Err(e) => error!("[DEST#{}] communication failed ({})", destination, e),
@@ -451,22 +438,19 @@ pub mod drtio {
}
}
pub async fn link_task(
routing_table: &RoutingTable,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
pub async fn link_task(up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
let mut up_links = [false; csr::DRTIO.len()];
// set up local RTIO
let master_destination = get_master_destination(routing_table);
let master_destination = get_master_destination();
destination_set_up(routing_table, up_destinations, master_destination, true).await;
destination_set_up(up_destinations, master_destination, true).await;
loop {
for linkno in 0..csr::DRTIO.len() {
let linkno = linkno as u8;
if up_links[linkno as usize] {
/* link was previously up */
if link_rx_up(linkno).await {
process_unsolicited_aux(linkno, routing_table).await;
process_unsolicited_aux(linkno).await;
process_local_errors(linkno).await;
} else {
info!("[LINK#{}] link is down", linkno);
@@ -495,17 +479,17 @@ pub mod drtio {
if link_rx_up(linkno).await {
info!("[LINK#{}] link RX became up, pinging", linkno);
let ping_count = ping_remote(linkno, routing_table).await;
let ping_count = ping_remote(linkno).await;
if ping_count > 0 {
info!("[LINK#{}] remote replied after {} packets", linkno, ping_count);
up_links[linkno as usize] = true;
if let Err(e) = sync_tsc(linkno).await {
error!("[LINK#{}] failed to sync TSC ({})", linkno, e);
}
if let Err(e) = load_routing_table(linkno, routing_table).await {
if let Err(e) = load_routing_table(linkno).await {
error!("[LINK#{}] failed to load routing table ({})", linkno, e);
}
if let Err(e) = set_rank(linkno, 1 as u8, routing_table).await {
if let Err(e) = set_rank(linkno, 1 as u8).await {
error!("[LINK#{}] failed to set rank ({})", linkno, e);
}
info!("[LINK#{}] link initialization completed", linkno);
@@ -515,12 +499,12 @@ pub mod drtio {
}
}
}
destination_survey(routing_table, &up_links, up_destinations).await;
destination_survey(&up_links, up_destinations).await;
timer::async_delay_ms(200).await;
}
}
pub async fn reset(routing_table: &RoutingTable) {
pub async fn reset() {
for linkno in 0..csr::DRTIO.len() {
unsafe {
(csr::DRTIO[linkno].reset_write)(1);
@@ -536,7 +520,7 @@ pub mod drtio {
for linkno in 0..csr::DRTIO.len() {
let linkno = linkno as u8;
if link_rx_up(linkno).await {
let reply = aux_transact(linkno, routing_table, &Packet::ResetRequest).await;
let reply = aux_transact(linkno, &Packet::ResetRequest).await;
match reply {
Ok(Packet::ResetAck) => (),
Ok(_) => error!("[LINK#{}] reset failed, received unexpected aux packet", linkno),
@@ -548,7 +532,6 @@ pub mod drtio {
pub async fn partition_data<PacketF, HandlerF>(
linkno: u8,
routing_table: &RoutingTable,
data: &[u8],
packet_f: PacketF,
reply_handler_f: HandlerF,
@@ -571,23 +554,17 @@ pub mod drtio {
i += len;
let status = PayloadStatus::from_status(first, last);
let packet = packet_f(&slice, status, len);
let reply = aux_transact(linkno, routing_table, &packet).await?;
let reply = aux_transact(linkno, &packet).await?;
reply_handler_f(&reply)?;
}
Ok(())
}
pub async fn ddma_upload_trace(
routing_table: &RoutingTable,
id: u32,
destination: u8,
trace: &Vec<u8>,
) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
let master_destination = get_master_destination(routing_table);
pub async fn ddma_upload_trace(id: u32, destination: u8, trace: &Vec<u8>) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let master_destination = get_master_destination();
partition_data(
linkno,
routing_table,
trace,
|slice, status, len| Packet::DmaAddTraceRequest {
id: id,
@@ -626,12 +603,11 @@ pub mod drtio {
.await
}
pub async fn ddma_send_erase(routing_table: &RoutingTable, id: u32, destination: u8) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
let master_destination = get_master_destination(routing_table);
pub async fn ddma_send_erase(id: u32, destination: u8) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let master_destination = get_master_destination();
let reply = aux_transact(
linkno,
routing_table,
&Packet::DmaRemoveTraceRequest {
id: id,
source: master_destination,
@@ -664,17 +640,11 @@ pub mod drtio {
}
}
pub async fn ddma_send_playback(
routing_table: &RoutingTable,
id: u32,
destination: u8,
timestamp: u64,
) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
let master_destination = get_master_destination(routing_table);
pub async fn ddma_send_playback(id: u32, destination: u8, timestamp: u64) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let master_destination = get_master_destination();
let reply = aux_transact(
linkno,
routing_table,
&Packet::DmaPlaybackRequest {
id: id,
source: master_destination,
@@ -708,11 +678,10 @@ pub mod drtio {
}
}
async fn analyzer_get_data(routing_table: &RoutingTable, destination: u8) -> Result<RemoteBuffer, Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
async fn analyzer_get_data(destination: u8) -> Result<RemoteBuffer, Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = aux_transact(
linkno,
routing_table,
&Packet::AnalyzerHeaderRequest {
destination: destination,
},
@@ -733,7 +702,6 @@ pub mod drtio {
while !last_packet {
let reply = aux_transact(
linkno,
routing_table,
&Packet::AnalyzerDataRequest {
destination: destination,
},
@@ -758,28 +726,21 @@ pub mod drtio {
}
pub async fn analyzer_query(
routing_table: &RoutingTable,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) -> Result<Vec<RemoteBuffer>, Error> {
let mut remote_buffers: Vec<RemoteBuffer> = Vec::new();
for i in 1..drtio_routing::DEST_COUNT {
if destination_up(up_destinations, i as u8).await {
remote_buffers.push(analyzer_get_data(routing_table, i as u8).await?);
remote_buffers.push(analyzer_get_data(i as u8).await?);
}
}
Ok(remote_buffers)
}
pub async fn subkernel_upload(
routing_table: &RoutingTable,
id: u32,
destination: u8,
data: &Vec<u8>,
) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
pub async fn subkernel_upload(id: u32, destination: u8, data: &Vec<u8>) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
partition_data(
linkno,
routing_table,
data,
|slice, status, len| Packet::SubkernelAddDataRequest {
id: id,
@@ -797,18 +758,11 @@ pub mod drtio {
.await
}
pub async fn subkernel_load(
routing_table: &RoutingTable,
id: u32,
destination: u8,
run: bool,
timestamp: u64,
) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
let master_destination = get_master_destination(routing_table);
pub async fn subkernel_load(id: u32, destination: u8, run: bool, timestamp: u64) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let master_destination = get_master_destination();
let reply = aux_transact(
linkno,
routing_table,
&Packet::SubkernelLoadRunRequest {
id: id,
source: master_destination,
@@ -843,14 +797,13 @@ pub mod drtio {
}
}
pub async fn subkernel_retrieve_exception(routing_table: &RoutingTable, destination: u8) -> Result<Vec<u8>, Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
pub async fn subkernel_retrieve_exception(destination: u8) -> Result<Vec<u8>, Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let mut remote_data: Vec<u8> = Vec::new();
let master_destination = get_master_destination(routing_table);
let master_destination = get_master_destination();
loop {
let reply = aux_transact(
linkno,
routing_table,
&Packet::SubkernelExceptionRequest {
source: master_destination,
destination: destination,
@@ -878,17 +831,11 @@ pub mod drtio {
}
}
pub async fn subkernel_send_message(
routing_table: &RoutingTable,
id: u32,
destination: u8,
message: &[u8],
) -> Result<(), Error> {
let linkno = routing_table.0[destination as usize][0] - 1;
let master_destination = get_master_destination(routing_table);
pub async fn subkernel_send_message(id: u32, destination: u8, message: &[u8]) -> Result<(), Error> {
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let master_destination = get_master_destination();
partition_data(
linkno,
routing_table,
message,
|slice, status, len| Packet::SubkernelMessage {
source: master_destination,
@@ -906,11 +853,7 @@ pub mod drtio {
.await
}
pub async fn i2c_send_basic(
routing_table: &RoutingTable,
request: &KernelMessage,
busno: u32,
) -> Result<bool, Error> {
pub async fn i2c_send_basic(request: &KernelMessage, busno: u32) -> Result<bool, Error> {
let destination = (busno >> 16) as u8;
let busno = busno as u8;
let packet = match request {
@@ -925,21 +868,20 @@ pub mod drtio {
},
_ => unreachable!(),
};
let linkno = routing_table.0[destination as usize][0] - 1;
let reply = aux_transact(linkno, routing_table, &packet).await?;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = aux_transact(linkno, &packet).await?;
match reply {
Packet::I2cBasicReply { succeeded } => Ok(succeeded),
_ => Err(Error::UnexpectedReply),
}
}
pub async fn i2c_send_write(routing_table: &RoutingTable, busno: u32, data: u8) -> Result<(bool, bool), Error> {
pub async fn i2c_send_write(busno: u32, data: u8) -> Result<(bool, bool), Error> {
let destination = (busno >> 16) as u8;
let busno = busno as u8;
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = aux_transact(
linkno,
routing_table,
&Packet::I2cWriteRequest {
destination,
busno,
@@ -953,13 +895,12 @@ pub mod drtio {
}
}
pub async fn i2c_send_read(routing_table: &RoutingTable, busno: u32, ack: bool) -> Result<(bool, u8), Error> {
pub async fn i2c_send_read(busno: u32, ack: bool) -> Result<(bool, u8), Error> {
let destination = (busno >> 16) as u8;
let busno = busno as u8;
let linkno = routing_table.0[destination as usize][0] - 1;
let linkno = ROUTING_TABLE.get().unwrap().0[destination as usize][0] - 1;
let reply = aux_transact(
linkno,
routing_table,
&Packet::I2cReadRequest {
destination,
busno,
@@ -978,14 +919,10 @@ pub mod drtio {
pub mod drtio {
use super::*;
pub fn startup(
_routing_table: &Rc<RefCell<RoutingTable>>,
_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
}
pub fn startup(_up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {}
#[allow(dead_code)]
pub fn reset(_routing_table: &RoutingTable) {}
pub fn reset() {}
}
fn toggle_sed_spread(val: u8) {
@@ -1010,12 +947,9 @@ fn setup_sed_spread() {
}
}
pub fn startup(
routing_table: &Rc<RefCell<RoutingTable>>,
up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>,
) {
pub fn startup(up_destinations: &Rc<RefCell<[bool; drtio_routing::DEST_COUNT]>>) {
setup_sed_spread();
drtio::startup(routing_table, up_destinations);
drtio::startup(up_destinations);
unsafe {
csr::rtio_core::reset_phy_write(1);
}

View File

@@ -1,8 +1,7 @@
use alloc::{collections::BTreeMap, vec::Vec};
use libasync::task;
use libboard_artiq::{drtio_routing::RoutingTable,
drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, PayloadStatus}};
use libboard_artiq::drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, PayloadStatus};
use libboard_zynq::timer;
use libcortex_a9::mutex::Mutex;
use log::{error, warn};
@@ -72,9 +71,9 @@ pub async fn add_subkernel(id: u32, destination: u8, kernel: Vec<u8>) {
.insert(id, Subkernel::new(destination, kernel));
}
pub async fn upload(routing_table: &RoutingTable, id: u32) -> Result<(), Error> {
pub async fn upload(id: u32) -> Result<(), Error> {
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
drtio::subkernel_upload(routing_table, id, subkernel.destination, &subkernel.data).await?;
drtio::subkernel_upload(id, subkernel.destination, &subkernel.data).await?;
subkernel.state = SubkernelState::Uploaded;
Ok(())
} else {
@@ -82,12 +81,12 @@ pub async fn upload(routing_table: &RoutingTable, id: u32) -> Result<(), Error>
}
}
pub async fn load(routing_table: &RoutingTable, id: u32, run: bool, timestamp: u64) -> Result<(), Error> {
pub async fn load(id: u32, run: bool, timestamp: u64) -> Result<(), Error> {
if let Some(subkernel) = SUBKERNELS.async_lock().await.get_mut(&id) {
if subkernel.state != SubkernelState::Uploaded {
return Err(Error::IncorrectState);
}
drtio::subkernel_load(routing_table, id, subkernel.destination, run, timestamp).await?;
drtio::subkernel_load(id, subkernel.destination, run, timestamp).await?;
if run {
subkernel.state = SubkernelState::Running;
}
@@ -118,12 +117,12 @@ pub async fn subkernel_finished(id: u32, with_exception: bool, exception_src: u8
}
}
pub async fn destination_changed(routing_table: &RoutingTable, destination: u8, up: bool) {
pub async fn destination_changed(destination: u8, up: bool) {
let mut locked_subkernels = SUBKERNELS.async_lock().await;
for (id, subkernel) in locked_subkernels.iter_mut() {
if subkernel.destination == destination {
if up {
match drtio::subkernel_upload(routing_table, *id, destination, &subkernel.data).await {
match drtio::subkernel_upload(*id, destination, &subkernel.data).await {
Ok(_) => subkernel.state = SubkernelState::Uploaded,
Err(e) => error!("Error adding subkernel on destination {}: {}", destination, e),
}
@@ -139,7 +138,7 @@ pub async fn destination_changed(routing_table: &RoutingTable, destination: u8,
}
}
pub async fn await_finish(routing_table: &RoutingTable, id: u32, timeout: i64) -> Result<SubkernelFinished, Error> {
pub async fn await_finish(id: u32, timeout: i64) -> Result<SubkernelFinished, Error> {
match SUBKERNELS.async_lock().await.get(&id).unwrap().state {
SubkernelState::Running | SubkernelState::Finished { .. } => (),
_ => return Err(Error::IncorrectState),
@@ -175,7 +174,7 @@ pub async fn await_finish(routing_table: &RoutingTable, id: u32, timeout: i64) -
id: id,
status: status,
exception: if let FinishStatus::Exception(dest) = status {
Some(drtio::subkernel_retrieve_exception(routing_table, dest).await?)
Some(drtio::subkernel_retrieve_exception(dest).await?)
} else {
None
},
@@ -282,11 +281,6 @@ pub async fn message_await(id: u32, timeout: i64) -> Result<Message, Error> {
Err(Error::Timeout)
}
pub async fn message_send<'a>(
routing_table: &RoutingTable,
id: u32,
destination: u8,
message: Vec<u8>,
) -> Result<(), Error> {
Ok(drtio::subkernel_send_message(routing_table, id, destination, &message).await?)
pub async fn message_send<'a>(id: u32, destination: u8, message: Vec<u8>) -> Result<(), Error> {
Ok(drtio::subkernel_send_message(id, destination, &message).await?)
}