From 5b95410244d809681d6c915f8d2c8aee99eccb6d Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 26 Mar 2020 01:35:05 +0100 Subject: [PATCH] libasync: adapt from async-on-embedded/async-cortex-m --- Cargo.lock | 16 ++++ Cargo.toml | 1 + default.nix | 2 +- experiments/Cargo.toml | 1 + experiments/src/main.rs | 21 ++++++ libasync/Cargo.toml | 13 ++++ libasync/src/executor.rs | 158 +++++++++++++++++++++++++++++++++++++++ libasync/src/lib.rs | 6 ++ libasync/src/task.rs | 50 +++++++++++++ 9 files changed, 267 insertions(+), 1 deletion(-) create mode 100644 libasync/Cargo.toml create mode 100644 libasync/src/executor.rs create mode 100644 libasync/src/lib.rs create mode 100644 libasync/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 9a74467..8b0d2f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -24,12 +24,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "experiments" version = "0.0.0" dependencies = [ + "libasync 0.0.0", "libboard_zynq 0.0.0", "libcortex_a9 0.0.0", "libregister 0.0.0", "libsupport_zynq 0.0.0", ] +[[package]] +name = "libasync" +version = "0.0.0" +dependencies = [ + "libboard_zynq 0.0.0", + "libcortex_a9 0.0.0", + "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "libboard_zynq" version = "0.0.0" @@ -80,6 +90,11 @@ name = "managed" version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "pin-utils" +version = "0.1.0-alpha.4" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "r0" version = "1.0.0" @@ -115,6 +130,7 @@ dependencies = [ "checksum compiler_builtins 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "036b035e9ebcd705affece16319223d19f229e2358be6e3b7b094e57193312e6" "checksum linked_list_allocator 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5825aea823c659d0fdcdbe8c9b78baf56f3a10365d783db874f6d360df72626f" "checksum managed 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fdcec5e97041c7f0f1c5b7d93f12e57293c831c646f4cc7a5db59460c7ea8de6" +"checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum r0 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bd7a31eed1591dcbc95d92ad7161908e72f4677f8fabf2a32ca49b4237cbf211" "checksum smoltcp 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0fe46639fd2ec79eadf8fe719f237a7a0bd4dac5d957f1ca5bbdbc1c3c39e53a" "checksum vcell 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "876e32dcadfe563a4289e994f7cb391197f362b6315dc45e8ba4aa6f564a4b3c" diff --git a/Cargo.toml b/Cargo.toml index 95cb416..8d4db67 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "libregister", "libcortex_a9", "libboard_zynq", "libsupport_zynq", + "libasync", "experiments", ] diff --git a/default.nix b/default.nix index 4539979..456d4bd 100644 --- a/default.nix +++ b/default.nix @@ -49,7 +49,7 @@ let name = "${crate}"; src = ./.; crateSubdir = crate; - cargoSha256 = "0106i3qg44gvcv18bq3854lbj7x4qkjzqpw5ldrxjlzf6h13gfgv"; + cargoSha256 = "1fvild4mygcvnfcnmni4icg5nj16dj7hkzjpahzhpbyv9rr24722"; cargoFeatures = features; doCheck = false; }; diff --git a/experiments/Cargo.toml b/experiments/Cargo.toml index a9bdc2e..8c0e18c 100644 --- a/experiments/Cargo.toml +++ b/experiments/Cargo.toml @@ -15,3 +15,4 @@ libregister = { path = "../libregister" } libcortex_a9 = { path = "../libcortex_a9" } libboard_zynq = { path = "../libboard_zynq" } libsupport_zynq = { path = "../libsupport_zynq" } +libasync = { path = "../libasync" } diff --git a/experiments/src/main.rs b/experiments/src/main.rs index c78febf..990942e 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -18,6 +18,7 @@ use libsupport_zynq::{ ram, alloc::{vec, vec::Vec}, boot, }; +use libasync::task; const HWADDR: [u8; 6] = [0, 0x23, 0xde, 0xea, 0xbe, 0xef]; @@ -100,6 +101,26 @@ pub fn main_core0() { flash = flash_io.stop(); } + task::spawn(async { + println!("outer task"); + }); + task::spawn(async { + for i in 1..=3 { + println!("outer task2: {}", i); + task::r#yield().await; + } + }); + task::block_on(async { + task::spawn(async { + println!("inner task"); + }); + + for i in 1..=10 { + println!("yield {}", i); + task::r#yield().await; + } + }); + let core1_stack = unsafe { &mut STACK_CORE1[..] }; println!("{} bytes stack for core1", core1_stack.len()); let core1 = boot::Core1::start(core1_stack); diff --git a/libasync/Cargo.toml b/libasync/Cargo.toml new file mode 100644 index 0000000..86f6f22 --- /dev/null +++ b/libasync/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "libasync" +description = "low-level async support" +version = "0.0.0" +authors = ["Astro "] +edition = "2018" + +[dependencies] +#futures = { version = "0.3", default-features = false } +pin-utils = "0.1.0-alpha.4" +libcortex_a9 = { path = "../libcortex_a9" } +# TODO: delete +libboard_zynq = { path = "../libboard_zynq" } diff --git a/libasync/src/executor.rs b/libasync/src/executor.rs new file mode 100644 index 0000000..9e48538 --- /dev/null +++ b/libasync/src/executor.rs @@ -0,0 +1,158 @@ +use core::{ + cell::{Cell, UnsafeCell}, + future::Future, + mem::MaybeUninit, + pin::Pin, + sync::atomic::{self, AtomicBool, Ordering}, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, +}; +use alloc::{boxed::Box, collections::VecDeque as Deque}; +//use futures::future::FutureExt; +use pin_utils::pin_mut; +use libcortex_a9::mutex::Mutex; +// TODO: delete +use libboard_zynq::println; + +// NOTE `*const ()` is &AtomicBool +static VTABLE: RawWakerVTable = { + unsafe fn clone(p: *const ()) -> RawWaker { + RawWaker::new(p, &VTABLE) + } + unsafe fn wake(p: *const ()) { + wake_by_ref(p) + } + unsafe fn wake_by_ref(p: *const ()) { + (*(p as *const AtomicBool)).store(true, Ordering::Relaxed) + } + unsafe fn drop(_: *const ()) { + // no-op + } + + RawWakerVTable::new(clone, wake, wake_by_ref, drop) +}; + +/// A single-threaded executor +/// +/// This is a singleton +pub struct Executor { + in_block_on: Mutex, + tasks: Mutex>, +} + +impl Executor { + /// Creates a new instance of the executor + pub fn new() -> Self { + Self { + in_block_on: Mutex::new(false), + tasks: Mutex::new(Deque::new()), + } + } + + pub fn block_on(&self, f: impl Future) -> T { + // we want to avoid reentering `block_on` because then all the code + // below has to become more complex. It's also likely that the + // application will only call `block_on` once on an infinite task + // (`Future`) + { + let mut in_block_on = self.in_block_on.lock(); + if *in_block_on { + panic!("nested `block_on`"); + } + *in_block_on = true; + } + + pin_mut!(f); + let ready = AtomicBool::new(true); + let waker = + unsafe { Waker::from_raw(RawWaker::new(&ready as *const _ as *const _, &VTABLE)) }; + let val = loop { + // advance the main task + if ready.load(Ordering::Relaxed) { + ready.store(false, Ordering::Relaxed); + + let mut cx = Context::from_waker(&waker); + if let Poll::Ready(val) = f.as_mut().poll(&mut cx) { + break val; + } + } + + // advance other tasks + let next_task = self.tasks.lock().pop_front(); + if let Some(mut task) = next_task { + // NOTE we don't need a CAS operation here because `wake` invocations that come from + // interrupt handlers (the only source of 'race conditions' (!= data races)) are + // "oneshot": they'll issue a `wake` and then disable themselves to not run again + // until the woken task has made more work + if task.ready.load(Ordering::Relaxed) { + // we are about to service the task so switch the `ready` flag to `false` + task.ready.store(false, Ordering::Relaxed); + + // NOTE we never deallocate tasks so `&ready` is always pointing to + // allocated memory (`&'static AtomicBool`) + let waker = unsafe { + Waker::from_raw(RawWaker::new(&task.ready as *const _ as *const _, &VTABLE)) + }; + let mut cx = Context::from_waker(&waker); + // this points into a `static` memory so it's already pinned + let r = unsafe { + Pin::new_unchecked(&mut *task.f) + .poll(&mut cx) + .is_ready() + }; + if !r { + // Task is not finished, requeue + self.tasks.lock().push_back(task); + } + } + } + + // // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an + // // interrupt ran (regardless of whether it generated a wake-up or not) + // asm::wfe(); + }; + *self.in_block_on.lock() = false; + val + } + + // NOTE CAREFUL! this method can overlap with `block_on` + // FIXME we want to use `Future` here but the never type (`!`) is unstable; so as a + // workaround we'll "abort" if the task / future terminates (see `Task::new`) + pub fn spawn(&self, f: impl Future + 'static) { + // NOTE(unsafe) only safe as long as `spawn` is never re-entered and this does not overlap + // with operation `(A)` (see `Task::block_on`) + self.tasks.lock().push_back(Task::new(f)); + } +} + +pub struct Task { + ready: AtomicBool, + f: Box>, +} + +impl Task { + fn new(f: impl Future + 'static) -> Self { + Task { + ready: AtomicBool::new(true), + f: Box::new(async { f.await; }), + } + } +} + +/// Returns a handle to the executor singleton +/// +/// This lazily initializes the executor and allocator when first called +pub(crate) fn current() -> &'static Executor { + static INIT: AtomicBool = AtomicBool::new(false); + static mut EXECUTOR: UnsafeCell> = UnsafeCell::new(MaybeUninit::uninit()); + + if INIT.load(Ordering::Relaxed) { + unsafe { &*(EXECUTOR.get() as *const Executor) } + } else { + unsafe { + let executorp = EXECUTOR.get() as *mut Executor; + executorp.write(Executor::new()); + INIT.store(true, Ordering::Relaxed); + &*executorp + } + } +} diff --git a/libasync/src/lib.rs b/libasync/src/lib.rs new file mode 100644 index 0000000..37fee31 --- /dev/null +++ b/libasync/src/lib.rs @@ -0,0 +1,6 @@ +#![no_std] + +extern crate alloc; + +pub mod task; +pub mod executor; diff --git a/libasync/src/task.rs b/libasync/src/task.rs new file mode 100644 index 0000000..f86925f --- /dev/null +++ b/libasync/src/task.rs @@ -0,0 +1,50 @@ +//! Asynchronous tasks + +use core::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use super::executor; + +/// Drives the future `f` to completion +/// +/// This also makes any previously `spawn`-ed future make progress +pub fn block_on(f: impl Future) -> T { + executor::current().block_on(f) +} + +/// Spawns a task onto the executor +/// +/// The spawned task will not make any progress until `block_on` is called. +/// +/// The future `f` must never terminate. The program will *abort* if `f` (the async code) returns. +/// The right signature here would be `f: impl Future` but that requires nightly +pub fn spawn(f: impl Future + 'static) { + executor::current().spawn(f) +} + +/// Use `r#yield.await` to suspend the execution of a task +pub async fn r#yield() { + struct Yield { + yielded: bool, + } + + impl Future for Yield { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.yielded { + Poll::Ready(()) + } else { + self.yielded = true; + // wake ourselves + cx.waker().wake_by_ref(); + //asm::sev(); + Poll::Pending + } + } + } + + Yield { yielded: false }.await +}