moninj: sort out futures::select!

This commit is contained in:
Sebastien Bourdeauducq 2020-04-25 20:31:38 +08:00
parent 9ecabfc251
commit 5df4a0a2f8
5 changed files with 165 additions and 46 deletions

89
Cargo.lock generated
View File

@ -56,6 +56,81 @@ dependencies = [
"void", "void",
] ]
[[package]]
name = "futures"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a"
[[package]]
name = "futures-io"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6"
[[package]]
name = "futures-macro"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7"
dependencies = [
"proc-macro-hack",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6"
[[package]]
name = "futures-task"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27"
[[package]]
name = "futures-util"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5"
dependencies = [
"futures-core",
"futures-macro",
"futures-sink",
"futures-task",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
]
[[package]] [[package]]
name = "libasync" name = "libasync"
version = "0.0.0" version = "0.0.0"
@ -169,6 +244,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "proc-macro-hack"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63"
[[package]]
name = "proc-macro-nested"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.10" version = "1.0.10"
@ -199,6 +286,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"cslice", "cslice",
"dyld", "dyld",
"futures",
"libasync", "libasync",
"libboard_zynq", "libboard_zynq",
"libcortex_a9", "libcortex_a9",
@ -206,6 +294,7 @@ dependencies = [
"log", "log",
"num-derive", "num-derive",
"num-traits", "num-traits",
"void",
] ]
[[package]] [[package]]

View File

@ -14,6 +14,8 @@ num-traits = { version = "0.2", default-features = false }
num-derive = "0.3" num-derive = "0.3"
cslice = "0.3" cslice = "0.3"
log = "0.4" log = "0.4"
void = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false, features = ["async-await"] }
libboard_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } libboard_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" }
libsupport_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } libsupport_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" }
libcortex_a9 = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } libcortex_a9 = { git = "https://git.m-labs.hk/M-Labs/zc706.git" }

View File

