diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index e8a00e0..67950e8 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -805,82 +805,59 @@ pub fn main(timer: GlobalTimer, cfg: Config) { let restart_idle = Rc::new(Semaphore::new(0, 1)); mgmt::start(cfg.clone(), restart_idle.clone()); - let semaphores = Rc::new( - (Semaphore::new(0, 1), - Semaphore::new(1, 1), - Semaphore::new(1, 1), - Semaphore::new(1, 1))); - - // handle connects + let terminate = Rc::new(Semaphore::new(0, 1)); + let connection = Rc::new(Semaphore::new(1, 1)); + let finish = Rc::new(Semaphore::new(1, 1)); + let start = Rc::new(Semaphore::new(1, 1)); // run idle kernel once without needing restart + // turn the above into a control obj + // + let stream_refcell = Rc::new(RefCell::new(None)); + task::spawn(async move { loop { clone_mult!(semaphores, control, up_destinations, aux_mutex, routing_table); - semaphores.0.async_wait().await; - semaphores.1.try_wait(); - let _ = task_lock.async_lock().await; - select_biased! { - _ = (async { - // run connection - let mut stream = stream_opt.borrow_mut()?; - let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) - .await - .map_err(|e| warn!("connection terminated: {}", e)); - }).fuse() => { - semaphores.2.signal(); - }, - _ = semaphores.1.async_wait().fuse() => (), - } - } - }); - - // handle idle - task::spawn(async move { - loop { - clone_mult!(semaphores, control, up_destinations, aux_mutex, routing_table); - semaphores.2.async_wait().await; - semaphores.3.try_wait(); - let _ = task_lock.async_lock().await; + start.async_wait().await; + let stream_opt = stream_refcell.borrow_mut(); + let _ = connection.try_wait(); + let _ = finish.try_wait(); select_biased! { _ = (async { + if let Some(&mut stream) = stream_opt { + let _ = handle_connection(&mut stream, control.clone(), &up_destinations, &aux_mutex, &routing_table, timer) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + } + connection.signal(); if let Some(buffer) = cfg.read("idle_kernel").ok() { load_and_run_idle_kernel(&buffer, &control, &up_destinations, &aux_mutex, &routing_table, timer).await; } - }).fuse() => (), - _ = semaphores.3.async_wait().fuse() => (), + }).fuse() => (), // TODO: on clean exit with existing idle -> run again? + _ = terminate.async_wait().fuse() => (), } + if let Some(&stream) = stream_opt { + stream.flush().await(); + stream.abort().await(); + } + finish.signal(); } }); task::spawn(async move { loop { - select_biased! { + clone_mult!(semaphores, stream_refcell); + let stream_opt = select_biased! { + temp_s = (async { + Some(Rc::new(TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap())) + }).fuse() => temp_s, _ = (async { - let temp_s = Some(TcpStream::accept(1381, 0x10_000, 0x10_000).await.unwrap()); // ALT: implement dummy - // tcp stream that is empty (another reason the priv construct is annoying) - if let Some(&stream) = stream_opt { - let _ = stream.flush().await; - let _ = stream.abort().await; - } - stream_opt.replace(temp_s); - }).fuse() => { - if semaphore.1.try_wait().is_none() { - semaphore.1.signal(); // STOP KERNEL - } - if semaphore.3.try_wait().is_none() { - semaphore.3.signal(); // STOP IDLE - } - // if the other stream exists -> flush it and replace with the new stream - semaphore.0.signal(); // NEW KERNEL - } - _ = (async { - restart_idle.wait_async().await; - }).fuse() => { - if semaphore.3.try_wait().is_none() { - semaphore.3.signal(); // STOP IDLE - semaphore.2.signal(); // NEW IDLE - } - } - } + restart_idle.async_wait().await; + connection.async_wait().await; + }).fuse() => None + }; + terminate.signal(); + finish.async_wait().await; + stream_refcell.replace(stream_opt); + start.signal(); } });