mgmt: fixed pull log

core0-buffer
pca006132 2020-07-15 16:05:00 +08:00
parent 5149d37be9
commit 2568d62865
1 changed files with 44 additions and 12 deletions

View File

@ -1,17 +1,23 @@
use futures::{future::poll_fn, task::Poll}; use futures::{future::poll_fn, task::Poll};
use libasync::{smoltcp::TcpStream, task}; use libasync::{smoltcp::TcpStream, task};
use libboard_zynq::smoltcp; use libboard_zynq::smoltcp;
use log::{self, info, warn}; use core::cell::RefCell;
use alloc::rc::Rc;
use log::{self, info, warn, LevelFilter};
use crate::logger::{BufferLogger, LogBufferRef}; use crate::logger::{BufferLogger, LogBufferRef};
use crate::proto_async; use crate::proto_async;
use crate::proto_mgmt::*; use crate::proto_mgmt::*;
async fn get_logger_buffer() -> LogBufferRef<'static> {
async fn get_logger_buffer_pred<F>(f: F) -> LogBufferRef<'static>
where
F: Fn(&LogBufferRef) -> bool,
{
poll_fn(|ctx| { poll_fn(|ctx| {
let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() }; let logger = unsafe { BufferLogger::get_logger().as_mut().unwrap() };
match logger.buffer() { match logger.buffer() {
Ok(buffer) => Poll::Ready(buffer), Ok(buffer) if f(&buffer) => Poll::Ready(buffer),
_ => { _ => {
ctx.waker().wake_by_ref(); ctx.waker().wake_by_ref();
Poll::Pending Poll::Pending
@ -21,7 +27,11 @@ async fn get_logger_buffer() -> LogBufferRef<'static> {
.await .await
} }
async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> { async fn get_logger_buffer() -> LogBufferRef<'static> {
get_logger_buffer_pred(|_| true).await
}
async fn handle_connection(stream: &mut TcpStream, pull_id: Rc<RefCell<u32>>) -> Result<(), Error> {
Request::read_magic(stream).await?; Request::read_magic(stream).await?;
loop { loop {
@ -39,13 +49,33 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
buffer.clear(); buffer.clear();
Reply::Success.write_to(stream).await?; Reply::Success.write_to(stream).await?;
} }
Request::PullLog => loop { Request::PullLog => {
let mut buffer = get_logger_buffer().await; let id = {
if buffer.is_empty() { let mut guard = pull_id.borrow_mut();
continue; *guard += 1;
*guard
};
loop {
let mut buffer = get_logger_buffer_pred(|b| !b.is_empty()).await;
let bytes = buffer.extract().as_bytes();
if id != *pull_id.borrow() {
// another connection attempts to pull the log...
// abort this connection...
break;
}
proto_async::write_chunk(stream, bytes).await?;
if log::max_level() == LevelFilter::Trace {
// Hold exclusive access over the logger until we get positive
// acknowledgement; otherwise we get an infinite loop of network
// trace messages being transmitted and causing more network
// trace messages to be emitted.
//
// Any messages unrelated to this management socket that arrive
// while it is flushed are lost, but such is life.
stream.flush().await?;
}
buffer.clear();
} }
proto_async::write_chunk(stream, buffer.extract().as_bytes()).await?;
buffer.clear();
}, },
Request::SetLogFilter(lvl) => { Request::SetLogFilter(lvl) => {
info!("Changing log level to {}", lvl); info!("Changing log level to {}", lvl);
@ -61,17 +91,19 @@ async fn handle_connection(stream: &mut TcpStream) -> Result<(), Error> {
.set_uart_log_level(lvl); .set_uart_log_level(lvl);
} }
Reply::Success.write_to(stream).await?; Reply::Success.write_to(stream).await?;
}, }
} }
} }
} }
pub fn start() { pub fn start() {
task::spawn(async move { task::spawn(async move {
let pull_id = Rc::new(RefCell::new(0u32));
loop { loop {
let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap(); let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap();
let pull_id = pull_id.clone();
task::spawn(async move { task::spawn(async move {
let _ = handle_connection(&mut stream) let _ = handle_connection(&mut stream, pull_id)
.await .await
.map_err(|e| warn!("connection terminated: {:?}", e)); .map_err(|e| warn!("connection terminated: {:?}", e));
let _ = stream.flush().await; let _ = stream.flush().await;