From 56e7cc822c4d094029e5bb87e7e5fc5fc29e6dc4 Mon Sep 17 00:00:00 2001 From: pca006132 Date: Tue, 4 Aug 2020 10:31:03 +0800 Subject: [PATCH] runtime/comms: limited concurrent connections to 1 only. --- src/runtime/src/comms.rs | 60 ++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 43406566..295ffb10 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -18,6 +18,8 @@ use libboard_zynq::{ }, timer::GlobalTimer, }; +use libcortex_a9::sync_channel; +use futures::{select_biased, future::FutureExt}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; use crate::config; @@ -230,10 +232,10 @@ async fn handle_run_kernel(stream: Option<&TcpStream>, control: &Rc, control: &Rc>, stream: Option<&TcpStream>) -> Result<()> { +async fn load_kernel(buffer: &Vec, control: &Rc>, stream: Option<&TcpStream>) -> Result<()> { let mut control = control.borrow_mut(); control.restart(); - control.tx.async_send(kernel::Message::LoadRequest(buffer)).await; + control.tx.async_send(kernel::Message::LoadRequest(buffer.to_vec())).await; let reply = control.rx.async_recv().await; match reply { kernel::Message::LoadCompleted => { @@ -279,7 +281,7 @@ async fn handle_connection(stream: &TcpStream, control: Rc { let buffer = read_bytes(stream, 1024*1024).await?; - load_kernel(buffer, &control, Some(stream)).await?; + load_kernel(&buffer, &control, Some(stream)).await?; }, Request::RunKernel => { handle_run_kernel(Some(stream), &control).await?; @@ -338,9 +340,10 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) { moninj::start(timer); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start())); + let idle_kernel = Rc::new(cfg.read("idle").ok()); if let Ok(buffer) = cfg.read("startup") { info!("Loading startup kernel..."); - if let Ok(()) = task::block_on(load_kernel(buffer, &control, None)) { + if let Ok(()) = task::block_on(load_kernel(&buffer, &control, None)) { info!("Starting startup kernel..."); let _ = task::block_on(handle_run_kernel(None, &control)); info!("Startup kernel finished!"); @@ -350,14 +353,53 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) { } task::spawn(async move { + let (tx, rx) = sync_channel!(u32, 1); + let tx = RefCell::new(tx); + let rx = Rc::new(RefCell::new(rx)); + let has_connection = Rc::new(RefCell::new(false)); loop { let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap(); + let has_connection = has_connection.clone(); + + if *has_connection.borrow() { + let mut tx = tx.borrow_mut(); + tx.async_send(42).await; + // the second send is used to block until another connection received the abort + // request. + tx.async_send(42).await; + } + + *has_connection.borrow_mut() = true; let control = control.clone(); - task::spawn(async { - info!("received connection"); - let _ = handle_connection(&stream, control) - .await - .map_err(|e| warn!("connection terminated: {}", e)); + let idle_kernel = idle_kernel.clone(); + let _ = rx.borrow_mut().try_recv(); + + let new_rx = rx.clone(); + task::spawn(async move { + let mut new_rx = new_rx.borrow_mut(); + select_biased! { + _ = (async { + let _ = handle_connection(&stream, control.clone()) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + if let Some(buffer) = &*idle_kernel { + info!("Loading idle kernel"); + let _ = load_kernel(&buffer, &control, None) + .await.map_err(|e| warn!("error loading idle kernel")); + info!("Running idle kernel"); + let _ = handle_run_kernel(None, &control) + .await.map_err(|e| warn!("error running idle kernel")); + info!("Idle kernel terminated"); + } + }).fuse() => (), + _ = new_rx.async_recv().fuse() => () + } + *has_connection.borrow_mut() = false; + // it is possible that when `handle_connection` is terminating, + // another connection sent an abort request and get blocked, + // so we try_recv here to unblock in that case. + let _ = new_rx.try_recv(); + core::mem::drop(new_rx); let _ = stream.flush().await; let _ = stream.abort().await; });