diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 0e21c98..dc730e4 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -803,70 +803,51 @@ pub fn main(timer: GlobalTimer, cfg: Config) { error!("Error loading startup kernel!"); } } - - mgmt::start(cfg); + let cfg = Rc::new(cfg); + mgmt::start(cfg.clone()); task::spawn(async move { - let connection = Rc::new(Semaphore::new(0, 1)); let terminate = Rc::new(Semaphore::new(0, 1)); - { - let control = control.clone(); - let idle_kernel = idle_kernel.clone(); - let connection = connection.clone(); - let terminate = terminate.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - task::spawn(async move { - let routing_table = routing_table.borrow(); - select_biased! { - _ = (async { - if let Some(buffer) = &*idle_kernel { - load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; + let running = Rc::new(Mutex::new(false)); + let cfg = cfg.clone(); + + select_biased! { + _ = (async { + loop { + let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); + terminate.signal(); + task::spawn(async move { + let _lock = running.lock(); + let routing_table = routing_table.borrow(); + select_biased! { + _ = (async { + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + }).fuse() => (), + _ = terminate.async_wait().fuse() => () + } + let _ = stream.flush().await; + let _ = stream.abort().await; + drop(_lock); + }); + } + }).fuse() => (), + _ = (async { + loop { + let _lock = running.lock(); + select_biased! { + _ = (async { + if let Some(buffer) = cfg.read("idle_kernel").ok() { + load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; + } + }).fuse() => (), + _ = terminate.async_wait().fuse() => (), + _ = update_idle.async_wait().fuse() => () } - }).fuse() => (), - _ = terminate.async_wait().fuse() => () - } - connection.signal(); - }); - } - - loop { - let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); - - if connection.try_wait().is_none() { - // there is an existing connection - terminate.signal(); - connection.async_wait().await; - } - - let control = control.clone(); - let idle_kernel = idle_kernel.clone(); - let connection = connection.clone(); - let terminate = terminate.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - - // we make sure the value of terminate is 0 before we start - let _ = terminate.try_wait(); - task::spawn(async move { - let routing_table = routing_table.borrow(); - select_biased! { - _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) - .await - .map_err(|e| warn!("connection terminated: {}", e)); - if let Some(buffer) = &*idle_kernel { - load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; - } - }).fuse() => (), - _ = terminate.async_wait().fuse() => () - } - connection.signal(); - let _ = stream.flush().await; - let _ = stream.abort().await; - }); + drop(_lock); + } + }).fuse() => (), } }); diff --git a/src/runtime/src/mgmt.rs b/src/runtime/src/mgmt.rs index bfeed9b..77113ed 100644 --- a/src/runtime/src/mgmt.rs +++ b/src/runtime/src/mgmt.rs @@ -229,10 +229,9 @@ async fn handle_connection(stream: &mut TcpStream, pull_id: Rc>, cf } } -pub fn start(cfg: Config) { +pub fn start(cfg: Rc) { task::spawn(async move { let pull_id = Rc::new(RefCell::new(0u32)); - let cfg = Rc::new(cfg); loop { let mut stream = TcpStream::accept(1380, 2048, 2048).await.unwrap(); let pull_id = pull_id.clone();