libasync: replace executor Mutexes with RefCells

this will not run on multi-core.
tcp-recv-fnmut
Astro 2020-03-26 20:29:36 +01:00
parent 5b95410244
commit ea765fc529
1 changed files with 20 additions and 16 deletions

View File

@ -1,17 +1,16 @@
use core::{ use core::{
cell::{Cell, UnsafeCell}, cell::{RefCell, UnsafeCell},
future::Future, future::Future,
mem::MaybeUninit, mem::MaybeUninit,
pin::Pin, pin::Pin,
sync::atomic::{self, AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
}; };
use alloc::{boxed::Box, collections::VecDeque as Deque}; use alloc::{boxed::Box, collections::VecDeque as Deque};
//use futures::future::FutureExt; //use futures::future::FutureExt;
use pin_utils::pin_mut; use pin_utils::pin_mut;
use libcortex_a9::mutex::Mutex;
// TODO: delete // TODO: delete
use libboard_zynq::println; //use libboard_zynq::println;
// NOTE `*const ()` is &AtomicBool // NOTE `*const ()` is &AtomicBool
static VTABLE: RawWakerVTable = { static VTABLE: RawWakerVTable = {
@ -35,16 +34,16 @@ static VTABLE: RawWakerVTable = {
/// ///
/// This is a singleton /// This is a singleton
pub struct Executor { pub struct Executor {
in_block_on: Mutex<bool>, in_block_on: RefCell<bool>,
tasks: Mutex<Deque<Task>>, tasks: RefCell<Deque<Task>>,
} }
impl Executor { impl Executor {
/// Creates a new instance of the executor /// Creates a new instance of the executor
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
in_block_on: Mutex::new(false), in_block_on: RefCell::new(false),
tasks: Mutex::new(Deque::new()), tasks: RefCell::new(Deque::new()),
} }
} }
@ -54,7 +53,7 @@ impl Executor {
// application will only call `block_on` once on an infinite task // application will only call `block_on` once on an infinite task
// (`Future<Output = !>`) // (`Future<Output = !>`)
{ {
let mut in_block_on = self.in_block_on.lock(); let mut in_block_on = self.in_block_on.borrow_mut();
if *in_block_on { if *in_block_on {
panic!("nested `block_on`"); panic!("nested `block_on`");
} }
@ -70,14 +69,17 @@ impl Executor {
if ready.load(Ordering::Relaxed) { if ready.load(Ordering::Relaxed) {
ready.store(false, Ordering::Relaxed); ready.store(false, Ordering::Relaxed);
// println!("run block_on");
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
if let Poll::Ready(val) = f.as_mut().poll(&mut cx) { if let Poll::Ready(val) = f.as_mut().poll(&mut cx) {
break val; break val;
} }
// println!("ran block_on");
} }
// println!("tasks: {}", self.tasks.borrow().len());
// advance other tasks // advance other tasks
let next_task = self.tasks.lock().pop_front(); let next_task = self.tasks.borrow_mut().pop_front();
if let Some(mut task) = next_task { if let Some(mut task) = next_task {
// NOTE we don't need a CAS operation here because `wake` invocations that come from // NOTE we don't need a CAS operation here because `wake` invocations that come from
// interrupt handlers (the only source of 'race conditions' (!= data races)) are // interrupt handlers (the only source of 'race conditions' (!= data races)) are
@ -94,14 +96,16 @@ impl Executor {
}; };
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
// this points into a `static` memory so it's already pinned // this points into a `static` memory so it's already pinned
let r = unsafe { // println!("run task");
let ready = unsafe {
Pin::new_unchecked(&mut *task.f) Pin::new_unchecked(&mut *task.f)
.poll(&mut cx) .poll(&mut cx)
.is_ready() .is_ready()
}; };
if !r { // println!("ran task, ready={:?}", r);
if !ready {
// Task is not finished, requeue // Task is not finished, requeue
self.tasks.lock().push_back(task); self.tasks.borrow_mut().push_back(task);
} }
} }
} }
@ -110,7 +114,7 @@ impl Executor {
// // interrupt ran (regardless of whether it generated a wake-up or not) // // interrupt ran (regardless of whether it generated a wake-up or not)
// asm::wfe(); // asm::wfe();
}; };
*self.in_block_on.lock() = false; self.in_block_on.replace(false);
val val
} }
@ -120,13 +124,13 @@ impl Executor {
pub fn spawn(&self, f: impl Future + 'static) { pub fn spawn(&self, f: impl Future + 'static) {
// NOTE(unsafe) only safe as long as `spawn` is never re-entered and this does not overlap // NOTE(unsafe) only safe as long as `spawn` is never re-entered and this does not overlap
// with operation `(A)` (see `Task::block_on`) // with operation `(A)` (see `Task::block_on`)
self.tasks.lock().push_back(Task::new(f)); self.tasks.borrow_mut().push_back(Task::new(f));
} }
} }
pub struct Task { pub struct Task {
ready: AtomicBool, ready: AtomicBool,
f: Box<Future<Output = ()>>, f: Box<dyn Future<Output = ()>>,
} }
impl Task { impl Task {