libasync: add async_send/async_recv methods

This commit is contained in:
Astro 2020-04-13 01:24:37 +02:00
parent 526cfe7577
commit 0000575ce0
2 changed files with 145 additions and 21 deletions

View File

@ -110,8 +110,12 @@ pub fn main_core0() {
flash = flash_io.stop(); flash = flash_io.stop();
} }
task::spawn(async { let (mut tx, mut rx) = sync_channel::sync_channel(0);
task::spawn(async move {
println!("outer task"); println!("outer task");
while let Some(item) = *rx.async_recv().await {
println!("received {}", item);
}
}); });
task::spawn(async { task::spawn(async {
for i in 1..=3 { for i in 1..=3 {
@ -127,25 +131,26 @@ pub fn main_core0() {
for i in 1..=10 { for i in 1..=10 {
println!("yield {}", i); println!("yield {}", i);
task::r#yield().await; task::r#yield().await;
tx.async_send(Some(i)).await;
} }
tx.async_send(None).await;
}); });
let core1_stack = unsafe { &mut STACK_CORE1[..] }; let core1_stack = unsafe { &mut STACK_CORE1[..] };
println!("{} bytes stack for core1", core1_stack.len()); println!("{} bytes stack for core1", core1_stack.len());
let core1 = boot::Core1::start(core1_stack); let core1 = boot::Core1::start(core1_stack);
let (mut core1_req, rx) = sync_channel(10);
let (tx, mut rx) = sync_channel(10); *CORE1_REQ.lock() = Some(rx);
*SHARED.lock() = Some(tx); let (tx, mut core1_res) = sync_channel(10);
for (i, r) in rx.enumerate() { *CORE1_RES.lock() = Some(tx);
// println!("Recvd {}", r); task::block_on(async {
if i != *r { for i in 0..10 {
println!("Expected {}, received {}", i, r); core1_req.async_send(i).await;
} let j = core1_res.async_recv().await;
if i % 100000 == 0 { println!("{} -> {}", i, j);
println!("{} Ok", i);
}
} }
});
core1.reset(); core1.reset();
libcortex_a9::asm::dsb(); libcortex_a9::asm::dsb();
@ -249,23 +254,27 @@ pub fn main_core0() {
}); });
} }
static SHARED: Mutex<Option<sync_channel::Sender<usize>>> = Mutex::new(None); static CORE1_REQ: Mutex<Option<sync_channel::Receiver<usize>>> = Mutex::new(None);
static CORE1_RES: Mutex<Option<sync_channel::Sender<usize>>> = Mutex::new(None);
static DONE: Mutex<bool> = Mutex::new(false); static DONE: Mutex<bool> = Mutex::new(false);
#[no_mangle] #[no_mangle]
pub fn main_core1() { pub fn main_core1() {
println!("Hello from core1!"); println!("Hello from core1!");
let mut tx = None; let mut req = None;
while tx.is_none() { while req.is_none() {
tx = SHARED.lock().take(); req = CORE1_REQ.lock().take();
} }
println!("Core1 got tx"); let mut req = req.unwrap();
let mut tx = tx.unwrap(); let mut res = None;
while res.is_none() {
res = CORE1_RES.lock().take();
}
let mut res = res.unwrap();
for i in 0.. { for i in req {
// println!("S {}", i); res.send(*i * *i);
tx.send(i);
} }
println!("core1 done!"); println!("core1 done!");

View File

@ -1,6 +1,9 @@
use core::{ use core::{
future::Future,
pin::Pin,
ptr::null_mut, ptr::null_mut,
sync::atomic::{AtomicPtr, Ordering}, sync::atomic::{AtomicPtr, Ordering},
task::{Context, Poll},
}; };
use alloc::{ use alloc::{
boxed::Box, boxed::Box,
@ -64,8 +67,71 @@ impl<T> Sender<T> {
self.pos = 0; self.pos = 0;
} }
} }
/// Non-blocking send, handing you back ownership of the ocntent on **failure**
pub fn try_send<B: Into<Box<T>>>(&mut self, content: B) -> Option<Box<T>> {
let ptr = Box::into_raw(content.into());
let entry = &self.channel[self.pos];
// try to write the new pointer if the current pointer is
// NULL
if entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) == null_mut() {
dsb();
// wake power-saving receivers
sev();
// advance
self.pos += 1;
// wrap
if self.pos >= self.channel.len() {
self.pos = 0;
} }
// success
None
} else {
let content = unsafe { Box::from_raw(ptr) };
// failure
Some(content)
}
}
pub async fn async_send<B: Into<Box<T>>>(&mut self, content: B) {
struct Send<'a, T> {
sender: &'a mut Sender<T>,
content: Option<Box<T>>,
}
impl<T> Future for Send<'_, T> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.content.take() {
Some(content) => {
if let Some(content) = self.sender.try_send(content) {
// failure
self.content = Some(content);
cx.waker().wake_by_ref();
Poll::Pending
} else {
// success
Poll::Ready(())
}
}
None => panic!("Send future polled after success"),
}
}
}
Send {
sender: self,
content: Some(content.into()),
}.await
}
}
/// Receiving half of a channel /// Receiving half of a channel
pub struct Receiver<T> { pub struct Receiver<T> {
channel: Arc<Channel<T>>, channel: Arc<Channel<T>>,
@ -101,6 +167,55 @@ impl<T> Receiver<T> {
wfe(); wfe();
} }
} }
/// Non-blocking receive
pub fn try_recv(&mut self) -> Option<Box<T>> {
let entry = &self.channel[self.pos];
dmb();
let ptr = entry.swap(null_mut(), Ordering::Release);
if ptr != null_mut() {
dsb();
// wake power-saving senders
sev();
let content = unsafe { Box::from_raw(ptr) };
// advance
self.pos += 1;
// wrap
if self.pos >= self.channel.len() {
self.pos = 0;
}
Some(content)
} else {
None
}
}
pub async fn async_recv(&mut self) -> Box<T> {
struct Recv<'a, T> {
receiver: &'a mut Receiver<T>,
}
impl<T> Future for Recv<'_, T> {
type Output = Box<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(content) = self.receiver.try_recv() {
Poll::Ready(content)
} else {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
Recv {
receiver: self,
}.await
}
} }
impl<T> Iterator for Receiver<T> { impl<T> Iterator for Receiver<T> {