synchronization primitives #82

Merged
sb10q merged 3 commits from pca006132/artiq-zynq:master into master 2020-08-04 14:40:03 +08:00
Showing only changes of commit f812517f6f - Show all commits

View File

@ -18,7 +18,7 @@ use libboard_zynq::{
}, },
timer::GlobalTimer, timer::GlobalTimer,
}; };
use libcortex_a9::sync_channel; use libcortex_a9::semaphore::Semaphore;
use futures::{select_biased, future::FutureExt}; use futures::{select_biased, future::FutureExt};
use libasync::{smoltcp::{Sockets, TcpStream}, task}; use libasync::{smoltcp::{Sockets, TcpStream}, task};
@ -353,30 +353,25 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) {
} }
task::spawn(async move { task::spawn(async move {
let (tx, rx) = sync_channel!(u32, 1); let connection = Rc::new(Semaphore::new(1, 1));
let tx = RefCell::new(tx); let terminate = Rc::new(Semaphore::new(0, 1));
let rx = Rc::new(RefCell::new(rx));
let has_connection = Rc::new(RefCell::new(false));
loop { loop {
let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap(); let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap();
let has_connection = has_connection.clone();
if *has_connection.borrow() { if connection.try_wait().is_none() {
let mut tx = tx.borrow_mut(); // there is an existing connection
tx.async_send(42).await; terminate.signal();
// the second send is used to block until another connection received the abort connection.async_wait().await;
// request.
tx.async_send(42).await;
} }
*has_connection.borrow_mut() = true;
let control = control.clone(); let control = control.clone();
let idle_kernel = idle_kernel.clone(); let idle_kernel = idle_kernel.clone();
let _ = rx.borrow_mut().try_recv(); let connection = connection.clone();
let terminate = terminate.clone();
let new_rx = rx.clone(); // we make sure the value of terminate is 0 before we start
let _ = terminate.try_wait();
task::spawn(async move { task::spawn(async move {
let mut new_rx = new_rx.borrow_mut();
select_biased! { select_biased! {
_ = (async { _ = (async {
let _ = handle_connection(&stream, control.clone()) let _ = handle_connection(&stream, control.clone())
@ -392,14 +387,9 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) {
info!("Idle kernel terminated"); info!("Idle kernel terminated");
} }
}).fuse() => (), }).fuse() => (),
_ = new_rx.async_recv().fuse() => () _ = terminate.async_wait().fuse() => ()
} }
*has_connection.borrow_mut() = false; connection.signal();
// it is possible that when `handle_connection` is terminating,
// another connection sent an abort request and get blocked,
// so we try_recv here to unblock in that case.
let _ = new_rx.try_recv();
core::mem::drop(new_rx);
let _ = stream.flush().await; let _ = stream.flush().await;
let _ = stream.abort().await; let _ = stream.abort().await;
}); });