From 4b258c19f57ad7142c14e909e6882b3cba5a22d9 Mon Sep 17 00:00:00 2001 From: Astro Date: Mon, 17 Aug 2020 00:58:12 +0200 Subject: [PATCH] libasync: improve scheduling fairness between block_on and spawned tasks in the libasync::smoltcp::Sockets::run() case the block_on iface.poll loop would progress just one task before. now all tasks get to run in each iteration. --- libasync/src/executor.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/libasync/src/executor.rs b/libasync/src/executor.rs index 25cd0c6..cfb5af4 100644 --- a/libasync/src/executor.rs +++ b/libasync/src/executor.rs @@ -6,7 +6,7 @@ use core::{ sync::atomic::{AtomicBool, Ordering}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, }; -use alloc::{boxed::Box, collections::VecDeque as Deque}; +use alloc::{boxed::Box, vec::Vec}; //use futures::future::FutureExt; use pin_utils::pin_mut; @@ -31,7 +31,7 @@ static VTABLE: RawWakerVTable = { /// ready should not move as long as this waker references it. That is /// the reason for keeping Tasks in a pinned box. fn wrap_waker(ready: &AtomicBool) -> Waker { - unsafe { Waker::from_raw(RawWaker::new(ready as *const _ as *const _, &VTABLE)) } + unsafe { Waker::from_raw(RawWaker::new(ready as *const _ as *const (), &VTABLE)) } } /// A single-threaded executor @@ -44,7 +44,7 @@ pub struct Executor { /// Tasks reside on the heap, so that we just queue pointers. They /// must also be pinned in memory because our RawWaker is a pointer /// to their `ready` field. - tasks: RefCell>>>, + tasks: RefCell>>>, } impl Executor { @@ -52,7 +52,7 @@ impl Executor { pub fn new() -> Self { Self { in_block_on: RefCell::new(false), - tasks: RefCell::new(Deque::new()), + tasks: RefCell::new(Vec::new()), } } @@ -85,10 +85,9 @@ impl Executor { // println!("ran block_on"); } - // println!("tasks: {}", self.tasks.borrow().len()); - // advance other tasks - let next_task = self.tasks.borrow_mut().pop_front(); - if let Some(mut task) = next_task { + // advance all tasks + let next_tasks = self.tasks.replace_with(|tasks| Vec::with_capacity(tasks.len())); + for mut task in next_tasks.into_iter() { // NOTE we don't need a CAS operation here because `wake` invocations that come from // interrupt handlers (the only source of 'race conditions' (!= data races)) are // "oneshot": they'll issue a `wake` and then disable themselves to not run again @@ -106,7 +105,7 @@ impl Executor { } } // Requeue - self.tasks.borrow_mut().push_back(task); + self.tasks.borrow_mut().push(task); } // // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an @@ -119,7 +118,7 @@ impl Executor { pub fn spawn(&self, f: impl Future + 'static) { let task = Box::pin(Task::new(f)); - self.tasks.borrow_mut().push_back(task); + self.tasks.borrow_mut().push(task); } }