From b099c565690d063abe46856e659618c45880f235 Mon Sep 17 00:00:00 2001 From: pca006132 Date: Tue, 28 Jul 2020 12:36:16 +0800 Subject: [PATCH] libcortex_a9/sync_channel: new version compiled. --- experiments/src/main.rs | 9 +- libcortex_a9/src/lib.rs | 1 + libcortex_a9/src/sync_channel.rs | 256 ++++++++++++------------------- 3 files changed, 106 insertions(+), 160 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 323c7bd..38383a3 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -1,5 +1,6 @@ #![no_std] #![no_main] +#![feature(const_in_array_repeat_expressions)] extern crate alloc; @@ -26,7 +27,7 @@ use libboard_zynq::{ use libboard_zynq::ps7_init; use libcortex_a9::{ mutex::Mutex, - sync_channel::{self, sync_channel}, + sync_channel, }; use libregister::RegisterR; use libsupport_zynq::{ @@ -160,9 +161,9 @@ pub fn main_core0() { let core1 = boot::Core1::start(false); - let (mut core1_req, rx) = sync_channel(10); + let (mut core1_req, rx) = sync_channel!(usize, 10); *CORE1_REQ.lock() = Some(rx); - let (tx, mut core1_res) = sync_channel(10); + let (tx, mut core1_res) = sync_channel!(usize, 10); *CORE1_RES.lock() = Some(tx); task::block_on(async { for i in 0..10 { @@ -285,7 +286,7 @@ pub fn main_core1() { let mut res = res.unwrap(); for i in req { - res.send(*i * *i); + res.send(i * i); } println!("core1 done!"); diff --git a/libcortex_a9/src/lib.rs b/libcortex_a9/src/lib.rs index 52cb03c..bd73521 100644 --- a/libcortex_a9/src/lib.rs +++ b/libcortex_a9/src/lib.rs @@ -1,6 +1,7 @@ #![no_std] #![feature(llvm_asm, global_asm)] #![feature(never_type)] +#![feature(const_fn)] extern crate alloc; diff --git a/libcortex_a9/src/sync_channel.rs b/libcortex_a9/src/sync_channel.rs index 6a8531f..a30c315 100644 --- a/libcortex_a9/src/sync_channel.rs +++ b/libcortex_a9/src/sync_channel.rs @@ -1,115 +1,75 @@ use core::{ - future::Future, pin::Pin, - ptr::null_mut, - sync::atomic::{AtomicPtr, Ordering}, + future::Future, + ptr::drop_in_place, + sync::atomic::{AtomicPtr, AtomicUsize, Ordering}, task::{Context, Poll}, }; -use alloc::{ - boxed::Box, - sync::Arc, - vec::Vec, -}; +use alloc::boxed::Box; use super::asm::*; - -type Channel = Vec>; - -/// Create a bounded channel -/// -/// Returns `(tx, rx)` where one should be used one the local core, -/// and the other is to be shared with another core. -pub fn sync_channel(bound: usize) -> (Sender, Receiver) { - // allow for bound=0 - let len = bound + 1; - let mut channel = Vec::with_capacity(len); - for _ in 0..len { - channel.push(AtomicPtr::default()); - } - - let channel = Arc::new(channel); - let sender = Sender { - channel: channel.clone(), - pos: 0, - }; - let receiver = Receiver { - channel: channel, - pos: 0, - }; - (sender, receiver) +pub struct Sender<'a, T> where T: Clone { + list: &'a [AtomicPtr], + write: &'a AtomicUsize, + read: &'a AtomicUsize, } -/// Sending half of a channel -pub struct Sender { - channel: Arc>, - pos: usize, +pub struct Receiver<'a, T> where T: Clone { + list: &'a [AtomicPtr], + write: &'a AtomicUsize, + read: &'a AtomicUsize, } -impl Sender { - /// Blocking send - pub fn send>>(&mut self, content: B) { - let ptr = Box::into_raw(content.into()); - let entry = &self.channel[self.pos]; - // try to write the new pointer if the current pointer is - // NULL, retrying while it is not NULL - while entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) != null_mut() { - // power-saving - wfe(); - } - dsb(); - // wake power-saving receivers - sev(); - - // advance - self.pos += 1; - // wrap - if self.pos >= self.channel.len() { - self.pos = 0; - } +impl<'a, T> Sender<'a, T> where T: Clone { + pub const fn new(list: &'static [AtomicPtr], write: &'static AtomicUsize, read: &'static AtomicUsize) -> Self { + Sender {list, write, read} } - /// Non-blocking send, handing you back ownership of the content on **failure** - pub fn try_send>>(&mut self, content: B) -> Option> { - let ptr = Box::into_raw(content.into()); - let entry = &self.channel[self.pos]; - // try to write the new pointer if the current pointer is - // NULL - if entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) == null_mut() { - dsb(); - // wake power-saving receivers - sev(); - - // advance - self.pos += 1; - // wrap - if self.pos >= self.channel.len() { - self.pos = 0; - } - - // success - None + pub fn try_send>>(&mut self, content: B) -> Result<(), B> { + let write = self.write.load(Ordering::Relaxed); + if (write + 1) % self.list.len() == self.read.load(Ordering::Acquire) { + Err(content) } else { - let content = unsafe { Box::from_raw(ptr) }; - // failure - Some(content) + let ptr = Box::into_raw(content.into()); + let entry = &self.list[write]; + let prev = entry.swap(ptr, Ordering::Relaxed); + // we allow other end get it first + self.write.store((write + 1) % self.list.len(), Ordering::Release); + // wake up other core, actually I wonder if the dsb is really needed... + dsb(); + sev(); + if !prev.is_null() { + unsafe { + drop_in_place(prev); + } + } + Ok(()) + } + } + + pub fn send>>(&mut self, content: B) { + let mut content = content; + while let Err(back) = self.try_send(content) { + content = back; + wfe(); } } pub async fn async_send>>(&mut self, content: B) { - struct Send<'a, T> { - sender: &'a mut Sender, - content: Option>, + struct Send<'a, 'b, T> where T: Clone, 'b: 'a { + sender: &'a mut Sender<'b, T>, + content: Result<(), Box>, } - impl Future for Send<'_, T> { + impl Future for Send<'_, '_, T> where T: Clone { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.content.take() { - Some(content) => { - if let Some(content) = self.sender.try_send(content) { + match core::mem::replace(&mut self.content, Ok(())) { + Err(content) => { + if let Err(content) = self.sender.try_send(content) { // failure - self.content = Some(content); + self.content = Err(content); cx.waker().wake_by_ref(); Poll::Pending } else { @@ -117,93 +77,61 @@ impl Sender { Poll::Ready(()) } } - None => panic!("Send future polled after success"), + Ok(_) => panic!("Send future polled after success"), } } } Send { sender: self, - content: Some(content.into()), + content: Err(content.into()), }.await } } +impl<'a, T> Receiver<'a, T> where T: Clone { + pub const fn new(list: &'static [AtomicPtr], write: &'static AtomicUsize, read: &'static AtomicUsize) -> Self { + Receiver {list, write, read} + } + pub fn try_recv(&mut self) -> Result { + let read = self.read.load(Ordering::Relaxed); + if read == self.write.load(Ordering::Acquire) { + Err(()) + } else { + let entry = &self.list[read]; + let data = unsafe { + // we cannot deallocate the box + Box::leak(Box::from_raw(entry.load(Ordering::Relaxed))) + }; + let result = data.clone(); + self.read.store((read + 1) % self.list.len(), Ordering::Release); + // wake up other core, still idk if the dsb is needed... + dsb(); + sev(); + Ok(result) + } + } - -/// Receiving half of a channel -pub struct Receiver { - channel: Arc>, - pos: usize, -} - -impl Receiver { - /// Blocking receive - pub fn recv(&mut self) -> Box { - let entry = &self.channel[self.pos]; - + pub fn recv(&mut self) -> T { loop { - dmb(); - let ptr = entry.swap(null_mut(), Ordering::Release); - if ptr != null_mut() { - dsb(); - // wake power-saving senders - sev(); - - let content = unsafe { Box::from_raw(ptr) }; - - // advance - self.pos += 1; - // wrap - if self.pos >= self.channel.len() { - self.pos = 0; - } - - return content; + if let Ok(data) = self.try_recv() { + return data; } - - // power-saving wfe(); } } - /// Non-blocking receive - pub fn try_recv(&mut self) -> Option> { - let entry = &self.channel[self.pos]; - - dmb(); - let ptr = entry.swap(null_mut(), Ordering::Release); - if ptr != null_mut() { - dsb(); - // wake power-saving senders - sev(); - - let content = unsafe { Box::from_raw(ptr) }; - - // advance - self.pos += 1; - // wrap - if self.pos >= self.channel.len() { - self.pos = 0; - } - - Some(content) - } else { - None - } - } - - pub async fn async_recv(&mut self) -> Box { - struct Recv<'a, T> { - receiver: &'a mut Receiver, + pub async fn async_recv(&mut self) -> T { + struct Recv<'a, 'b, T> where T: Clone, 'b: 'a { + receiver: &'a mut Receiver<'b, T>, } - impl Future for Recv<'_, T> { - type Output = Box; + impl Future for Recv<'_, '_, T> where T: Clone { + type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if let Some(content) = self.receiver.try_recv() { + if let Ok(content) = self.receiver.try_recv() { Poll::Ready(content) } else { cx.waker().wake_by_ref(); @@ -218,10 +146,26 @@ impl Receiver { } } -impl Iterator for Receiver { - type Item = Box; +impl<'a, T> Iterator for Receiver<'a, T> where T: Clone { + type Item = T; fn next(&mut self) -> Option { Some(self.recv()) } } + +#[macro_export] +/// Macro for initializing the sync_channel with static buffer and indexes. +/// Note that this requires `#![feature(const_in_array_repeat_expressions)]` +macro_rules! sync_channel { + ($t: ty, $cap: expr) => { + { + use core::sync::atomic::{AtomicUsize, AtomicPtr}; + use $crate::sync_channel::{Sender, Receiver}; + static LIST: [AtomicPtr<$t>; $cap + 1] = [AtomicPtr::new(core::ptr::null_mut()); $cap + 1]; + static WRITE: AtomicUsize = AtomicUsize::new(0); + static READ: AtomicUsize = AtomicUsize::new(0); + (Sender::new(&LIST, &WRITE, &READ), Receiver::new(&LIST, &WRITE, &READ)) + } + }; +}