From 33e8d6882a400882d528b1aabf0655012214fabe Mon Sep 17 00:00:00 2001 From: occheung Date: Wed, 28 Aug 2024 13:34:52 +0800 Subject: [PATCH] runtime: support coremgmt on satellites --- src/runtime/src/comms.rs | 4 +- src/runtime/src/mgmt.rs | 840 ++++++++++++++++++++++++++++++++---- src/runtime/src/rtio_mgt.rs | 2 +- 3 files changed, 752 insertions(+), 94 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 6519296..a3f95ba 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -784,7 +784,7 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let cfg = Rc::new(cfg); let restart_idle = Rc::new(Semaphore::new(1, 1)); - mgmt::start(cfg.clone(), restart_idle.clone()); + mgmt::start(cfg.clone(), restart_idle.clone(), Some((&aux_mutex, &drtio_routing_table, timer))); task::spawn(async move { let connection = Rc::new(Semaphore::new(1, 1)); @@ -910,7 +910,7 @@ pub fn soft_panic_main(timer: GlobalTimer, cfg: Config) -> ! { Sockets::init(32); let dummy = Rc::new(Semaphore::new(0, 1)); - mgmt::start(Rc::new(cfg), dummy); + mgmt::start(Rc::new(cfg), dummy, None); // getting eth settings disables the LED as it resets GPIO // need to re-enable it here diff --git a/src/runtime/src/mgmt.rs b/src/runtime/src/mgmt.rs index 1a5590f..bbcc8d9 100644 --- a/src/runtime/src/mgmt.rs +++ b/src/runtime/src/mgmt.rs @@ -1,17 +1,21 @@ use alloc::{rc::Rc, string::String, vec::Vec}; -use core::cell::RefCell; +use core::{cell::RefCell, ops::Deref}; use futures::{future::poll_fn, task::Poll}; use libasync::{smoltcp::TcpStream, task}; -use libboard_artiq::logger::{BufferLogger, LogBufferRef}; -use libboard_zynq::{slcr, smoltcp}; +use libboard_artiq::{drtio_routing, + drtio_routing::RoutingTable, + logger::{BufferLogger, LogBufferRef}}; +use libboard_zynq::{slcr, smoltcp, timer::GlobalTimer}; use libconfig::Config; -use libcortex_a9::semaphore::Semaphore; +use libcortex_a9::{mutex::Mute, semaphore::Semaphore}; use log::{self, debug, error, info, warn, LevelFilter}; use num_derive::FromPrimitive; use num_traits::FromPrimitive; use crate::proto_async::*; +#[cfg(has_drtio)] +use crate::rtio_mgt::*; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { @@ -19,6 +23,8 @@ pub enum Error { UnknownLogLevel(u8), UnexpectedPattern, UnrecognizedPacket, + #[cfg(has_drtio)] + DrtioError(drtio::Error), } type Result = core::result::Result; @@ -30,6 +36,8 @@ impl core::fmt::Display for Error { &Error::UnknownLogLevel(lvl) => write!(f, "unknown log level {}", lvl), &Error::UnexpectedPattern => write!(f, "unexpected pattern"), &Error::UnrecognizedPacket => write!(f, "unrecognized packet"), + #[cfg(has_drtio)] + &Error::DrtioError(error) => write!(f, "drtio error: {}", error), } } } @@ -40,6 +48,13 @@ impl From for Error { } } +#[cfg(has_drtio)] +impl From for Error { + fn from(error: drtio::Error) -> Self { + Error::DrtioError(error) + } +} + #[derive(Debug, FromPrimitive)] pub enum Request { GetLog = 1, @@ -52,6 +67,9 @@ pub enum Request { ConfigRead = 12, ConfigWrite = 13, ConfigRemove = 14, + ConfigErase = 15, + + DebugAllocator = 8, } #[repr(i8)] @@ -112,15 +130,657 @@ async fn read_key(stream: &mut TcpStream) -> Result { Ok(String::from_utf8(buffer).unwrap()) } +#[cfg(has_drtio)] +mod remote_coremgmt { + use io::{Cursor, ProtoWrite}; + use libboard_artiq::drtioaux_proto::{Packet, MASTER_PAYLOAD_MAX_SIZE}; + + use super::*; + use crate::rtio_mgt::drtio; + + pub async fn get_log( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let mut buffer = Vec::new(); + loop { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtGetLogRequest { + destination, + clear: false, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtGetLogReply { last, length, data }) => { + buffer.extend(&data[..length as usize]); + if last { + write_i8(stream, Reply::LogContent as i8).await?; + write_chunk(stream, &buffer).await?; + return Ok(()); + } + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + return Err(e.into()); + } + } + } + } + + pub async fn clear_log( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtClearLogRequest { destination }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn pull_log( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + pull_id: &Rc>, + ) -> Result<()> { + let id = { + let mut guard = pull_id.borrow_mut(); + *guard += 1; + *guard + }; + loop { + if id != *pull_id.borrow() { + // another connection attempts to pull the log... + // abort this connection... + break; + } + + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtGetLogRequest { + destination, + clear: true, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtGetLogReply { last: _, length, data }) => { + write_chunk(stream, &data[..length as usize]).await?; + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + return Err(drtio::Error::UnexpectedReply.into()); + } + Err(e) => { + error!("aux packet error ({})", e); + return Err(e.into()); + } + } + } + + Ok(()) + } + + pub async fn set_log_filter( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + level: LevelFilter, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtSetLogLevelRequest { + destination, + log_level: level as u8, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn set_uart_log_filter( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + level: LevelFilter, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtSetUartLogLevelRequest { + destination, + log_level: level as u8, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn config_read( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + ) -> Result<()> { + let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let len = key.len(); + config_key[..len].clone_from_slice(key.as_bytes()); + + let mut reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigReadRequest { + destination: destination, + length: len as u16, + key: config_key, + }, + timer, + ) + .await; + + let mut buffer = Vec::::new(); + loop { + match reply { + Ok(Packet::CoreMgmtConfigReadReply { last, length, value }) => { + buffer.extend(&value[..length as usize]); + + if last { + write_i8(stream, Reply::ConfigData as i8).await?; + write_chunk(stream, &buffer).await?; + return Ok(()); + } + + reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigReadContinue { + destination: destination, + }, + timer, + ) + .await; + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + return Err(e.into()); + } + } + } + } + + pub async fn config_write( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + value: Vec, + _restart_idle: &Rc, + ) -> Result<()> { + let mut message = Cursor::new(Vec::with_capacity(key.len() + value.len() + 4 * 2)); + message.write_string(key).unwrap(); + message.write_bytes(&value).unwrap(); + + match drtio::partition_data( + linkno, + aux_mutex, + routing_table, + timer, + message.get_ref(), + |slice, status, len: usize| Packet::CoreMgmtConfigWriteRequest { + destination: destination, + last: status.is_last(), + length: len as u16, + data: *slice, + }, + |reply| match reply { + Packet::CoreMgmtAck => Ok(()), + packet => { + error!("received unexpected aux packet: {:?}", packet); + Err(drtio::Error::UnexpectedReply) + } + }, + ) + .await + { + Ok(()) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn config_remove( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + _restart_idle: &Rc, + ) -> Result<()> { + let mut config_key: [u8; MASTER_PAYLOAD_MAX_SIZE] = [0; MASTER_PAYLOAD_MAX_SIZE]; + let len = key.len(); + config_key[..len].clone_from_slice(key.as_bytes()); + + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigRemoveRequest { + destination: destination, + length: len as u16, + key: config_key, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn config_erase( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigEraseRequest { + destination: destination, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn reboot( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + info!("initited reboot request to satellite destination {}", destination); + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtRebootRequest { + destination: destination, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::RebootImminent as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + write_i8(stream, Reply::Error as i8).await?; + Err(e.into()) + } + } + } + + pub async fn debug_allocator( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtAllocatorDebugRequest { + destination: destination, + }, + timer, + ) + .await; + + match reply { + Ok(Packet::CoreMgmtAck) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + Ok(packet) => { + error!("received unexpected aux packet: {:?}", packet); + Err(drtio::Error::UnexpectedReply.into()) + } + Err(e) => { + error!("aux packet error ({})", e); + Err(e.into()) + } + } + } +} + +mod local_coremgmt { + use super::*; + + pub async fn get_log(stream: &mut TcpStream) -> Result<()> { + let buffer = get_logger_buffer().await.extract().as_bytes().to_vec(); + write_i8(stream, Reply::LogContent as i8).await?; + write_chunk(stream, &buffer).await?; + Ok(()) + } + + pub async fn clear_log(stream: &mut TcpStream) -> Result<()> { + let mut buffer = get_logger_buffer().await; + buffer.clear(); + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + + pub async fn pull_log(stream: &mut TcpStream, pull_id: &Rc>) -> Result<()> { + let id = { + let mut guard = pull_id.borrow_mut(); + *guard += 1; + *guard + }; + loop { + let mut buffer = get_logger_buffer_pred(|b| !b.is_empty()).await; + if id != *pull_id.borrow() { + // another connection attempts to pull the log... + // abort this connection... + break; + } + let bytes = buffer.extract().as_bytes().to_vec(); + buffer.clear(); + core::mem::drop(buffer); + write_chunk(stream, &bytes).await?; + if log::max_level() == LevelFilter::Trace { + // temporarily discard all trace level log + let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() }; + logger.set_buffer_log_level(LevelFilter::Debug); + stream.flush().await?; + logger.set_buffer_log_level(LevelFilter::Trace); + } + } + Ok(()) + } + + pub async fn set_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> { + info!("Changing log level to {}", lvl); + log::set_max_level(lvl); + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + + pub async fn set_uart_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> { + info!("Changing UART log level to {}", lvl); + unsafe { + BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(lvl); + } + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + + pub async fn config_read(stream: &mut TcpStream, cfg: &Rc, key: &String) -> Result<()> { + let value = cfg.read(&key); + if let Ok(value) = value { + debug!("got value"); + write_i8(stream, Reply::ConfigData as i8).await?; + write_chunk(stream, &value).await?; + } else { + warn!("read error: no such key"); + write_i8(stream, Reply::Error as i8).await?; + } + Ok(()) + } + + pub async fn config_write(stream: &mut TcpStream, cfg: &Rc, key: &String, value: Vec, restart_idle: &Rc) -> Result<()> { + let value = cfg.write(&key, buffer); + if value.is_ok() { + debug!("write success"); + if key == "idle_kernel" { + restart_idle.signal(); + } + write_i8(stream, Reply::Success as i8).await?; + } else { + // this is an error because we do not expect write to fail + error!("failed to write: {:?}", value); + write_i8(stream, Reply::Error as i8).await?; + } + Ok(()) + } + + pub async fn config_remove(stream: &mut TcpStream, cfg: &Rc, key: &String, restart_idle: &Rc) -> Result<()> { + let key = read_key(stream).await?; + debug!("erase key: {}", key); + let value = cfg.remove(&key); + if value.is_ok() { + debug!("erase success"); + if key == "idle_kernel" { + restart_idle.signal(); + } + write_i8(stream, Reply::Success as i8).await?; + } else { + warn!("erase failed"); + write_i8(stream, Reply::Error as i8).await?; + } + Ok(()) + } + + pub async fn config_erase(stream: &mut TcpStream) -> Result<()> { + error!("zynq device does not support config erase"); + write_i8(stream, Reply::Error as i8).await?; + Ok(()) + } + + pub async fn reboot(stream: &mut TcpStream) -> Result<()> { + info!("rebooting"); + write_i8(stream, Reply::RebootImminent as i8).await?; + stream.flush().await?; + slcr::reboot(); + + unreachable!() + } + + pub async fn debug_allocator(_stream: &mut TcpStream) -> Result<()> { + error!("zynq device does not support allocator debug print"); + Ok(()) + } +} + +#[cfg(has_drtio)] +macro_rules! process { + ($stream: ident, $drtio_tuple:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ + if $destination == 0 { + local_coremgmt::$func($stream, $($param, )*).await + } else if let Some((aux_mutex, routing_table, timer)) = $drtio_tuple { + let linkno = routing_table.0[$destination as usize][0] - 1 as u8; + remote_coremgmt::$func($stream, aux_mutex, routing_table, timer, linkno, $destination, $($param, )*).await + } else { + error!("coremgmt-over-drtio not supported for panicked device, please reboot"); + write_i8($stream, Reply::Error as i8).await?; + Err(drtio::Error::LinkDown.into()) + } + }} +} + +#[cfg(not(has_drtio))] +macro_rules! process { + ($stream: ident, $drtio_tuple:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ + local_coremgmt::$func($stream, $($param, )*).await + }} +} + async fn handle_connection( stream: &mut TcpStream, pull_id: Rc>, cfg: Rc, restart_idle: Rc, + _drtio_tuple: Option<(&Rc>, &RoutingTable, GlobalTimer)>, ) -> Result<()> { if !expect(&stream, b"ARTIQ management\n").await? { return Err(Error::UnexpectedPattern); } + + let _destination: u8 = read_i8(stream).await? as u8; stream.send_slice("e".as_bytes()).await?; loop { @@ -130,72 +790,48 @@ async fn handle_connection( } let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?; match msg { - Request::GetLog => { - let buffer = get_logger_buffer().await.extract().as_bytes().to_vec(); - write_i8(stream, Reply::LogContent as i8).await?; - write_chunk(stream, &buffer).await?; - } - Request::ClearLog => { - let mut buffer = get_logger_buffer().await; - buffer.clear(); - write_i8(stream, Reply::Success as i8).await?; - } - Request::PullLog => { - let id = { - let mut guard = pull_id.borrow_mut(); - *guard += 1; - *guard - }; - loop { - let mut buffer = get_logger_buffer_pred(|b| !b.is_empty()).await; - if id != *pull_id.borrow() { - // another connection attempts to pull the log... - // abort this connection... - break; - } - let bytes = buffer.extract().as_bytes().to_vec(); - buffer.clear(); - core::mem::drop(buffer); - write_chunk(stream, &bytes).await?; - if log::max_level() == LevelFilter::Trace { - // temporarily discard all trace level log - let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() }; - logger.set_buffer_log_level(LevelFilter::Debug); - stream.flush().await?; - logger.set_buffer_log_level(LevelFilter::Trace); - } - } - } + Request::GetLog => process!(stream, _drtio_tuple, _destination, get_log), + Request::ClearLog => process!(stream, _drtio_tuple, _destination, clear_log), + Request::PullLog => process!( + stream, + _drtio_tuple, + _destination, + pull_log, + &pull_id + ), Request::SetLogFilter => { let lvl = read_log_level_filter(stream).await?; - info!("Changing log level to {}", lvl); - log::set_max_level(lvl); - write_i8(stream, Reply::Success as i8).await?; + process!( + stream, + _drtio_tuple, + _destination, + set_log_filter, + lvl + ) } Request::SetUartLogFilter => { let lvl = read_log_level_filter(stream).await?; - info!("Changing UART log level to {}", lvl); - unsafe { - BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(lvl); - } - write_i8(stream, Reply::Success as i8).await?; + process!( + stream, + _drtio_tuple, + _destination, + set_uart_log_filter, + lvl + ) } Request::ConfigRead => { let key = read_key(stream).await?; - debug!("read key: {}", key); - let value = cfg.read(&key); - if let Ok(value) = value { - debug!("got value"); - write_i8(stream, Reply::ConfigData as i8).await?; - write_chunk(stream, &value).await?; - } else { - warn!("read error: no such key"); - write_i8(stream, Reply::Error as i8).await?; - } + process!( + stream, + _drtio_tuple, + _destination, + config_read, + &cfg, + &key + ) } Request::ConfigWrite => { let key = read_key(stream).await?; - debug!("write key: {}", key); let len = read_i32(stream).await?; let len = if len <= 0 { 0 } else { len as usize }; let mut buffer = Vec::with_capacity(len); @@ -203,55 +839,77 @@ async fn handle_connection( buffer.set_len(len); } read_chunk(stream, &mut buffer).await?; - let value = cfg.write(&key, buffer); - if value.is_ok() { - debug!("write success"); - if key == "idle_kernel" { - restart_idle.signal(); - } - write_i8(stream, Reply::Success as i8).await?; - } else { - // this is an error because we do not expect write to fail - error!("failed to write: {:?}", value); - write_i8(stream, Reply::Error as i8).await?; - } + process!( + stream, + _drtio_tuple, + _destination, + config_write, + &cfg, + &key, + buffer, + restart_idle + ) } Request::ConfigRemove => { let key = read_key(stream).await?; - debug!("erase key: {}", key); - let value = cfg.remove(&key); - if value.is_ok() { - debug!("erase success"); - if key == "idle_kernel" { - restart_idle.signal(); - } - write_i8(stream, Reply::Success as i8).await?; - } else { - warn!("erase failed"); - write_i8(stream, Reply::Error as i8).await?; - } + process!( + stream, + _drtio_tuple, + _destination, + config_remove, + &cfg, + &key, + restart_idle + ) } Request::Reboot => { - info!("rebooting"); - write_i8(stream, Reply::RebootImminent as i8).await?; - stream.flush().await?; - slcr::reboot(); + process!(stream, _drtio_tuple, _destination, reboot) } - } + Request::ConfigErase => { + process!(stream, _drtio_tuple, _destination, config_erase) + } + Request::DebugAllocator => { + process!( + stream, + _drtio_tuple, + _destination, + debug_allocator + ) + } + }?; } } -pub fn start(cfg: Rc, restart_idle: Rc) { +pub fn start( + cfg: Rc, + restart_idle: Rc, + drtio_tuple: Option<( + &Rc>, + &Rc>, + GlobalTimer, + )>, +) { + let drtio_tuple = drtio_tuple.map( + |(aux_mutex, routing_table, timer)| (aux_mutex.clone(), routing_table.clone(), timer) + ); task::spawn(async move { let pull_id = Rc::new(RefCell::new(0u32)); loop { let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap(); let pull_id = pull_id.clone(); let cfg = cfg.clone(); - let restart_idle = restart_idle.clone(); + let drtio_tuple = drtio_tuple.clone(); task::spawn(async move { info!("received connection"); - let _ = handle_connection(&mut stream, pull_id, cfg, restart_idle) + // Avoid consuming the tuple + // Keep the borrowed value on stack + let drtio_tuple = drtio_tuple.as_ref().map( + |(aux_mutex, routing_table, timer)| (aux_mutex, routing_table.borrow(), *timer) + ); + let drtio_tuple = drtio_tuple.as_ref().map( + |(aux_mutex, routing_table, timer)| (*aux_mutex, routing_table.deref(), *timer) + ); + let _ = handle_connection(&mut stream, pull_id, cfg, restart_idle, drtio_tuple) .await .map_err(|e| warn!("connection terminated: {:?}", e)); let _ = stream.flush().await; diff --git a/src/runtime/src/rtio_mgt.rs b/src/runtime/src/rtio_mgt.rs index 53a3b8b..4fa8cce 100644 --- a/src/runtime/src/rtio_mgt.rs +++ b/src/runtime/src/rtio_mgt.rs @@ -540,7 +540,7 @@ pub mod drtio { } } - async fn partition_data( + pub async fn partition_data( linkno: u8, aux_mutex: &Rc>, routing_table: &RoutingTable,