From 0000575ce0eadaf02d1f433b1dc3edeb126fca1f Mon Sep 17 00:00:00 2001 From: Astro Date: Mon, 13 Apr 2020 01:24:37 +0200 Subject: [PATCH] libasync: add async_send/async_recv methods --- experiments/src/main.rs | 51 ++++++++------ libcortex_a9/src/sync_channel.rs | 115 +++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 21 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 189fa5a..72bf6b9 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -110,8 +110,12 @@ pub fn main_core0() { flash = flash_io.stop(); } - task::spawn(async { + let (mut tx, mut rx) = sync_channel::sync_channel(0); + task::spawn(async move { println!("outer task"); + while let Some(item) = *rx.async_recv().await { + println!("received {}", item); + } }); task::spawn(async { for i in 1..=3 { @@ -127,25 +131,26 @@ pub fn main_core0() { for i in 1..=10 { println!("yield {}", i); task::r#yield().await; + tx.async_send(Some(i)).await; } + tx.async_send(None).await; }); let core1_stack = unsafe { &mut STACK_CORE1[..] }; println!("{} bytes stack for core1", core1_stack.len()); let core1 = boot::Core1::start(core1_stack); - - let (tx, mut rx) = sync_channel(10); - *SHARED.lock() = Some(tx); - for (i, r) in rx.enumerate() { - // println!("Recvd {}", r); - if i != *r { - println!("Expected {}, received {}", i, r); + let (mut core1_req, rx) = sync_channel(10); + *CORE1_REQ.lock() = Some(rx); + let (tx, mut core1_res) = sync_channel(10); + *CORE1_RES.lock() = Some(tx); + task::block_on(async { + for i in 0..10 { + core1_req.async_send(i).await; + let j = core1_res.async_recv().await; + println!("{} -> {}", i, j); } - if i % 100000 == 0 { - println!("{} Ok", i); - } - } + }); core1.reset(); libcortex_a9::asm::dsb(); @@ -249,23 +254,27 @@ pub fn main_core0() { }); } -static SHARED: Mutex>> = Mutex::new(None); +static CORE1_REQ: Mutex>> = Mutex::new(None); +static CORE1_RES: Mutex>> = Mutex::new(None); static DONE: Mutex = Mutex::new(false); #[no_mangle] pub fn main_core1() { println!("Hello from core1!"); - let mut tx = None; - while tx.is_none() { - tx = SHARED.lock().take(); + let mut req = None; + while req.is_none() { + req = CORE1_REQ.lock().take(); } - println!("Core1 got tx"); - let mut tx = tx.unwrap(); + let mut req = req.unwrap(); + let mut res = None; + while res.is_none() { + res = CORE1_RES.lock().take(); + } + let mut res = res.unwrap(); - for i in 0.. { - // println!("S {}", i); - tx.send(i); + for i in req { + res.send(*i * *i); } println!("core1 done!"); diff --git a/libcortex_a9/src/sync_channel.rs b/libcortex_a9/src/sync_channel.rs index 7b4e830..1cf5a06 100644 --- a/libcortex_a9/src/sync_channel.rs +++ b/libcortex_a9/src/sync_channel.rs @@ -1,6 +1,9 @@ use core::{ + future::Future, + pin::Pin, ptr::null_mut, sync::atomic::{AtomicPtr, Ordering}, + task::{Context, Poll}, }; use alloc::{ boxed::Box, @@ -64,8 +67,71 @@ impl Sender { self.pos = 0; } } + + /// Non-blocking send, handing you back ownership of the ocntent on **failure** + pub fn try_send>>(&mut self, content: B) -> Option> { + let ptr = Box::into_raw(content.into()); + let entry = &self.channel[self.pos]; + // try to write the new pointer if the current pointer is + // NULL + if entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) == null_mut() { + dsb(); + // wake power-saving receivers + sev(); + + // advance + self.pos += 1; + // wrap + if self.pos >= self.channel.len() { + self.pos = 0; + } + + // success + None + } else { + let content = unsafe { Box::from_raw(ptr) }; + // failure + Some(content) + } + } + + pub async fn async_send>>(&mut self, content: B) { + struct Send<'a, T> { + sender: &'a mut Sender, + content: Option>, + } + + impl Future for Send<'_, T> { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.content.take() { + Some(content) => { + if let Some(content) = self.sender.try_send(content) { + // failure + self.content = Some(content); + cx.waker().wake_by_ref(); + Poll::Pending + } else { + // success + Poll::Ready(()) + } + } + None => panic!("Send future polled after success"), + } + } + } + + Send { + sender: self, + content: Some(content.into()), + }.await + } } + + + /// Receiving half of a channel pub struct Receiver { channel: Arc>, @@ -101,6 +167,55 @@ impl Receiver { wfe(); } } + + /// Non-blocking receive + pub fn try_recv(&mut self) -> Option> { + let entry = &self.channel[self.pos]; + + dmb(); + let ptr = entry.swap(null_mut(), Ordering::Release); + if ptr != null_mut() { + dsb(); + // wake power-saving senders + sev(); + + let content = unsafe { Box::from_raw(ptr) }; + + // advance + self.pos += 1; + // wrap + if self.pos >= self.channel.len() { + self.pos = 0; + } + + Some(content) + } else { + None + } + } + + pub async fn async_recv(&mut self) -> Box { + struct Recv<'a, T> { + receiver: &'a mut Receiver, + } + + impl Future for Recv<'_, T> { + type Output = Box; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if let Some(content) = self.receiver.try_recv() { + Poll::Ready(content) + } else { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + + Recv { + receiver: self, + }.await + } } impl Iterator for Receiver {