From 48257e989cf045ab01f7c7355ccc192b11adc76d Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 31 Mar 2020 01:13:01 +0200 Subject: [PATCH] libasync: pin tasks to memory --- libasync/src/executor.rs | 34 ++++++++++++++++------------------ libasync/src/task.rs | 5 +---- 2 files changed, 17 insertions(+), 22 deletions(-) diff --git a/libasync/src/executor.rs b/libasync/src/executor.rs index 8b0206c..2de395c 100644 --- a/libasync/src/executor.rs +++ b/libasync/src/executor.rs @@ -34,8 +34,13 @@ static VTABLE: RawWakerVTable = { /// /// This is a singleton pub struct Executor { + // Entered block_on() already? in_block_on: RefCell, - tasks: RefCell>, + + /// Tasks reside on the heap, so that we just queue pointers. They + /// must also be pinned in memory because our RawWaker is a pointer + /// to their `ready` field. + tasks: RefCell>>>, } impl Executor { @@ -97,17 +102,14 @@ impl Executor { let mut cx = Context::from_waker(&waker); // this points into a `static` memory so it's already pinned // println!("run task"); - let ready = unsafe { - Pin::new_unchecked(&mut *task.f) - .poll(&mut cx) - .is_ready() - }; - // println!("ran task, ready={:?}", r); - if !ready { - // Task is not finished, requeue - self.tasks.borrow_mut().push_back(task); + let ready = task.f.as_mut().poll(&mut cx).is_ready(); + if ready { + // Task is finished, do not requeue + continue; } } + // Requeue + self.tasks.borrow_mut().push_back(task); } // // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an @@ -118,26 +120,22 @@ impl Executor { val } - // NOTE CAREFUL! this method can overlap with `block_on` - // FIXME we want to use `Future` here but the never type (`!`) is unstable; so as a - // workaround we'll "abort" if the task / future terminates (see `Task::new`) pub fn spawn(&self, f: impl Future + 'static) { - // NOTE(unsafe) only safe as long as `spawn` is never re-entered and this does not overlap - // with operation `(A)` (see `Task::block_on`) - self.tasks.borrow_mut().push_back(Task::new(f)); + let task = Box::pin(Task::new(f)); + self.tasks.borrow_mut().push_back(task); } } pub struct Task { ready: AtomicBool, - f: Box>, + f: Pin>>, } impl Task { fn new(f: impl Future + 'static) -> Self { Task { ready: AtomicBool::new(true), - f: Box::new(async { f.await; }), + f: Box::pin(async { f.await; }), } } } diff --git a/libasync/src/task.rs b/libasync/src/task.rs index f86925f..f19fd8e 100644 --- a/libasync/src/task.rs +++ b/libasync/src/task.rs @@ -17,10 +17,7 @@ pub fn block_on(f: impl Future) -> T { /// Spawns a task onto the executor /// /// The spawned task will not make any progress until `block_on` is called. -/// -/// The future `f` must never terminate. The program will *abort* if `f` (the async code) returns. -/// The right signature here would be `f: impl Future` but that requires nightly -pub fn spawn(f: impl Future + 'static) { +pub fn spawn(f: impl Future + 'static) { executor::current().spawn(f) }