libasync: improve scheduling fairness between block_on and spawned tasks

in the libasync::smoltcp::Sockets::run() case the block_on iface.poll
loop would progress just one task before. now all tasks get to run in
each iteration.
This commit is contained in:
Astro 2020-08-17 00:58:12 +02:00
parent c69cd9951e
commit 4b258c19f5

View File

@ -6,7 +6,7 @@ use core::{
sync::atomic::{AtomicBool, Ordering}, sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
}; };
use alloc::{boxed::Box, collections::VecDeque as Deque}; use alloc::{boxed::Box, vec::Vec};
//use futures::future::FutureExt; //use futures::future::FutureExt;
use pin_utils::pin_mut; use pin_utils::pin_mut;
@ -31,7 +31,7 @@ static VTABLE: RawWakerVTable = {
/// ready should not move as long as this waker references it. That is /// ready should not move as long as this waker references it. That is
/// the reason for keeping Tasks in a pinned box. /// the reason for keeping Tasks in a pinned box.
fn wrap_waker(ready: &AtomicBool) -> Waker { fn wrap_waker(ready: &AtomicBool) -> Waker {
unsafe { Waker::from_raw(RawWaker::new(ready as *const _ as *const _, &VTABLE)) } unsafe { Waker::from_raw(RawWaker::new(ready as *const _ as *const (), &VTABLE)) }
} }
/// A single-threaded executor /// A single-threaded executor
@ -44,7 +44,7 @@ pub struct Executor {
/// Tasks reside on the heap, so that we just queue pointers. They /// Tasks reside on the heap, so that we just queue pointers. They
/// must also be pinned in memory because our RawWaker is a pointer /// must also be pinned in memory because our RawWaker is a pointer
/// to their `ready` field. /// to their `ready` field.
tasks: RefCell<Deque<Pin<Box<Task>>>>, tasks: RefCell<Vec<Pin<Box<Task>>>>,
} }
impl Executor { impl Executor {
@ -52,7 +52,7 @@ impl Executor {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
in_block_on: RefCell::new(false), in_block_on: RefCell::new(false),
tasks: RefCell::new(Deque::new()), tasks: RefCell::new(Vec::new()),
} }
} }
@ -85,10 +85,9 @@ impl Executor {
// println!("ran block_on"); // println!("ran block_on");
} }
// println!("tasks: {}", self.tasks.borrow().len()); // advance all tasks
// advance other tasks let next_tasks = self.tasks.replace_with(|tasks| Vec::with_capacity(tasks.len()));
let next_task = self.tasks.borrow_mut().pop_front(); for mut task in next_tasks.into_iter() {
if let Some(mut task) = next_task {
// NOTE we don't need a CAS operation here because `wake` invocations that come from // NOTE we don't need a CAS operation here because `wake` invocations that come from
// interrupt handlers (the only source of 'race conditions' (!= data races)) are // interrupt handlers (the only source of 'race conditions' (!= data races)) are
// "oneshot": they'll issue a `wake` and then disable themselves to not run again // "oneshot": they'll issue a `wake` and then disable themselves to not run again
@ -106,7 +105,7 @@ impl Executor {
} }
} }
// Requeue // Requeue
self.tasks.borrow_mut().push_back(task); self.tasks.borrow_mut().push(task);
} }
// // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an // // try to sleep; this will be a no-op if any of the previous tasks generated a SEV or an
@ -119,7 +118,7 @@ impl Executor {
pub fn spawn(&self, f: impl Future + 'static) { pub fn spawn(&self, f: impl Future + 'static) {
let task = Box::pin(Task::new(f)); let task = Box::pin(Task::new(f));
self.tasks.borrow_mut().push_back(task); self.tasks.borrow_mut().push(task);
} }
} }