forked from M-Labs/artiq-zynq
runtime/comms: limited concurrent connections to 1 only.
This commit is contained in:
parent
d58a3ef12c
commit
56e7cc822c
@ -18,6 +18,8 @@ use libboard_zynq::{
|
|||||||
},
|
},
|
||||||
timer::GlobalTimer,
|
timer::GlobalTimer,
|
||||||
};
|
};
|
||||||
|
use libcortex_a9::sync_channel;
|
||||||
|
use futures::{select_biased, future::FutureExt};
|
||||||
use libasync::{smoltcp::{Sockets, TcpStream}, task};
|
use libasync::{smoltcp::{Sockets, TcpStream}, task};
|
||||||
|
|
||||||
use crate::config;
|
use crate::config;
|
||||||
@ -230,10 +232,10 @@ async fn handle_run_kernel(stream: Option<&TcpStream>, control: &Rc<RefCell<kern
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async fn load_kernel(buffer: Vec<u8>, control: &Rc<RefCell<kernel::Control>>, stream: Option<&TcpStream>) -> Result<()> {
|
async fn load_kernel(buffer: &Vec<u8>, control: &Rc<RefCell<kernel::Control>>, stream: Option<&TcpStream>) -> Result<()> {
|
||||||
let mut control = control.borrow_mut();
|
let mut control = control.borrow_mut();
|
||||||
control.restart();
|
control.restart();
|
||||||
control.tx.async_send(kernel::Message::LoadRequest(buffer)).await;
|
control.tx.async_send(kernel::Message::LoadRequest(buffer.to_vec())).await;
|
||||||
let reply = control.rx.async_recv().await;
|
let reply = control.rx.async_recv().await;
|
||||||
match reply {
|
match reply {
|
||||||
kernel::Message::LoadCompleted => {
|
kernel::Message::LoadCompleted => {
|
||||||
@ -279,7 +281,7 @@ async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Contr
|
|||||||
},
|
},
|
||||||
Request::LoadKernel => {
|
Request::LoadKernel => {
|
||||||
let buffer = read_bytes(stream, 1024*1024).await?;
|
let buffer = read_bytes(stream, 1024*1024).await?;
|
||||||
load_kernel(buffer, &control, Some(stream)).await?;
|
load_kernel(&buffer, &control, Some(stream)).await?;
|
||||||
},
|
},
|
||||||
Request::RunKernel => {
|
Request::RunKernel => {
|
||||||
handle_run_kernel(Some(stream), &control).await?;
|
handle_run_kernel(Some(stream), &control).await?;
|
||||||
@ -338,9 +340,10 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) {
|
|||||||
moninj::start(timer);
|
moninj::start(timer);
|
||||||
|
|
||||||
let control: Rc<RefCell<kernel::Control>> = Rc::new(RefCell::new(kernel::Control::start()));
|
let control: Rc<RefCell<kernel::Control>> = Rc::new(RefCell::new(kernel::Control::start()));
|
||||||
|
let idle_kernel = Rc::new(cfg.read("idle").ok());
|
||||||
if let Ok(buffer) = cfg.read("startup") {
|
if let Ok(buffer) = cfg.read("startup") {
|
||||||
info!("Loading startup kernel...");
|
info!("Loading startup kernel...");
|
||||||
if let Ok(()) = task::block_on(load_kernel(buffer, &control, None)) {
|
if let Ok(()) = task::block_on(load_kernel(&buffer, &control, None)) {
|
||||||
info!("Starting startup kernel...");
|
info!("Starting startup kernel...");
|
||||||
let _ = task::block_on(handle_run_kernel(None, &control));
|
let _ = task::block_on(handle_run_kernel(None, &control));
|
||||||
info!("Startup kernel finished!");
|
info!("Startup kernel finished!");
|
||||||
@ -350,14 +353,53 @@ pub fn main(timer: GlobalTimer, cfg: &config::Config) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
task::spawn(async move {
|
task::spawn(async move {
|
||||||
|
let (tx, rx) = sync_channel!(u32, 1);
|
||||||
|
let tx = RefCell::new(tx);
|
||||||
|
let rx = Rc::new(RefCell::new(rx));
|
||||||
|
let has_connection = Rc::new(RefCell::new(false));
|
||||||
loop {
|
loop {
|
||||||
let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap();
|
let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap();
|
||||||
|
let has_connection = has_connection.clone();
|
||||||
|
|
||||||
|
if *has_connection.borrow() {
|
||||||
|
let mut tx = tx.borrow_mut();
|
||||||
|
tx.async_send(42).await;
|
||||||
|
// the second send is used to block until another connection received the abort
|
||||||
|
// request.
|
||||||
|
tx.async_send(42).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
*has_connection.borrow_mut() = true;
|
||||||
let control = control.clone();
|
let control = control.clone();
|
||||||
task::spawn(async {
|
let idle_kernel = idle_kernel.clone();
|
||||||
info!("received connection");
|
let _ = rx.borrow_mut().try_recv();
|
||||||
let _ = handle_connection(&stream, control)
|
|
||||||
|
let new_rx = rx.clone();
|
||||||
|
task::spawn(async move {
|
||||||
|
let mut new_rx = new_rx.borrow_mut();
|
||||||
|
select_biased! {
|
||||||
|
_ = (async {
|
||||||
|
let _ = handle_connection(&stream, control.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| warn!("connection terminated: {}", e));
|
.map_err(|e| warn!("connection terminated: {}", e));
|
||||||
|
if let Some(buffer) = &*idle_kernel {
|
||||||
|
info!("Loading idle kernel");
|
||||||
|
let _ = load_kernel(&buffer, &control, None)
|
||||||
|
.await.map_err(|e| warn!("error loading idle kernel"));
|
||||||
|
info!("Running idle kernel");
|
||||||
|
let _ = handle_run_kernel(None, &control)
|
||||||
|
.await.map_err(|e| warn!("error running idle kernel"));
|
||||||
|
info!("Idle kernel terminated");
|
||||||
|
}
|
||||||
|
}).fuse() => (),
|
||||||
|
_ = new_rx.async_recv().fuse() => ()
|
||||||
|
}
|
||||||
|
*has_connection.borrow_mut() = false;
|
||||||
|
// it is possible that when `handle_connection` is terminating,
|
||||||
|
// another connection sent an abort request and get blocked,
|
||||||
|
// so we try_recv here to unblock in that case.
|
||||||
|
let _ = new_rx.try_recv();
|
||||||
|
core::mem::drop(new_rx);
|
||||||
let _ = stream.flush().await;
|
let _ = stream.flush().await;
|
||||||
let _ = stream.abort().await;
|
let _ = stream.abort().await;
|
||||||
});
|
});
|
||||||
|
Loading…
Reference in New Issue
Block a user