libasync: clean up executor

This commit is contained in:
Simon Renblad 2025-02-11 14:23:14 +08:00
parent af42d9b819
commit c6f6bd292d
2 changed files with 22 additions and 34 deletions

View File

@ -1,13 +1,12 @@
use alloc::{boxed::Box, vec::Vec};
use core::{ use core::{
cell::{RefCell, UnsafeCell}, cell::UnsafeCell,
future::Future, future::Future,
mem::MaybeUninit, mem::MaybeUninit,
pin::Pin, pin::Pin,
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
}; };
use alloc::{boxed::Box, vec::Vec};
//use futures::future::FutureExt;
use pin_utils::pin_mut; use pin_utils::pin_mut;
// NOTE `*const ()` is &AtomicBool // NOTE `*const ()` is &AtomicBool
@ -39,35 +38,32 @@ fn wrap_waker(ready: &AtomicBool) -> Waker {
/// This is a singleton /// This is a singleton
pub struct Executor { pub struct Executor {
// Entered block_on() already? // Entered block_on() already?
in_block_on: RefCell<bool>, in_block_on: bool,
/// Tasks reside on the heap, so that we just queue pointers. They /// Tasks reside on the heap, so that we just queue pointers. They
/// must also be pinned in memory because our RawWaker is a pointer /// must also be pinned in memory because our RawWaker is a pointer
/// to their `ready` field. /// to their `ready` field.
tasks: RefCell<Vec<Pin<Box<Task>>>>, tasks: Vec<Pin<Box<Task>>>,
} }
impl Executor { impl Executor {
/// Creates a new instance of the executor /// Creates a new instance of the executor
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
in_block_on: RefCell::new(false), in_block_on: false,
tasks: RefCell::new(Vec::new()), tasks: Vec::new(),
} }
} }
pub fn block_on<T>(&self, f: impl Future<Output = T>) -> T { pub fn block_on<T>(&mut self, f: impl Future<Output = T>) -> T {
// we want to avoid reentering `block_on` because then all the code // we want to avoid reentering `block_on` because then all the code
// below has to become more complex. It's also likely that the // below has to become more complex. It's also likely that the
// application will only call `block_on` once on an infinite task // application will only call `block_on` once on an infinite task
// (`Future<Output = !>`) // (`Future<Output = !>`)
{ if self.in_block_on {
let mut in_block_on = self.in_block_on.borrow_mut();
if *in_block_on {
panic!("nested `block_on`"); panic!("nested `block_on`");
} }
*in_block_on = true; self.in_block_on = true;
}
pin_mut!(f); pin_mut!(f);
let ready = AtomicBool::new(true); let ready = AtomicBool::new(true);
@ -77,17 +73,14 @@ impl Executor {
// advance the main task // advance the main task
if ready.load(Ordering::Relaxed) { if ready.load(Ordering::Relaxed) {
ready.store(false, Ordering::Relaxed); ready.store(false, Ordering::Relaxed);
// println!("run block_on");
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
if let Poll::Ready(val) = f.as_mut().poll(&mut cx) { if let Poll::Ready(val) = f.as_mut().poll(&mut cx) {
break val; break val;
} }
// println!("ran block_on");
} }
// advance all tasks // advance all tasks
core::mem::swap(&mut *self.tasks.borrow_mut(), &mut backup); core::mem::swap(&mut self.tasks, &mut backup);
for mut task in backup.drain(..) { for mut task in backup.drain(..) {
// NOTE we don't need a CAS operation here because `wake` invocations that come from // NOTE we don't need a CAS operation here because `wake` invocations that come from
// interrupt handlers (the only source of 'race conditions' (!= data races)) are // interrupt handlers (the only source of 'race conditions' (!= data races)) are
@ -106,20 +99,16 @@ impl Executor {
} }
} }
// Requeue // Requeue
self.tasks.borrow_mut().push(task); self.tasks.push(task);
} }
// // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an
// // interrupt ran (regardless of whether it generated a wake-up or not)
// asm::wfe();
}; };
self.in_block_on.replace(false); self.in_block_on = false;
val val
} }
pub fn spawn(&self, f: impl Future + 'static) { pub fn spawn(&mut self, f: impl Future<Output = ()> + 'static) {
let task = Box::pin(Task::new(f)); let task = Box::pin(Task::new(f));
self.tasks.borrow_mut().push(task); self.tasks.push(task);
} }
} }
@ -129,10 +118,10 @@ pub struct Task {
} }
impl Task { impl Task {
fn new(f: impl Future + 'static) -> Self { fn new(f: impl Future<Output = ()> + 'static) -> Self {
Task { Task {
ready: AtomicBool::new(true), ready: AtomicBool::new(true),
f: Box::pin(async { f.await; }), f: Box::pin(f),
} }
} }
} }
@ -140,18 +129,17 @@ impl Task {
/// Returns a handle to the executor singleton /// Returns a handle to the executor singleton
/// ///
/// This lazily initializes the executor and allocator when first called /// This lazily initializes the executor and allocator when first called
pub(crate) fn current() -> &'static Executor { pub(crate) fn current() -> &'static mut Executor {
static INIT: AtomicBool = AtomicBool::new(false); static INIT: AtomicBool = AtomicBool::new(false);
static mut EXECUTOR: UnsafeCell<MaybeUninit<Executor>> = UnsafeCell::new(MaybeUninit::uninit()); static mut EXECUTOR: UnsafeCell<MaybeUninit<Executor>> = UnsafeCell::new(MaybeUninit::uninit());
if INIT.load(Ordering::Relaxed) { if INIT.load(Ordering::Relaxed) {
unsafe { &*(EXECUTOR.get() as *const Executor) } unsafe { EXECUTOR.get_mut().assume_init_mut() }
} else { } else {
unsafe { unsafe {
let executorp = EXECUTOR.get() as *mut Executor; let executor = EXECUTOR.get_mut().write(Executor::new());
executorp.write(Executor::new());
INIT.store(true, Ordering::Relaxed); INIT.store(true, Ordering::Relaxed);
&*executorp executor
} }
} }
} }

View File

@ -17,7 +17,7 @@ pub fn block_on<T>(f: impl Future<Output = T>) -> T {
/// Spawns a task onto the executor /// Spawns a task onto the executor
/// ///
/// The spawned task will not make any progress until `block_on` is called. /// The spawned task will not make any progress until `block_on` is called.
pub fn spawn(f: impl Future + 'static) { pub fn spawn(f: impl Future<Output = ()> + 'static) {
executor::current().spawn(f) executor::current().spawn(f)
} }