From 5df4a0a2f8fc27d0a4451ac71a01b8aac3c35717 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 25 Apr 2020 20:31:38 +0800 Subject: [PATCH] moninj: sort out futures::select! --- Cargo.lock | 89 ++++++++++++++++++++++++++++++++ runtime/Cargo.toml | 2 + runtime/src/comms.rs | 2 +- runtime/src/main.rs | 1 + runtime/src/moninj.rs | 117 ++++++++++++++++++++++++++---------------- 5 files changed, 165 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cd5d957..52d0206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,6 +56,81 @@ dependencies = [ "void", ] +[[package]] +name = "futures" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c329ae8753502fb44ae4fc2b622fa2a94652c41e795143765ba0927f92ab780" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0c77d04ce8edd9cb903932b608268b3fffec4163dc053b3b402bf47eac1f1a8" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f25592f769825e89b92358db00d26f965761e094951ac44d3663ef25b7ac464a" + +[[package]] +name = "futures-io" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a638959aa96152c7a4cddf50fcb1e3fede0583b27157c26e67d6f99904090dc6" + +[[package]] +name = "futures-macro" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a5081aa3de1f7542a794a397cde100ed903b0630152d0973479018fd85423a7" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3466821b4bc114d95b087b850a724c6f83115e929bc88f1fa98a3304a944c8a6" + +[[package]] +name = "futures-task" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b0a34e53cf6cdcd0178aa573aed466b646eb3db769570841fda0c7ede375a27" + +[[package]] +name = "futures-util" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22766cf25d64306bedf0384da004d05c9974ab104fcc4528f1236181c18004c5" +dependencies = [ + "futures-core", + "futures-macro", + "futures-sink", + "futures-task", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", +] + [[package]] name = "libasync" version = "0.0.0" @@ -169,6 +244,18 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "proc-macro-hack" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" + +[[package]] +name = "proc-macro-nested" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694" + [[package]] name = "proc-macro2" version = "1.0.10" @@ -199,6 +286,7 @@ version = "0.1.0" dependencies = [ "cslice", "dyld", + "futures", "libasync", "libboard_zynq", "libcortex_a9", @@ -206,6 +294,7 @@ dependencies = [ "log", "num-derive", "num-traits", + "void", ] [[package]] diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 1b06266..20c381a 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -14,6 +14,8 @@ num-traits = { version = "0.2", default-features = false } num-derive = "0.3" cslice = "0.3" log = "0.4" +void = { version = "1", default-features = false } +futures = { version = "0.3", default-features = false, features = ["async-await"] } libboard_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } libsupport_zynq = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } libcortex_a9 = { git = "https://git.m-labs.hk/M-Labs/zc706.git" } diff --git a/runtime/src/comms.rs b/runtime/src/comms.rs index 23b2ea9..2d1f5dc 100644 --- a/runtime/src/comms.rs +++ b/runtime/src/comms.rs @@ -204,7 +204,7 @@ pub fn main(timer: GlobalTimer) { } }); - moninj::start(); + moninj::start(timer); Sockets::run(&mut iface, || { Instant::from_millis(timer.get_time().0 as i32) diff --git a/runtime/src/main.rs b/runtime/src/main.rs index acda878..9d2f538 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -1,5 +1,6 @@ #![no_std] #![no_main] +#![recursion_limit="512"] // for futures_util::select! extern crate alloc; extern crate log; diff --git a/runtime/src/moninj.rs b/runtime/src/moninj.rs index b0098d7..fe255f4 100644 --- a/runtime/src/moninj.rs +++ b/runtime/src/moninj.rs @@ -1,13 +1,14 @@ use core::fmt; use alloc::collections::BTreeMap; -use log::warn; +use log::{debug, warn}; +use void::Void; -use libboard_zynq::smoltcp; -use libasync::task; -use libasync::smoltcp::TcpStream; +use libboard_zynq::{smoltcp, timer::GlobalTimer, time::Milliseconds}; +use libasync::{task, smoltcp::TcpStream, block_async, nb}; use num_derive::{FromPrimitive, ToPrimitive}; use num_traits::{FromPrimitive, ToPrimitive}; +use futures::{pin_mut, select_biased, FutureExt}; use crate::proto::*; use crate::pl::csr; @@ -78,63 +79,89 @@ fn read_injection_status(channel: i32, overrd: i8) -> i8 { } } -async fn handle_connection(stream: &TcpStream) -> Result<()> { +async fn handle_connection(stream: &TcpStream, timer: GlobalTimer) -> Result<()> { if !expect(&stream, b"ARTIQ moninj\n").await? { return Err(Error::UnexpectedPattern); } let mut probe_watch_list: BTreeMap<(i32, i8), Option> = BTreeMap::new(); let mut inject_watch_list: BTreeMap<(i32, i8), Option> = BTreeMap::new(); - + let mut next_check = Milliseconds(0); loop { - let message: HostMessage = FromPrimitive::from_i8(read_i8(&stream).await?) - .ok_or(Error::UnrecognizedPacket)?; - match message { - HostMessage::MonitorProbe => { - let enable = read_bool(&stream).await?; - let channel = read_i32(&stream).await?; - let probe = read_i8(&stream).await?; - if enable { - let _ = probe_watch_list.entry((channel, probe)).or_insert(None); - } else { - let _ = probe_watch_list.remove(&(channel, probe)); + // TODO: we don't need fuse() here. + // remove after https://github.com/rust-lang/futures-rs/issues/1989 lands + let read_message_f = read_i8(&stream).fuse(); + let next_check_c = next_check.clone(); + let timeout = || -> nb::Result<(), Void> { + if timer.get_time() < next_check_c { + Err(nb::Error::WouldBlock) + } else { + Ok(()) + } + }; + let timeout_f = block_async!(timeout()).fuse(); + pin_mut!(read_message_f, timeout_f); + select_biased! { + message = read_message_f => { + let message: HostMessage = FromPrimitive::from_i8(message?) + .ok_or(Error::UnrecognizedPacket)?; + match message { + HostMessage::MonitorProbe => { + let enable = read_bool(&stream).await?; + let channel = read_i32(&stream).await?; + let probe = read_i8(&stream).await?; + if enable { + let _ = probe_watch_list.entry((channel, probe)).or_insert(None); + debug!("START monitoring channel {}, probe {}", channel, probe); + } else { + let _ = probe_watch_list.remove(&(channel, probe)); + debug!("END monitoring channel {}, probe {}", channel, probe); + } + }, + HostMessage::MonitorInjection => { + let enable = read_bool(&stream).await?; + let channel = read_i32(&stream).await?; + let overrd = read_i8(&stream).await?; + if enable { + let _ = inject_watch_list.entry((channel, overrd)).or_insert(None); + debug!("START monitoring channel {}, overrd {}", channel, overrd); + } else { + let _ = inject_watch_list.remove(&(channel, overrd)); + debug!("END monitoring channel {}, overrd {}", channel, overrd); + } + }, + HostMessage::Inject => { + let channel = read_i32(&stream).await?; + let overrd = read_i8(&stream).await?; + let value = read_i8(&stream).await?; + inject(channel, overrd, value); + debug!("INJECT channel {}, overrd {}, value {}", channel, overrd, value); + }, + HostMessage::GetInjectionStatus => { + let channel = read_i32(&stream).await?; + let overrd = read_i8(&stream).await?; + let value = read_injection_status(channel, overrd); + write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?; + write_i32(&stream, channel).await?; + write_i8(&stream, overrd).await?; + write_i8(&stream, value).await?; + }, } }, - HostMessage::MonitorInjection => { - let enable = read_bool(&stream).await?; - let channel = read_i32(&stream).await?; - let overrd = read_i8(&stream).await?; - if enable { - let _ = inject_watch_list.entry((channel, overrd)).or_insert(None); - } else { - let _ = inject_watch_list.remove(&(channel, overrd)); - } - }, - HostMessage::Inject => { - let channel = read_i32(&stream).await?; - let overrd = read_i8(&stream).await?; - let value = read_i8(&stream).await?; - inject(channel, overrd, value); - }, - HostMessage::GetInjectionStatus => { - let channel = read_i32(&stream).await?; - let overrd = read_i8(&stream).await?; - let value = read_injection_status(channel, overrd); - write_i8(&stream, DeviceMessage::InjectionStatus.to_i8().unwrap()).await?; - write_i32(&stream, channel).await?; - write_i8(&stream, overrd).await?; - write_i8(&stream, value).await?; - }, + _ = timeout_f => { + warn!("tick"); + next_check = next_check + Milliseconds(200); + } } } } -pub fn start() { +pub fn start(timer: GlobalTimer) { task::spawn(async move { loop { let stream = TcpStream::accept(1383, 2048, 2048).await.unwrap(); - task::spawn(async { - let _ = handle_connection(&stream) + task::spawn(async move { + let _ = handle_connection(&stream, timer) .await .map_err(|e| warn!("connection terminated: {}", e)); let _ = stream.flush().await;