From 740c2f1fc0d1d7b722aafb18d388efd05a27e9c8 Mon Sep 17 00:00:00 2001 From: occheung Date: Mon, 19 Aug 2024 12:01:36 +0800 Subject: [PATCH] runtime/mgmt: dispatch->process --- src/runtime/src/mgmt.rs | 904 +++++++++++++++++++++------------------- 1 file changed, 465 insertions(+), 439 deletions(-) diff --git a/src/runtime/src/mgmt.rs b/src/runtime/src/mgmt.rs index 11ed426..7e3d46a 100644 --- a/src/runtime/src/mgmt.rs +++ b/src/runtime/src/mgmt.rs @@ -3,7 +3,9 @@ use core::cell::RefCell; use futures::{future::poll_fn, task::Poll}; use libasync::{smoltcp::TcpStream, task}; -use libboard_artiq::{logger::{BufferLogger, LogBufferRef}, drtio_routing, drtio_routing::RoutingTable}; +use libboard_artiq::{drtio_routing, + drtio_routing::RoutingTable, + logger::{BufferLogger, LogBufferRef}}; use libboard_zynq::{slcr, smoltcp, timer::GlobalTimer}; use libconfig::Config; use libcortex_a9::mutex::Mutex; @@ -11,8 +13,7 @@ use log::{self, debug, error, info, warn, LevelFilter}; use num_derive::FromPrimitive; use num_traits::FromPrimitive; -use crate::proto_async::*; -use crate::rtio_mgt::*; +use crate::{proto_async::*, rtio_mgt::*}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { @@ -20,9 +21,6 @@ pub enum Error { UnknownLogLevel(u8), UnexpectedPattern, UnrecognizedPacket, - ReadConfigFail, - WriteConfigFail, - RemoveConfigFail, #[cfg(has_drtio)] DrtioError(drtio::Error), } @@ -36,9 +34,6 @@ impl core::fmt::Display for Error { &Error::UnknownLogLevel(lvl) => write!(f, "unknown log level {}", lvl), &Error::UnexpectedPattern => write!(f, "unexpected pattern"), &Error::UnrecognizedPacket => write!(f, "unrecognized packet"), - &Error::ReadConfigFail => write!(f, "error reading config"), - &Error::WriteConfigFail => write!(f, "error writing config"), - &Error::RemoveConfigFail => write!(f, "error removing config"), #[cfg(has_drtio)] &Error::DrtioError(error) => write!(f, "drtio error: {}", error), } @@ -95,7 +90,7 @@ pub fn byte_to_level_filter(level_byte: u8) -> Result { async fn read_log_level_filter(stream: &mut TcpStream) -> Result { let level_byte = read_i8(stream).await? as u8; - + byte_to_level_filter(level_byte) } @@ -136,21 +131,359 @@ async fn read_key(stream: &mut TcpStream) -> Result { Ok(String::from_utf8(buffer).unwrap()) } +#[cfg(has_drtio)] +mod remote_coremgmt { + use io::{Cursor, ProtoWrite}; + use libboard_artiq::drtioaux_proto::{Packet, CORE_MGMT_PAYLOAD_MAX_SIZE}; + + use super::*; + use crate::rtio_mgt::drtio; + + pub async fn get_log( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let mut buffer = Vec::new(); + loop { + match drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtGetLogRequest { destination }, + timer, + ) + .await? + { + Packet::CoreMgmtGetLogReply { last, length, data } => { + buffer.extend(&data[..length as usize]); + if last { + write_i8(stream, Reply::LogContent as i8).await?; + write_chunk(stream, &buffer).await?; + return Ok(()); + } + } + _ => { + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + } + } + } + + pub async fn clear_log( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtClearLogRequest { destination }, + timer, + ) + .await?; + + match reply { + Packet::CoreMgmtAck { succeeded: true } => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + _ => { + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + } + } + + pub async fn pull_log( + _stream: &mut TcpStream, + _aux_mutex: &Rc>, + _routing_table: &drtio_routing::RoutingTable, + _timer: GlobalTimer, + _linkno: u8, + _destination: u8, + _pull_id: &Rc>, + ) -> Result<()> { + todo!() + } + + pub async fn set_log_filter( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + level: LevelFilter, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtSetLogLevelRequest { + destination, + log_level: level as u8, + }, + timer, + ) + .await?; + + match reply { + Packet::CoreMgmtAck { succeeded: true } => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + _ => { + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + } + } + + pub async fn set_uart_log_filter( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + level: LevelFilter, + ) -> Result<()> { + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtSetUartLogLevelRequest { + destination, + log_level: level as u8, + }, + timer, + ) + .await?; + + match reply { + Packet::CoreMgmtAck { succeeded: true } => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + _ => { + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + } + } + + pub async fn config_read( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + ) -> Result<()> { + let mut config_key: [u8; CORE_MGMT_PAYLOAD_MAX_SIZE] = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; + let len = key.len(); + config_key[..len].clone_from_slice(key.as_bytes()); + + let mut reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigReadRequest { + destination: destination, + length: len as u16, + key: config_key, + }, + timer, + ) + .await?; + + let mut buffer = Vec::::new(); + loop { + match reply { + Packet::CoreMgmtConfigReadReply { + succeeded: true, + length, + last, + value, + } => { + buffer.extend(&value[..length as usize]); + + if last { + write_i8(stream, Reply::ConfigData as i8).await?; + write_chunk(stream, &value).await?; + return Ok(()); + } + + reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigReadContinue { + destination: destination, + }, + timer, + ) + .await?; + } + + _ => { + write_i8(stream, Reply::Error as i8).await?; + return Err(drtio::Error::UnexpectedReply.into()); + } + } + } + } + + pub async fn config_write( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + value: Vec, + ) -> Result<()> { + let mut message = Cursor::new(Vec::with_capacity(key.len() + value.len() + 4 * 2)); + message.write_string(key).unwrap(); + message.write_bytes(&value).unwrap(); + + match drtio::partition_data( + linkno, + aux_mutex, + routing_table, + timer, + message.get_ref(), + |slice, status, len: usize| Packet::CoreMgmtConfigWriteRequest { + destination: destination, + length: len as u16, + last: status.is_last(), + data: *slice, + }, + |reply| match reply { + Packet::CoreMgmtAck { succeeded: true } => Ok(()), + _ => { + error!("received unknown packet"); + Err(drtio::Error::UnexpectedReply.into()) + } + }, + ) + .await + { + Ok(()) => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + error => { + write_i8(stream, Reply::Error as i8).await?; + error + } + } + } + + pub async fn config_remove( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + _cfg: &Rc, + key: &String, + ) -> Result<()> { + let mut config_key: [u8; CORE_MGMT_PAYLOAD_MAX_SIZE] = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; + let len = key.len(); + config_key[..len].clone_from_slice(key.as_bytes()); + + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtConfigRemoveRequest { + destination: destination, + length: len as u16, + key: config_key, + }, + timer, + ) + .await?; + + match reply { + Packet::CoreMgmtAck { succeeded: true } => { + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + _ => { + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + } + } + + pub async fn reboot( + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + ) -> Result<()> { + info!("initited reboot request to satellite destination {}", destination); + let reply = drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtRebootRequest { + destination: destination, + }, + timer, + ) + .await?; + + match reply { + Packet::CoreMgmtAck { succeeded: true } => { + write_i8(stream, Reply::RebootImminent as i8).await?; + Ok(()) + } + _ => { + error!("received unknown packet"); + write_i8(stream, Reply::Error as i8).await?; + Err(drtio::Error::UnexpectedReply.into()) + } + } + } +} + mod local_coremgmt { use super::*; - pub async fn get_log() -> Result> { + pub async fn get_log(stream: &mut TcpStream) -> Result<()> { let buffer = get_logger_buffer().await.extract().as_bytes().to_vec(); - Ok(buffer) - } - - pub async fn clear_log() -> Result<()> { - let mut buffer = get_logger_buffer().await; - buffer.clear(); + write_i8(stream, Reply::LogContent as i8).await?; + write_chunk(stream, &buffer).await?; Ok(()) } - pub async fn pull_log<'a>(stream: &'a mut TcpStream, pull_id: Rc>) -> Result<()> { + pub async fn clear_log(stream: &mut TcpStream) -> Result<()> { + let mut buffer = get_logger_buffer().await; + buffer.clear(); + write_i8(stream, Reply::Success as i8).await?; + Ok(()) + } + + pub async fn pull_log<'a>(stream: &'a mut TcpStream, pull_id: &Rc>) -> Result<()> { let id = { let mut guard = pull_id.borrow_mut(); *guard += 1; @@ -178,374 +511,94 @@ mod local_coremgmt { Ok(()) } - pub async fn set_log_filter(lvl: LevelFilter) -> Result<()> { + pub async fn set_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> { info!("Changing log level to {}", lvl); log::set_max_level(lvl); + write_i8(stream, Reply::Success as i8).await?; Ok(()) } - pub async fn set_uart_log_filter(lvl: LevelFilter) -> Result<()> { + pub async fn set_uart_log_filter(stream: &mut TcpStream, lvl: LevelFilter) -> Result<()> { info!("Changing UART log level to {}", lvl); unsafe { BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(lvl); } + write_i8(stream, Reply::Success as i8).await?; Ok(()) } - pub async fn config_read(cfg: &Rc, key: &String) -> Result>{ - debug!("read key: {}", key); - cfg.read(&key).map_err(|_| Error::ReadConfigFail) + pub async fn config_read(stream: &mut TcpStream, cfg: &Rc, key: &String) -> Result<()> { + let value = cfg.read(&key); + if let Ok(value) = value { + debug!("got value"); + write_i8(stream, Reply::ConfigData as i8).await?; + write_chunk(stream, &value).await?; + } else { + warn!("read error: no such key"); + write_i8(stream, Reply::Error as i8).await?; + } + Ok(()) } - pub async fn config_write<'a>(cfg: &Rc, key: &'a String, value: Vec) -> Result<()> { - cfg.write(&key, value).map_err(|_| Error::WriteConfigFail) + pub async fn config_write<'a>( + stream: &mut TcpStream, + cfg: &Rc, + key: &'a String, + value: Vec, + ) -> Result<()> { + let value = cfg.write(&key, value); + if value.is_ok() { + debug!("write success"); + write_i8(stream, Reply::Success as i8).await?; + } else { + // this is an error because we do not expect write to fail + error!("failed to write: {:?}", value); + write_i8(stream, Reply::Error as i8).await?; + } + Ok(()) } - pub async fn config_remove<'a>(cfg: &Rc, key: &'a String) -> Result<()> { - debug!("erase key: {}", key); + pub async fn config_remove<'a>(stream: &mut TcpStream, cfg: &Rc, key: &'a String) -> Result<()> { let value = cfg.remove(&key); if value.is_ok() { debug!("erase success"); - Ok(()) + write_i8(stream, Reply::Success as i8).await?; } else { warn!("erase failed"); - Err(Error::RemoveConfigFail) - } - } -} - -#[cfg(has_drtio)] -mod remote_coremgmt { - use crate::rtio_mgt::drtio; - use super::*; - use libboard_artiq::drtioaux_proto::{Packet, CORE_MGMT_PAYLOAD_MAX_SIZE}; - use io::{Cursor, ProtoWrite}; - - pub async fn get_log( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - ) -> Result> { - let mut buffer = Vec::new(); - loop { - match drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtGetLogRequest { - destination - }, - timer, - ).await? { - Packet::CoreMgmtGetLogReply { - last, length, data, - } => { - buffer.extend(&data[..length as usize]); - if last { - return Ok(buffer); - } - }, - _ => return Err(drtio::Error::UnexpectedReply.into()), - } + write_i8(stream, Reply::Error as i8).await?; } + Ok(()) } - pub async fn clear_log( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - ) -> Result<()> { - let reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtClearLogRequest { - destination - }, - timer, - ).await?; - - match reply { - Packet::CoreMgmtAck { - succeeded: true - } => Ok(()), - _ => Err(drtio::Error::UnexpectedReply.into()), - } - } + pub async fn reboot(stream: &mut TcpStream) -> Result<()> { + info!("rebooting"); + write_i8(stream, Reply::RebootImminent as i8).await?; + stream.flush().await?; + slcr::reboot(); - pub async fn pull_log( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - stream: &mut TcpStream, - ) -> Result<()> { - todo!() - } - - pub async fn set_log_filter( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - level: LevelFilter, - ) -> Result<()> { - let reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtSetLogLevelRequest { - destination, - log_level: level as u8, - }, - timer, - ).await?; - - match reply { - Packet::CoreMgmtAck { - succeeded: true - } => Ok(()), - Packet::CoreMgmtAck { - succeeded: false - } => { - error!("satellite misinterpret log level, corrupted packet?"); - Err(drtio::Error::AuxError.into()) - }, - _ => Err(drtio::Error::UnexpectedReply.into()), - } - } - - pub async fn set_uart_log_filter( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - level: LevelFilter, - ) -> Result<()> { - let reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtSetUartLogLevelRequest { - destination, - log_level: level as u8, - }, - timer, - ).await?; - - match reply { - Packet::CoreMgmtAck { - succeeded: true - } => Ok(()), - Packet::CoreMgmtAck { - succeeded: false - } => { - error!("satellite misinterpret log level, corrupted packet?"); - Err(drtio::Error::AuxError.into()) - }, - _ => Err(drtio::Error::UnexpectedReply.into()), - } - } - - pub async fn config_read( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - _cfg: &Rc, - key: &String, - ) -> Result> { - let mut config_key: [u8; CORE_MGMT_PAYLOAD_MAX_SIZE] = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; - let len = key.len(); - if len > CORE_MGMT_PAYLOAD_MAX_SIZE { - error!("key is too long"); - return Err(Error::ReadConfigFail); - } - config_key[..len].clone_from_slice(key.as_bytes()); - - let mut reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtConfigReadRequest { - destination: destination, - length: len as u16, - key: config_key, - }, - timer, - ).await?; - - let mut buffer = Vec::new(); - loop { - match reply { - Packet::CoreMgmtConfigReadReply { - succeeded: true, - length, - last, - value, - } => { - buffer.extend(&value[..length as usize]); - - if last { - return Ok(buffer); - } - - reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtConfigReadContinue { - destination: destination, - }, - timer, - ).await?; - } - - Packet::CoreMgmtConfigReadReply { - succeeded:false, .. - } => { - return Err(Error::ReadConfigFail); - } - - _ => return Err(drtio::Error::UnexpectedReply.into()), - } - } - } - - pub async fn config_write( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - _cfg: &Rc, - key: &String, - value: Vec, - ) -> Result<()> { - let mut message = Cursor::new(Vec::with_capacity(key.len() + value.len() + 4 * 2)); - message.write_string(key).unwrap(); - message.write_bytes(&value).unwrap(); - - drtio::partition_data( - linkno, - aux_mutex, - routing_table, - timer, - message.get_ref(), - |slice, status, len: usize| Packet::CoreMgmtConfigWriteRequest { - destination: destination, - length: len as u16, - last: status.is_last(), - data: *slice - }, - |reply| match reply { - Packet::CoreMgmtAck { succeeded: true } => { - Ok(()) - }, - Packet::CoreMgmtAck { succeeded: false } => { - error!("config write failed"); - Err(Error::WriteConfigFail) - }, - _ => { - error!("received unknown packet"); - Err(drtio::Error::UnexpectedReply.into()) - }, - } - ).await - } - - pub async fn config_remove( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - _cfg: &Rc, - key: &String, - ) -> Result<()> { - let mut config_key: [u8; CORE_MGMT_PAYLOAD_MAX_SIZE] = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; - let len = key.len(); - if len > CORE_MGMT_PAYLOAD_MAX_SIZE { - error!("key is too long"); - return Err(Error::RemoveConfigFail); - } - config_key[..len].clone_from_slice(key.as_bytes()); - - let reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtConfigRemoveRequest { - destination: destination, - length: len as u16, - key: config_key, - }, - timer, - ).await?; - - match reply { - Packet::CoreMgmtAck { succeeded: true } => Ok(()), - Packet::CoreMgmtAck { succeeded: false } => Err(Error::RemoveConfigFail), - _ => Err(drtio::Error::UnexpectedReply.into()), - } - } - - pub async fn reboot( - aux_mutex: &Rc>, - routing_table: &drtio_routing::RoutingTable, - timer: GlobalTimer, - linkno: u8, - destination: u8, - ) -> Result<()> { - info!("initited reboot request to satellite destination {}", destination); - let reply = drtio::aux_transact( - aux_mutex, - linkno, - routing_table, - &Packet::CoreMgmtRebootRequest { - destination: destination, - }, - timer, - ).await?; - - match reply { - Packet::CoreMgmtAck { succeeded: true } => Ok(()), - _ => { - error!("received unknown packet"); - Err(drtio::Error::UnexpectedReply.into()) - }, - } + unreachable!() } } #[cfg(has_drtio)] macro_rules! process { - ($timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ + ($stream: ident, $timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ if $destination == 0 { - local_coremgmt::$func($($param, )*).await + local_coremgmt::$func($stream, $($param, )*).await } else { let linkno = $routing_table.0[$destination as usize][0] - 1 as u8; - remote_coremgmt::$func($aux_mutex, $routing_table, $timer, linkno, $destination, $($param, )*).await + remote_coremgmt::$func($stream, $aux_mutex, $routing_table, $timer, linkno, $destination, $($param, )*).await } }} } #[cfg(not(has_drtio))] macro_rules! process { - ($timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ - local_coremgmt::$func($($param, )*).await + ($stream, $timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ + local_coremgmt::$func($stream, $($param, )*).await }} } - async fn handle_connection( stream: &mut TcpStream, pull_id: Rc>, @@ -568,74 +621,56 @@ async fn handle_connection( } let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?; match msg { - Request::GetLog => { - match dispatch!(timer, _aux_mutex, _routing_table, destination, get_log) { - Ok(buffer) => { - write_i8(stream, Reply::LogContent as i8).await?; - write_chunk(stream, &buffer).await?; - } - Err(_) => { - write_i8(stream, Reply::Error as i8).await?; - } - } - }, - - Request::ClearLog => { - match dispatch!(timer, _aux_mutex, _routing_table, destination, clear_log) { - Ok(()) => { - write_i8(stream, Reply::Success as i8).await?; - } - Err(_) => { - write_i8(stream, Reply::Error as i8).await?; - } - } - }, - - Request::PullLog => { - todo!() - } - + Request::GetLog => process!(stream, timer, _aux_mutex, _routing_table, destination, get_log), + Request::ClearLog => process!(stream, timer, _aux_mutex, _routing_table, destination, clear_log), + Request::PullLog => process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + pull_log, + &pull_id + ), Request::SetLogFilter => { let lvl = read_log_level_filter(stream).await?; - match dispatch!(timer, _aux_mutex, _routing_table, destination, set_log_filter, lvl) { - Ok(()) => { - write_i8(stream, Reply::Success as i8).await?; - } - Err(_) => { - write_i8(stream, Reply::Error as i8).await?; - } - } - }, - + process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + set_log_filter, + lvl + ) + } Request::SetUartLogFilter => { let lvl = read_log_level_filter(stream).await?; - match dispatch!(timer, _aux_mutex, _routing_table, destination, set_uart_log_filter, lvl) { - Ok(()) => { - write_i8(stream, Reply::Success as i8).await?; - } - Err(_) => { - write_i8(stream, Reply::Error as i8).await?; - } - } - }, - + process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + set_uart_log_filter, + lvl + ) + } Request::ConfigRead => { let key = read_key(stream).await?; - debug!("read key: {}", key); - match dispatch!(timer, _aux_mutex, _routing_table, destination, config_read, &cfg, &key) { - Ok(value) => { - write_i8(stream, Reply::ConfigData as i8).await?; - write_chunk(stream, &value).await?; - } - Err(_) => { - write_i8(stream, Reply::Error as i8).await?; - } - } - }, - + process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + config_read, + &cfg, + &key + ) + } Request::ConfigWrite => { let key = read_key(stream).await?; - debug!("write key: {}", key); let len = read_i32(stream).await?; let len = if len <= 0 { 0 } else { len as usize }; let mut buffer = Vec::with_capacity(len); @@ -643,47 +678,38 @@ async fn handle_connection( buffer.set_len(len); } read_chunk(stream, &mut buffer).await?; - match dispatch!(timer, _aux_mutex, _routing_table, destination, config_write, &cfg, &key, buffer) { - Ok(()) => write_i8(stream, Reply::Success as i8).await?, - Err(_) => write_i8(stream, Reply::Error as i8).await?, - } - }, - + process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + config_write, + &cfg, + &key, + buffer + ) + } Request::ConfigRemove => { let key = read_key(stream).await?; - let value = dispatch!(timer, _aux_mutex, _routing_table, destination, config_remove, &cfg, &key); - if value.is_ok() { - write_i8(stream, Reply::Success as i8).await?; - } else { - write_i8(stream, Reply::Error as i8).await?; - } - }, - + process!( + stream, + timer, + _aux_mutex, + _routing_table, + destination, + config_remove, + &cfg, + &key + ) + } Request::Reboot => { - if destination == 0 { - info!("rebooting"); - write_i8(stream, Reply::RebootImminent as i8).await?; - stream.flush().await?; - slcr::reboot(); - } - - #[cfg(has_drtio)] - { - let linkno = _routing_table.0[destination as usize][0] - 1; - match remote_coremgmt::reboot( - _aux_mutex, _routing_table, timer, linkno, destination - ).await { - Ok(()) => write_i8(stream, Reply::Success as i8).await?, - Err(_) => write_i8(stream, Reply::Error as i8).await?, - } - } - }, - _ => todo!() - } + process!(stream, timer, _aux_mutex, _routing_table, destination, reboot) + } + }?; } } - pub fn start( cfg: Config, aux_mutex: &Rc>,