diff --git a/src/runtime/src/mgmt.rs b/src/runtime/src/mgmt.rs index 24f4bf4f..26234388 100644 --- a/src/runtime/src/mgmt.rs +++ b/src/runtime/src/mgmt.rs @@ -1,17 +1,23 @@ use futures::{future::poll_fn, task::Poll}; use libasync::{smoltcp::TcpStream, task}; use libboard_zynq::smoltcp; -use log::{self, info, warn}; +use core::cell::RefCell; +use alloc::rc::Rc; +use log::{self, info, warn, LevelFilter}; use crate::logger::{BufferLogger, LogBufferRef}; use crate::proto_async; use crate::proto_mgmt::*; -async fn get_logger_buffer() -> LogBufferRef<'static> { + +async fn get_logger_buffer_pred(f: F) -> LogBufferRef<'static> +where + F: Fn(&LogBufferRef) -> bool, +{ poll_fn(|ctx| { let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() }; match logger.buffer() { - Ok(buffer) => Poll::Ready(buffer), + Ok(buffer) if f(&buffer) => Poll::Ready(buffer), _ => { ctx.waker().wake_by_ref(); Poll::Pending @@ -21,7 +27,11 @@ async fn get_logger_buffer() -> LogBufferRef<'static> { .await } -async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { +async fn get_logger_buffer() -> LogBufferRef<'static> { + get_logger_buffer_pred(|_| true).await +} + +async fn handle_connection(stream: &mut TcpStream, pull_id: Rc>) -> Result<(), Error> { Request::read_magic(stream).await?; loop { @@ -39,13 +49,33 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { buffer.clear(); Reply::Success.write_to(stream).await?; } - Request::PullLog => loop { - let mut buffer = get_logger_buffer().await; - if buffer.is_empty() { - continue; + Request::PullLog => { + let id = { + let mut guard = pull_id.borrow_mut(); + *guard += 1; + *guard + }; + loop { + let mut buffer = get_logger_buffer_pred(|b| !b.is_empty()).await; + let bytes = buffer.extract().as_bytes(); + if id != *pull_id.borrow() { + // another connection attempts to pull the log... + // abort this connection... + break; + } + proto_async::write_chunk(stream, bytes).await?; + if log::max_level() == LevelFilter::Trace { + // Hold exclusive access over the logger until we get positive + // acknowledgement; otherwise we get an infinite loop of network + // trace messages being transmitted and causing more network + // trace messages to be emitted. + // + // Any messages unrelated to this management socket that arrive + // while it is flushed are lost, but such is life. + stream.flush().await?; + } + buffer.clear(); } - proto_async::write_chunk(stream, buffer.extract().as_bytes()).await?; - buffer.clear(); }, Request::SetLogFilter(lvl) => { info!("Changing log level to {}", lvl); @@ -61,17 +91,19 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { .set_uart_log_level(lvl); } Reply::Success.write_to(stream).await?; - }, + } } } } pub fn start() { task::spawn(async move { + let pull_id = Rc::new(RefCell::new(0u32)); loop { let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap(); + let pull_id = pull_id.clone(); task::spawn(async move { - let _ = handle_connection(&mut stream) + let _ = handle_connection(&mut stream, pull_id) .await .map_err(|e| warn!("connection terminated: {:?}", e)); let _ = stream.flush().await;