From 7c2cc51ae3d493e97fbbe8d03062accbf5d0ee71 Mon Sep 17 00:00:00 2001 From: Simon Renblad Date: Thu, 5 Sep 2024 17:20:20 +0800 Subject: [PATCH] changing to semaphore heavy solution --- src/runtime/src/comms.rs | 213 ++++++++++++++++----------------------- 1 file changed, 85 insertions(+), 128 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 0da9d9f..e8a00e0 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -4,9 +4,7 @@ use core::{cell::RefCell, fmt, slice, str}; use core_io::Error as IoError; use cslice::CSlice; use dyld::elf; -use futures::{future::FutureExt, select_biased}; -#[cfg(has_drtio)] -use io::Cursor; +use futures::{future::FutureExt, select_biased}; #[cfg(has_drtio)] use io::Cursor; #[cfg(has_drtio)] use ksupport::rpc; use ksupport::{kernel, resolve_channel_name}; @@ -806,136 +804,83 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let cfg = Rc::new(cfg); let restart_idle = Rc::new(Semaphore::new(0, 1)); mgmt::start(cfg.clone(), restart_idle.clone()); + + let semaphores = Rc::new( + (Semaphore::new(0, 1), + Semaphore::new(1, 1), + Semaphore::new(1, 1), + Semaphore::new(1, 1))); + + // handle connects + task::spawn(async move { + loop { + clone_mult!(semaphores, control, up_destinations, aux_mutex, routing_table); + semaphores.0.async_wait().await; + semaphores.1.try_wait(); + let _ = task_lock.async_lock().await; + select_biased! { + _ = (async { + // run connection + let mut stream = stream_opt.borrow_mut()?; + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + }).fuse() => { + semaphores.2.signal(); + }, + _ = semaphores.1.async_wait().fuse() => (), + } + } + }); + + // handle idle + task::spawn(async move { + loop { + clone_mult!(semaphores, control, up_destinations, aux_mutex, routing_table); + semaphores.2.async_wait().await; + semaphores.3.try_wait(); + let _ = task_lock.async_lock().await; + select_biased! { + _ = (async { + if let Some(buffer) = cfg.read("idle_kernel").ok() { + load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; + } + }).fuse() => (), + _ = semaphores.3.async_wait().fuse() => (), + } + } + }); task::spawn(async move { - let terminate = Rc::new(Semaphore::new(0, 1)); - let running = Rc::new(Mutex::new(false)); - let repeat_lock = Rc::new(Mutex::new(false)); - let restart_idle = restart_idle.clone(); - let cfg = cfg.clone(); - let control = control.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - - // copy same idea as artiq mainline - // loop - // check if socket.is_active() -> accept and run kernel task - // check if task is terminated -> run idle -> terminate idle on restart_idle - // yield and reloop (annoying thing is if async behaves as expected) - - // start listening on a socket and return the socket - let listener = TcpListener::new(10_000); - listener.listen(1381); - loop { - if listener.can_accept() { // ie can_accept (socket in state SYN-RECEIVED) - let mut stream = listener.accept().await; - if connection.try_wait().is_none() { - // there is an existing connection - terminate.signal(); - connection.async_wait().await; - } - - let _ = terminate.try_wait(); - task::spawn(async move { - select_biased! { - _ = (async { - - }) => (), - _ = terminate.async_wait().fuse() => () - } - let _ = stream.flush().await; - let _ = stream.abort().await; - connection.signal(); - }); - } - - if connection.try_wait().is_some() { - let _ = terminate.try_wait(); // ensure terminate and restart_idle are always set - let _ = restart_idle.try_wait(); - task::spawn(async move { - select_biased! { - _ = (async { - // needs to ensure that if there is no idle_kernel, enter idle loop and - // keep yielding - }) => (), - _ = terminate.async_wait().fuse() => (), - _ = async { - restart_idle.async_wait().fuse(), - } - } - connection.signal(); - }); - } - - // yield and rerun - task::r#yield().await; - } - - select_biased! { - _ = (async { - loop { - let cfg = cfg.clone(); - let control = control.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - let terminate = terminate.clone(); - let running = running.clone(); - let repeat_lock = repeat_lock.clone(); - - let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); - task::spawn(async move { - let _rep_lock = repeat_lock.async_lock().await; - terminate.signal(); - let _lock = running.async_lock().await; - drop(_rep_lock); - let routing_table = routing_table.borrow(); - select_biased! { - _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) - .await - .map_err(|e| warn!("connection terminated: {}", e)); - }).fuse() => (), - _ = terminate.async_wait().fuse() => () - } - drop(_lock); - let _ = stream.flush().await; - let _ = stream.abort().await; - }); - } - }).fuse() => (), - _ = (async { - loop { - let cfg = cfg.clone(); - let control = control.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - let terminate = terminate.clone(); - let running = running.clone(); - let repeat_lock = repeat_lock.clone(); - let restart_idle = restart_idle.clone(); - - let _rep_lock = repeat_lock.async_lock().await; - drop(_rep_lock); - let _lock = running.async_lock().await; - let routing_table = routing_table.borrow(); - select_biased! { - _ = (async { - if let Some(buffer) = cfg.read("idle_kernel").ok() { - load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; - } - }).fuse() => (), - _ = (async { - restart_idle.0.async_wait().await; - restart_idle.1.signal(); - }).fuse() => (), + select_biased! { + _ = (async { + let temp_s = Some(TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap()); // ALT: implement dummy + // tcp stream that is empty (another reason the priv construct is annoying) + if let Some(&stream) = stream_opt { + let _ = stream.flush().await; + let _ = stream.abort().await; } - drop(_lock); + stream_opt.replace(temp_s); + }).fuse() => { + if semaphore.1.try_wait().is_none() { + semaphore.1.signal(); // STOP KERNEL + } + if semaphore.3.try_wait().is_none() { + semaphore.3.signal(); // STOP IDLE + } + // if the other stream exists -> flush it and replace with the new stream + semaphore.0.signal(); // NEW KERNEL } - }).fuse() => (), + _ = (async { + restart_idle.wait_async().await; + }).fuse() => { + if semaphore.3.try_wait().is_none() { + semaphore.3.signal(); // STOP IDLE + semaphore.2.signal(); // NEW IDLE + } + } + } } }); @@ -996,3 +941,15 @@ pub fn soft_panic_main(timer: GlobalTimer, cfg: Config) -> ! { Sockets::run(&mut iface, || Instant::from_millis(timer.get_time().0 as i32)); } + +macro_rules! clone_mult { + ($id:ident) => { + let $id = $id.clone(); + }; + + // Decompose multiple `eval`s recursively + ($id:ident, $($ids:ident),+) => { + clone_mult!($id) + clone_mult!($($ids),+) + }; +}