diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index d02d974..01aa923 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -722,49 +722,71 @@ pub fn main(timer: GlobalTimer, cfg: Config) { mgmt::start(cfg); task::spawn(async move { - let connection = Rc::new(Semaphore::new(1, 1)); - let terminate = Rc::new(Semaphore::new(0, 1)); + let enter_idle = Rc::new(Semaphore::new(1, 1)); + let term_idle = Rc::new(Semaphore::new(0, 1)); + let enter_conn = Rc::new(Semaphore::new(0, 1)); + let term_conn = Rc::new(Semaphore::new(0, 1)); loop { - let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); - - if connection.try_wait().is_none() { - // there is an existing connection - terminate.signal(); - connection.async_wait().await; - } - - let control = control.clone(); - let idle_kernel = idle_kernel.clone(); - let connection = connection.clone(); - let terminate = terminate.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - - // we make sure the value of terminate is 0 before we start - let _ = terminate.try_wait(); - task::spawn(async move { - let routing_table = routing_table.borrow(); - select_biased! { - _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) - .await - .map_err(|e| warn!("connection terminated: {}", e)); - if let Some(buffer) = &*idle_kernel { + task::spawn({ + let control = control.clone(); + let idle_kernel = idle_kernel.clone(); + let up_destinations = up_destinations.clone(); + let aux_mutex = aux_mutex.clone(); + let drtio_routing_table = drtio_routing_table.clone(); + let enter_idle = enter_idle.clone(); + let term_idle = term_idle.clone(); + let enter_conn = enter_conn.clone(); + async move { + enter_idle.async_wait().await; + let routing_table = drtio_routing_table.borrow(); + select_biased! { + _ = (async { info!("Loading idle kernel"); - let _ = load_kernel(&buffer, &control, None) - .await.map_err(|_| warn!("error loading idle kernel")); - info!("Running idle kernel"); - let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) - .await.map_err(|_| warn!("error running idle kernel")); - info!("Idle kernel terminated"); - } - }).fuse() => (), - _ = terminate.async_wait().fuse() => () + if let Some(buffer) = &*idle_kernel { + let _ = load_kernel(&buffer, &control, None) + .await.map_err(|_| warn!("error loading idle kernel")); + info!("Running idle kernel"); + let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) + .await.map_err(|_| warn!("error running idle kernel")); + info!("Idle kernel terminated"); + } else { + info!("No idle kernel found"); + } + }).fuse() => (), + _ = term_idle.async_wait().fuse() => () + } + enter_conn.signal() + } + }); + + let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); + term_conn.signal(); + term_idle.signal(); + enter_conn.async_wait().await; + let _ = term_conn.try_wait(); + let _ = term_idle.try_wait(); + + task::spawn({ + let control = control.clone(); + let up_destinations = up_destinations.clone(); + let aux_mutex = aux_mutex.clone(); + let drtio_routing_table = drtio_routing_table.clone(); + let term_conn = term_conn.clone(); + let enter_idle = enter_idle.clone(); + async move { + let routing_table = drtio_routing_table.borrow(); + select_biased! { + _ = (async { + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + }).fuse() => (), + _ = term_conn.async_wait().fuse() => () + } + enter_idle.signal(); + let _ = stream.flush().await; + let _ = stream.abort().await; } - connection.signal(); - let _ = stream.flush().await; - let _ = stream.abort().await; }); } });