@ -204,7 +204,7 @@ pub fn main(timer: GlobalTimer) {
} }
}); });
moninj::start(); moninj::start(timer);
Sockets::run(&mut iface, || { Sockets::run(&mut iface, || {
Instant::from_millis(timer.get_time().0 as i32) Instant::from_millis(timer.get_time().0 as i32)

View File

@ -1,5 +1,6 @@
#![no_std] #![no_std]
#![no_main] #![no_main]
#![recursion_limit="512"] // for futures_util::select!
extern crate alloc; extern crate alloc;
extern crate log; extern crate log;

View File

@ -1,13 +1,14 @@
use core::fmt; use core::fmt;
use alloc::collections::BTreeMap; use alloc::collections::BTreeMap;
use log::warn; use log::{debug, warn};
use void::Void;
use libboard_zynq::smoltcp; use libboard_zynq::{smoltcp, timer::GlobalTimer, time::Milliseconds};
use libasync::task; use libasync::{task, smoltcp::TcpStream, block_async, nb};
use libasync::smoltcp::TcpStream;
use num_derive::{FromPrimitive, ToPrimitive}; use num_derive::{FromPrimitive, ToPrimitive};
use num_traits::{FromPrimitive, ToPrimitive}; use num_traits::{FromPrimitive, ToPrimitive};
use futures::{pin_mut, select_biased, FutureExt};
use crate::proto::*; use crate::proto::*;
use crate::pl::csr; use crate::pl::csr;
@ -78,63 +79,89 @@ fn read_injection_status(channel: i32, overrd: i8) -> i8 {
} }
} }
async fn handle_connection(stream: &TcpStream) -> Result<()> { async fn handle_connection(stream: &TcpStream, timer: GlobalTimer) -> Result<()> {
if !expect(&stream, b"ARTIQ moninj\n").await? { if !expect(&stream, b"ARTIQ moninj\n").await? {
return Err(Error::UnexpectedPattern); return Err(Error::UnexpectedPattern);
} }
let mut probe_watch_list: BTreeMap<(i32, i8), Option<i32>> = BTreeMap::new(); let mut probe_watch_list: BTreeMap<(i32, i8), Option<i32>> = BTreeMap::new();
let mut inject_watch_list: BTreeMap<(i32, i8), Option<i8>> = BTreeMap::new(); let mut inject_watch_list: BTreeMap<(i32, i8), Option<i8>> = BTreeMap::new();
let mut next_check = Milliseconds(0);
loop { loop {
let message: HostMessage = FromPrimitive::from_i8(read_i8(&stream).await?) // TODO: we don't need fuse() here.
.ok_or(Error::UnrecognizedPacket)?; // remove after https://github.com/rust-lang/futures-rs/issues/1989 lands
match message { let read_message_f = read_i8(&stream).fuse();
HostMessage::MonitorProbe => { let next_check_c = next_check.clone();
let enable = read_bool(&stream).await?; let timeout = || -> nb::Result<(), Void> {
let channel = read_i32(&stream).await?; if timer.get_time() < next_check_c {
let probe = read_i8(&stream).await?; Err(nb::Error::WouldBlock)
if enable { } else {
let _ = probe_watch_list.entry((channel, probe)).or_insert(None); Ok(())
} else { }
let _ = probe_watch_list.remove(&(channel, probe)); };
let timeout_f = block_async!(timeout()).fuse();
pin_mut!(read_message_f, timeout_f);
select_biased! {
message = read_message_f => {
let message: HostMessage = FromPrimitive::from_i8(message?)
.ok_or(Error::UnrecognizedPacket)?;
match message {
HostMessage::MonitorProbe => {
let enable = read_bool(&stream).await?;
let channel = read_i32(&stream).await?;
let probe = read_i8(&stream).await?;
if enable {
let _ = probe_watch_list.entry((channel, probe)).or_insert(None);
debug!("START monitoring channel {}, probe {}", channel, probe);
} else {
let _ = probe_watch_list.remove(&(channel, probe));
debug!("END monitoring channel {}, probe {}", channel, probe);
}
},
HostMessage::MonitorInjection => {
let enable = read_bool(&stream).await?;
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
if enable {
let _ = inject_watch_list.entry((channel, overrd)).or_insert(None);
debug!("START monitoring channel {}, overrd {}", channel, overrd);
} else {
let _ = inject_watch_list.remove(&(channel, overrd));
debug!("END monitoring channel {}, overrd {}", channel, overrd);
}
},
HostMessage::Inject => {
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = read_i8(&stream).await?;
inject(channel, overrd, value);
debug!("INJECT channel {}, overrd {}, value {}", channel, overrd, value);
},
HostMessage::GetInjectionStatus => {
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = read_injection_status(channel, overrd);
write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?;
write_i32(&stream, channel).await?;
write_i8(&stream, overrd).await?;
write_i8(&stream, value).await?;
},
} }
}, },
HostMessage::MonitorInjection => { _ = timeout_f => {
let enable = read_bool(&stream).await?; warn!("tick");
let channel = read_i32(&stream).await?; next_check = next_check + Milliseconds(200);
let overrd = read_i8(&stream).await?; }
if enable {
let _ = inject_watch_list.entry((channel, overrd)).or_insert(None);
} else {
let _ = inject_watch_list.remove(&(channel, overrd));
}
},
HostMessage::Inject => {
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = read_i8(&stream).await?;
inject(channel, overrd, value);
},
HostMessage::GetInjectionStatus => {
let channel = read_i32(&stream).await?;
let overrd = read_i8(&stream).await?;
let value = read_injection_status(channel, overrd);
write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?;
write_i32(&stream, channel).await?;
write_i8(&stream, overrd).await?;
write_i8(&stream, value).await?;
},
} }
} }
} }
pub fn start() { pub fn start(timer: GlobalTimer) {
task::spawn(async move { task::spawn(async move {
loop { loop {
let stream = TcpStream::accept(1383, 2048, 2048).await.unwrap(); let stream = TcpStream::accept(1383, 2048, 2048).await.unwrap();
task::spawn(async { task::spawn(async move {
let _ = handle_connection(&stream) let _ = handle_connection(&stream, timer)
.await .await
.map_err(|e| warn!("connection terminated: {}", e)); .map_err(|e| warn!("connection terminated: {}", e));
let _ = stream.flush().await; let _ = stream.flush().await;