diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index d02d974..78021d0 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -722,47 +722,72 @@ pub fn main(timer: GlobalTimer, cfg: Config) { mgmt::start(cfg); task::spawn(async move { - let connection = Rc::new(Semaphore::new(1, 1)); - let terminate = Rc::new(Semaphore::new(0, 1)); + let enter_idle = Rc::new(Semaphore::new(1, 1)); + let term_idle = Rc::new(Semaphore::new(0, 1)); + let enter_conn = Rc::new(Semaphore::new(0, 1)); + let term_conn = Rc::new(Semaphore::new(0, 1)); loop { - let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); - if connection.try_wait().is_none() { - // there is an existing connection - terminate.signal(); - connection.async_wait().await; - } + // start idle kernel first + let control_cop = control.clone(); + let idle_kernel_clone = idle_kernel.clone(); - let control = control.clone(); - let idle_kernel = idle_kernel.clone(); - let connection = connection.clone(); - let terminate = terminate.clone(); - let up_destinations = up_destinations.clone(); - let aux_mutex = aux_mutex.clone(); - let routing_table = drtio_routing_table.clone(); - - // we make sure the value of terminate is 0 before we start - let _ = terminate.try_wait(); + let enter_idle_clone = enter_idle.clone(); + let term_idle_clone = term_idle.clone(); + let enter_conn_clone = enter_conn.clone(); + + let up_destinations_clone = up_destinations.clone(); + let aux_mutex_clone = aux_mutex.clone(); + let routing_table_clone = drtio_routing_table.clone(); task::spawn(async move { - let routing_table = routing_table.borrow(); + enter_idle_clone.async_wait().await; + let routing_table = routing_table_clone.borrow(); select_biased! { _ = (async { - let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) - .await - .map_err(|e| warn!("connection terminated: {}", e)); - if let Some(buffer) = &*idle_kernel { - info!("Loading idle kernel"); - let _ = load_kernel(&buffer, &control, None) + info!("Loading idle kernel"); + if let Some(buffer) = &*idle_kernel_clone { + let _ = load_kernel(&buffer, &control_clone, None) .await.map_err(|_| warn!("error loading idle kernel")); info!("Running idle kernel"); - let _ = handle_run_kernel(None, &control, &up_destinations, &aux_mutex, &routing_table, timer) + let _ = handle_run_kernel(None, &control_clone, &up_destinations_clone, &aux_mutex_clone, &routing_table, timer) .await.map_err(|_| warn!("error running idle kernel")); info!("Idle kernel terminated"); + } else { + info!("No idle kernel found"); } }).fuse() => (), - _ = terminate.async_wait().fuse() => () + _ = term_idle_clone.async_wait().fuse() => () } - connection.signal(); + enter_conn_clone.signal() + }); + + let mut stream = TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap(); + + term_conn.signal(); + term_idle.signal(); + enter_conn.async_wait().await; + let _ = term_conn.try_wait(); + let _ = term_idle.try_wait(); + + let control_clone = control.clone(); + let enter_idle_clone = enter_idle.clone(); + let term_conn_clone = enter_conn.clone(); + + let up_destinations_clone = up_destinations.clone(); + let aux_mutex_clone = aux_mutex.clone(); + let routing_table_clone = drtio_routing_table.clone(); + + task::spawn(async move { + let routing_table = routing_table_clone.borrow(); + select_biased! { + _ = (async { + let _ = handle_connection(&mut stream, control_clone.clone(), &up_destinations_clone, &aux_mutex_clone, &routing_table, timer) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + }).fuse() => (), + _ = term_conn_clone.async_wait().fuse() => () + } + enter_idle_clone.signal(); let _ = stream.flush().await; let _ = stream.abort().await; });