From b4cc072e8efd221099965ab6229bdf5c827f1533 Mon Sep 17 00:00:00 2001 From: occheung Date: Tue, 20 Aug 2024 16:48:04 +0800 Subject: [PATCH] mgmt: impl pull log --- src/libboard_artiq/src/drtioaux_proto.rs | 5 +- src/runtime/src/mgmt.rs | 47 ++++-- src/satman/src/main.rs | 104 +++++++------- src/satman/src/mgmt.rs | 175 +---------------------- src/satman/src/routing.rs | 2 +- 5 files changed, 94 insertions(+), 239 deletions(-) diff --git a/src/libboard_artiq/src/drtioaux_proto.rs b/src/libboard_artiq/src/drtioaux_proto.rs index 7725cc0..eb28a52 100644 --- a/src/libboard_artiq/src/drtioaux_proto.rs +++ b/src/libboard_artiq/src/drtioaux_proto.rs @@ -295,6 +295,7 @@ pub enum Packet { CoreMgmtGetLogRequest { destination: u8, + clear: bool, }, CoreMgmtGetLogReply { last: bool, @@ -636,6 +637,7 @@ impl Packet { 0xd0 => Packet::CoreMgmtGetLogRequest { destination: reader.read_u8()?, + clear: reader.read_bool()?, }, 0xd1 => { let last = reader.read_bool()?; @@ -1114,9 +1116,10 @@ impl Packet { writer.write_u8(0xcc)?; writer.write_u8(destination)?; } - Packet::CoreMgmtGetLogRequest { destination } => { + Packet::CoreMgmtGetLogRequest { destination, clear } => { writer.write_u8(0xd0)?; writer.write_u8(destination)?; + writer.write_bool(clear)?; } Packet::CoreMgmtGetLogReply { last, length, data } => { writer.write_u8(0xd1)?; diff --git a/src/runtime/src/mgmt.rs b/src/runtime/src/mgmt.rs index 7e3d46a..9cd0bf6 100644 --- a/src/runtime/src/mgmt.rs +++ b/src/runtime/src/mgmt.rs @@ -153,7 +153,7 @@ mod remote_coremgmt { aux_mutex, linkno, routing_table, - &Packet::CoreMgmtGetLogRequest { destination }, + &Packet::CoreMgmtGetLogRequest { destination, clear: false }, timer, ) .await? @@ -204,15 +204,42 @@ mod remote_coremgmt { } pub async fn pull_log( - _stream: &mut TcpStream, - _aux_mutex: &Rc>, - _routing_table: &drtio_routing::RoutingTable, - _timer: GlobalTimer, - _linkno: u8, - _destination: u8, - _pull_id: &Rc>, + stream: &mut TcpStream, + aux_mutex: &Rc>, + routing_table: &drtio_routing::RoutingTable, + timer: GlobalTimer, + linkno: u8, + destination: u8, + pull_id: &Rc>, ) -> Result<()> { - todo!() + let id = { + let mut guard = pull_id.borrow_mut(); + *guard += 1; + *guard + }; + loop { + if id != *pull_id.borrow() { + // another connection attempts to pull the log... + // abort this connection... + break; + } + + match drtio::aux_transact( + aux_mutex, + linkno, + routing_table, + &Packet::CoreMgmtGetLogRequest { destination, clear: true }, + timer, + ).await { + Ok(Packet::CoreMgmtGetLogReply { last, length, data }) => { + write_chunk(stream, &data[..length as usize]).await?; + }, + + _ => return Err(drtio::Error::UnexpectedReply.into()), + } + } + + Ok(()) } pub async fn set_log_filter( @@ -594,7 +621,7 @@ macro_rules! process { #[cfg(not(has_drtio))] macro_rules! process { - ($stream, $timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ + ($stream: ident, $timer:ident, $aux_mutex:ident, $routing_table:ident, $destination:expr, $func:ident $(, $param:expr)*) => {{ local_coremgmt::$func($stream, $($param, )*).await }} } diff --git a/src/satman/src/main.rs b/src/satman/src/main.rs index 6a09298..89f2086 100644 --- a/src/satman/src/main.rs +++ b/src/satman/src/main.rs @@ -4,8 +4,8 @@ #[macro_use] extern crate log; -extern crate core_io; extern crate byteorder; +extern crate core_io; extern crate cslice; extern crate embedded_hal; extern crate num_derive; @@ -36,7 +36,7 @@ use libboard_artiq::si5324; #[cfg(has_si549)] use libboard_artiq::si549; use libboard_artiq::{drtio_routing, drtioaux, - drtioaux_proto::{MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE, CORE_MGMT_PAYLOAD_MAX_SIZE}, + drtioaux_proto::{CORE_MGMT_PAYLOAD_MAX_SIZE, MASTER_PAYLOAD_MAX_SIZE, SAT_PAYLOAD_MAX_SIZE}, identifier_read, logger, pl::csr}; #[cfg(feature = "target_kasli_soc")] @@ -46,16 +46,16 @@ use libconfig::Config; use libcortex_a9::{l2c::enable_l2_cache, regs::MPIDR}; use libregister::RegisterR; use libsupport_zynq::{exception_vectors, ram}; +use mgmt::Manager as CoreManager; use routing::Router; use subkernel::Manager as KernelManager; -use mgmt::Manager as CoreManager; mod analyzer; mod dma; +mod mgmt; mod repeater; mod routing; mod subkernel; -mod mgmt; // linker symbols extern "C" { @@ -1018,6 +1018,7 @@ fn process_aux_packet( } drtioaux::Packet::CoreMgmtGetLogRequest { destination: _destination, + clear, } => { forward!( router, @@ -1031,7 +1032,11 @@ fn process_aux_packet( ); let mut data_slice: [u8; CORE_MGMT_PAYLOAD_MAX_SIZE] = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; let meta = core_manager.log_get_slice(&mut data_slice); - drtioaux::send(0, + if clear && meta.status.is_first() { + mgmt::clear_log(); + } + drtioaux::send( + 0, &drtioaux::Packet::CoreMgmtGetLogReply { last: meta.status.is_last(), length: meta.len as u16, @@ -1053,9 +1058,7 @@ fn process_aux_packet( timer ); mgmt::clear_log(); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: true - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: true }) } drtioaux::Packet::CoreMgmtSetLogLevelRequest { destination: _destination, @@ -1071,20 +1074,16 @@ fn process_aux_packet( &packet, timer ); - + match mgmt::byte_to_level_filter(log_level) { Ok(level_filter) => { info!("Changing log level to {}", log_level); log::set_max_level(level_filter); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: true - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: true }) } Err(_) => { error!("Unknown log level: {}", log_level); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: false - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: false }) } } } @@ -1102,22 +1101,21 @@ fn process_aux_packet( &packet, timer ); - + match mgmt::byte_to_level_filter(log_level) { Ok(level_filter) => { info!("Changing UART log level to {}", log_level); unsafe { - logger::BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(level_filter); + logger::BufferLogger::get_logger() + .as_ref() + .unwrap() + .set_uart_log_level(level_filter); } - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: true - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: true }) } Err(_) => { error!("Unknown log level: {}", log_level); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: false - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: false }) } } } @@ -1142,25 +1140,32 @@ fn process_aux_packet( let key_slice = &key[..length as usize]; if !key_slice.is_ascii() { error!("invalid key"); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtConfigReadReply { - succeeded: false, - length: 0, - last: true, - value: value_slice, - }) - } else { - let key = core::str::from_utf8(key_slice).unwrap(); - if !core_manager.fetch_config_value(key) { - warn!("read error: no such key"); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtConfigReadReply { + drtioaux::send( + 0, + &drtioaux::Packet::CoreMgmtConfigReadReply { succeeded: false, length: 0, last: true, value: value_slice, - }) + }, + ) + } else { + let key = core::str::from_utf8(key_slice).unwrap(); + if !core_manager.fetch_config_value(key) { + warn!("read error: no such key"); + drtioaux::send( + 0, + &drtioaux::Packet::CoreMgmtConfigReadReply { + succeeded: false, + length: 0, + last: true, + value: value_slice, + }, + ) } else { let meta = core_manager.get_config_value_slice(&mut value_slice); - drtioaux::send(0, + drtioaux::send( + 0, &drtioaux::Packet::CoreMgmtConfigReadReply { succeeded: true, length: meta.len as u16, @@ -1187,7 +1192,8 @@ fn process_aux_packet( let mut value_slice = [0; CORE_MGMT_PAYLOAD_MAX_SIZE]; let meta = core_manager.get_config_value_slice(&mut value_slice); - drtioaux::send(0, + drtioaux::send( + 0, &drtioaux::Packet::CoreMgmtConfigReadReply { succeeded: true, length: meta.len as u16, @@ -1222,11 +1228,7 @@ fn process_aux_packet( } } - drtioaux::send(0, - &drtioaux::Packet::CoreMgmtAck { - succeeded: succeeded, - }, - )?; + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: succeeded })?; Ok(()) } @@ -1249,22 +1251,16 @@ fn process_aux_packet( let key_slice = &key[..length as usize]; if !key_slice.is_ascii() { error!("invalid key"); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: false, - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: false }) } else { let key = core::str::from_utf8(key_slice).unwrap(); debug!("erase key: {}", key); if core_manager.remove_config(key) { debug!("erase success"); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: true, - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: true }) } else { warn!("erase failed"); - drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { - succeeded: false, - }) + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: false }) } } } @@ -1283,11 +1279,7 @@ fn process_aux_packet( timer ); - drtioaux::send(0, - &drtioaux::Packet::CoreMgmtAck { - succeeded: true, - }, - )?; + drtioaux::send(0, &drtioaux::Packet::CoreMgmtAck { succeeded: true })?; info!("reboot imminent"); slcr::reboot(); Ok(()) diff --git a/src/satman/src/mgmt.rs b/src/satman/src/mgmt.rs index 3a0bb6c..b20c8c8 100644 --- a/src/satman/src/mgmt.rs +++ b/src/satman/src/mgmt.rs @@ -1,13 +1,13 @@ use alloc::vec::Vec; -use libboard_artiq::logger::{BufferLogger, LogBufferRef}; +use io::{Cursor, ProtoRead, ProtoWrite}; +use libboard_artiq::{drtioaux_proto::CORE_MGMT_PAYLOAD_MAX_SIZE, + logger::{BufferLogger, LogBufferRef}}; use libboard_zynq::smoltcp; use libconfig::Config; use log::{self, debug, error, info, warn, LevelFilter}; -use libboard_artiq::drtioaux_proto::CORE_MGMT_PAYLOAD_MAX_SIZE; -use crate::routing::{Sliceable, SliceMeta}; -use io::{Cursor, ProtoRead, ProtoWrite}; +use crate::routing::{SliceMeta, Sliceable}; #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Error { @@ -30,35 +30,6 @@ impl core::fmt::Display for Error { } } -// impl From for Error { -// fn from(error: smoltcp::Error) -> Self { -// Error::NetworkError(error) -// } -// } - -// #[derive(Debug, FromPrimitive)] -// pub enum Request { -// GetLog = 1, -// ClearLog = 2, -// PullLog = 7, -// SetLogFilter = 3, -// Reboot = 5, -// SetUartLogFilter = 6, - -// ConfigRead = 12, -// ConfigWrite = 13, -// ConfigRemove = 14, -// } - -// #[repr(i8)] -// pub enum Reply { -// Success = 1, -// LogContent = 2, -// RebootImminent = 3, -// Error = 6, -// ConfigData = 7, -// } - pub fn byte_to_level_filter(level_byte: u8) -> Result { Ok(match level_byte { 0 => log::LevelFilter::Off, @@ -89,24 +60,6 @@ pub fn clear_log() { buffer.clear(); } -// async fn read_key(stream: &mut TcpStream) -> Result { -// let len = read_i32(stream).await?; -// if len <= 0 { -// write_i8(stream, Reply::Error as i8).await?; -// return Err(Error::UnexpectedPattern); -// } -// let mut buffer = Vec::with_capacity(len as usize); -// for _ in 0..len { -// buffer.push(0); -// } -// read_chunk(stream, &mut buffer).await?; -// if !buffer.is_ascii() { -// write_i8(stream, Reply::Error as i8).await?; -// return Err(Error::UnexpectedPattern); -// } -// Ok(String::from_utf8(buffer).unwrap()) -// } - pub struct Manager<'a> { cfg: &'a mut Config, last_log: Sliceable, @@ -185,123 +138,3 @@ impl<'a> Manager<'_> { status.is_ok() } } - - - -// async fn handle_connection(stream: &mut TcpStream, pull_id: Rc>, cfg: Rc) -> Result<()> { -// if !expect(&stream, b"ARTIQ management\n").await? { -// return Err(Error::UnexpectedPattern); -// } -// stream.send_slice("e".as_bytes()).await?; - -// loop { -// let msg = read_i8(stream).await; -// if let Err(smoltcp::Error::Finished) = msg { -// return Ok(()); -// } -// let msg: Request = FromPrimitive::from_i8(msg?).ok_or(Error::UnrecognizedPacket)?; -// match msg { -// Request::GetLog => { -// let buffer = get_logger_buffer().await.extract().as_bytes().to_vec(); -// write_i8(stream, Reply::LogContent as i8).await?; -// write_chunk(stream, &buffer).await?; -// } -// Request::ClearLog => { -// let mut buffer = get_logger_buffer().await; -// buffer.clear(); -// write_i8(stream, Reply::Success as i8).await?; -// } -// Request::PullLog => { -// let id = { -// let mut guard = pull_id.borrow_mut(); -// *guard += 1; -// *guard -// }; -// loop { -// let mut buffer = get_logger_buffer_pred(|b| !b.is_empty()).await; -// if id != *pull_id.borrow() { -// // another connection attempts to pull the log... -// // abort this connection... -// break; -// } -// let bytes = buffer.extract().as_bytes().to_vec(); -// buffer.clear(); -// core::mem::drop(buffer); -// write_chunk(stream, &bytes).await?; -// if log::max_level() == LevelFilter::Trace { -// // temporarily discard all trace level log -// let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() }; -// logger.set_buffer_log_level(LevelFilter::Debug); -// stream.flush().await?; -// logger.set_buffer_log_level(LevelFilter::Trace); -// } -// } -// } -// Request::SetLogFilter => { -// let lvl = read_log_level_filter(stream).await?; -// info!("Changing log level to {}", lvl); -// log::set_max_level(lvl); -// write_i8(stream, Reply::Success as i8).await?; -// } -// Request::SetUartLogFilter => { -// let lvl = read_log_level_filter(stream).await?; -// info!("Changing UART log level to {}", lvl); -// unsafe { -// BufferLogger::get_logger().as_ref().unwrap().set_uart_log_level(lvl); -// } -// write_i8(stream, Reply::Success as i8).await?; -// } -// Request::ConfigRead => { -// let key = read_key(stream).await?; -// debug!("read key: {}", key); -// let value = cfg.read(&key); -// if let Ok(value) = value { -// debug!("got value"); -// write_i8(stream, Reply::ConfigData as i8).await?; -// write_chunk(stream, &value).await?; -// } else { -// warn!("read error: no such key"); -// write_i8(stream, Reply::Error as i8).await?; -// } -// } -// Request::ConfigWrite => { -// let key = read_key(stream).await?; -// debug!("write key: {}", key); -// let len = read_i32(stream).await?; -// let len = if len <= 0 { 0 } else { len as usize }; -// let mut buffer = Vec::with_capacity(len); -// unsafe { -// buffer.set_len(len); -// } -// read_chunk(stream, &mut buffer).await?; -// let value = cfg.write(&key, buffer); -// if value.is_ok() { -// debug!("write success"); -// write_i8(stream, Reply::Success as i8).await?; -// } else { -// // this is an error because we do not expect write to fail -// error!("failed to write: {:?}", value); -// write_i8(stream, Reply::Error as i8).await?; -// } -// } -// Request::ConfigRemove => { -// let key = read_key(stream).await?; -// debug!("erase key: {}", key); -// let value = cfg.remove(&key); -// if value.is_ok() { -// debug!("erase success"); -// write_i8(stream, Reply::Success as i8).await?; -// } else { -// warn!("erase failed"); -// write_i8(stream, Reply::Error as i8).await?; -// } -// } -// Request::Reboot => { -// info!("rebooting"); -// write_i8(stream, Reply::RebootImminent as i8).await?; -// stream.flush().await?; -// slcr::reboot(); -// } -// } -// } -// } diff --git a/src/satman/src/routing.rs b/src/satman/src/routing.rs index dbbb958..6d4f19f 100644 --- a/src/satman/src/routing.rs +++ b/src/satman/src/routing.rs @@ -4,7 +4,7 @@ use core::cmp::min; #[cfg(has_drtio_routing)] use libboard_artiq::pl::csr; use libboard_artiq::{drtio_routing, drtioaux, - drtioaux_proto::{PayloadStatus, MASTER_PAYLOAD_MAX_SIZE, CORE_MGMT_PAYLOAD_MAX_SIZE}}; + drtioaux_proto::{PayloadStatus, CORE_MGMT_PAYLOAD_MAX_SIZE, MASTER_PAYLOAD_MAX_SIZE}}; pub struct SliceMeta { pub destination: u8,