libasync: pin tasks to memory

tcp-recv-fnmut
Astro 2020-03-31 01:13:01 +02:00
parent 965a00801e
commit 48257e989c
2 changed files with 17 additions and 22 deletions

View File

@ -34,8 +34,13 @@ static VTABLE: RawWakerVTable = {
/// ///
/// This is a singleton /// This is a singleton
pub struct Executor { pub struct Executor {
// Entered block_on() already?
in_block_on: RefCell<bool>, in_block_on: RefCell<bool>,
tasks: RefCell<Deque<Task>>,
/// Tasks reside on the heap, so that we just queue pointers. They
/// must also be pinned in memory because our RawWaker is a pointer
/// to their `ready` field.
tasks: RefCell<Deque<Pin<Box<Task>>>>,
} }
impl Executor { impl Executor {
@ -97,17 +102,14 @@ impl Executor {
let mut cx = Context::from_waker(&waker); let mut cx = Context::from_waker(&waker);
// this points into a `static` memory so it's already pinned // this points into a `static` memory so it's already pinned
// println!("run task"); // println!("run task");
let ready = unsafe { let ready = task.f.as_mut().poll(&mut cx).is_ready();
Pin::new_unchecked(&mut *task.f) if ready {
.poll(&mut cx) // Task is finished, do not requeue
.is_ready() continue;
};
// println!("ran task, ready={:?}", r);
if !ready {
// Task is not finished, requeue
self.tasks.borrow_mut().push_back(task);
} }
} }
// Requeue
self.tasks.borrow_mut().push_back(task);
} }
// // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an // // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an
@ -118,26 +120,22 @@ impl Executor {
val val
} }
// NOTE CAREFUL! this method can overlap with `block_on`
// FIXME we want to use `Future<Output = !>` here but the never type (`!`) is unstable; so as a
// workaround we'll "abort" if the task / future terminates (see `Task::new`)
pub fn spawn(&self, f: impl Future + 'static) { pub fn spawn(&self, f: impl Future + 'static) {
// NOTE(unsafe) only safe as long as `spawn` is never re-entered and this does not overlap let task = Box::pin(Task::new(f));
// with operation `(A)` (see `Task::block_on`) self.tasks.borrow_mut().push_back(task);
self.tasks.borrow_mut().push_back(Task::new(f));
} }
} }
pub struct Task { pub struct Task {
ready: AtomicBool, ready: AtomicBool,
f: Box<dyn Future<Output = ()>>, f: Pin<Box<dyn Future<Output = ()>>>,
} }
impl Task { impl Task {
fn new(f: impl Future + 'static) -> Self { fn new(f: impl Future + 'static) -> Self {
Task { Task {
ready: AtomicBool::new(true), ready: AtomicBool::new(true),
f: Box::new(async { f.await; }), f: Box::pin(async { f.await; }),
} }
} }
} }

View File

@ -17,10 +17,7 @@ pub fn block_on<T>(f: impl Future<Output = T>) -> T {
/// Spawns a task onto the executor /// Spawns a task onto the executor
/// ///
/// The spawned task will not make any progress until `block_on` is called. /// The spawned task will not make any progress until `block_on` is called.
/// pub fn spawn(f: impl Future + 'static) {
/// The future `f` must never terminate. The program will *abort* if `f` (the async code) returns.
/// The right signature here would be `f: impl Future<Output = !>` but that requires nightly
pub fn spawn<T>(f: impl Future<Output = T> + 'static) {
executor::current().spawn(f) executor::current().spawn(f)
} }