libcortex_a9/sync_channel: new version compiled.

pca006132 2020-07-28 12:36:16 +08:00
parent ef4fb598fb
commit b099c56569
3 changed files with 106 additions and 160 deletions

View File

@@ -1,5 +1,6 @@
 #![no_std]
 #![no_main]
+#![feature(const_in_array_repeat_expressions)]
 extern crate alloc;
@@ -26,7 +27,7 @@ use libboard_zynq::{
 use libboard_zynq::ps7_init;
 use libcortex_a9::{
     mutex::Mutex,
-    sync_channel::{self, sync_channel},
+    sync_channel,
 };
 use libregister::RegisterR;
 use libsupport_zynq::{
@@ -160,9 +161,9 @@ pub fn main_core0() {
     let core1 = boot::Core1::start(false);
-    let (mut core1_req, rx) = sync_channel(10);
+    let (mut core1_req, rx) = sync_channel!(usize, 10);
     *CORE1_REQ.lock() = Some(rx);
-    let (tx, mut core1_res) = sync_channel(10);
+    let (tx, mut core1_res) = sync_channel!(usize, 10);
     *CORE1_RES.lock() = Some(tx);
     task::block_on(async {
         for i in 0..10 {
@@ -285,7 +286,7 @@ pub fn main_core1() {
     let mut res = res.unwrap();
     for i in req {
-        res.send(*i * *i);
+        res.send(i * i);
     }
     println!("core1 done!");

View File

@@ -1,6 +1,7 @@
 #![no_std]
 #![feature(llvm_asm, global_asm)]
 #![feature(never_type)]
+#![feature(const_fn)]
 extern crate alloc;

View File

@@ -1,115 +1,75 @@
 use core::{
-    future::Future,
     pin::Pin,
-    ptr::null_mut,
-    sync::atomic::{AtomicPtr, Ordering},
+    future::Future,
+    ptr::drop_in_place,
+    sync::atomic::{AtomicPtr, AtomicUsize, Ordering},
     task::{Context, Poll},
 };
-use alloc::{
-    boxed::Box,
-    sync::Arc,
-    vec::Vec,
-};
+use alloc::boxed::Box;
 use super::asm::*;
 
-type Channel<T> = Vec<AtomicPtr<T>>;
-
-/// Create a bounded channel
-///
-/// Returns `(tx, rx)` where one should be used one the local core,
-/// and the other is to be shared with another core.
-pub fn sync_channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
-    // allow for bound=0
-    let len = bound + 1;
-    let mut channel = Vec::with_capacity(len);
-    for _ in 0..len {
-        channel.push(AtomicPtr::default());
-    }
-    let channel = Arc::new(channel);
-    let sender = Sender {
-        channel: channel.clone(),
-        pos: 0,
-    };
-    let receiver = Receiver {
-        channel: channel,
-        pos: 0,
-    };
-    (sender, receiver)
-}
-
-/// Sending half of a channel
-pub struct Sender<T> {
-    channel: Arc<Channel<T>>,
-    pos: usize,
-}
-
-impl<T> Sender<T> {
-    /// Blocking send
-    pub fn send<B: Into<Box<T>>>(&mut self, content: B) {
-        let ptr = Box::into_raw(content.into());
-        let entry = &self.channel[self.pos];
-        // try to write the new pointer if the current pointer is
-        // NULL, retrying while it is not NULL
-        while entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) != null_mut() {
-            // power-saving
-            wfe();
-        }
-        dsb();
-        // wake power-saving receivers
-        sev();
-        // advance
-        self.pos += 1;
-        // wrap
-        if self.pos >= self.channel.len() {
-            self.pos = 0;
-        }
-    }
-
-    /// Non-blocking send, handing you back ownership of the content on **failure**
-    pub fn try_send<B: Into<Box<T>>>(&mut self, content: B) -> Option<Box<T>> {
-        let ptr = Box::into_raw(content.into());
-        let entry = &self.channel[self.pos];
-        // try to write the new pointer if the current pointer is
-        // NULL
-        if entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) == null_mut() {
-            dsb();
-            // wake power-saving receivers
-            sev();
-            // advance
-            self.pos += 1;
-            // wrap
-            if self.pos >= self.channel.len() {
-                self.pos = 0;
-            }
-            // success
-            None
-        } else {
-            let content = unsafe { Box::from_raw(ptr) };
-            // failure
-            Some(content)
-        }
-    }
+pub struct Sender<'a, T> where T: Clone {
+    list: &'a [AtomicPtr<T>],
+    write: &'a AtomicUsize,
+    read: &'a AtomicUsize,
+}
+
+pub struct Receiver<'a, T> where T: Clone {
+    list: &'a [AtomicPtr<T>],
+    write: &'a AtomicUsize,
+    read: &'a AtomicUsize,
+}
+
+impl<'a, T> Sender<'a, T> where T: Clone {
+    pub const fn new(list: &'static [AtomicPtr<T>], write: &'static AtomicUsize, read: &'static AtomicUsize) -> Self {
+        Sender {list, write, read}
+    }
+
+    pub fn try_send<B: Into<Box<T>>>(&mut self, content: B) -> Result<(), B> {
+        let write = self.write.load(Ordering::Relaxed);
+        if (write + 1) % self.list.len() == self.read.load(Ordering::Acquire) {
+            Err(content)
+        } else {
+            let ptr = Box::into_raw(content.into());
+            let entry = &self.list[write];
+            let prev = entry.swap(ptr, Ordering::Relaxed);
+            // we allow the other end to get it first
+            self.write.store((write + 1) % self.list.len(), Ordering::Release);
+            // wake up other core, actually I wonder if the dsb is really needed...
+            dsb();
+            sev();
+            if !prev.is_null() {
+                unsafe {
+                    drop_in_place(prev);
+                }
+            }
+            Ok(())
+        }
+    }
+
+    pub fn send<B: Into<Box<T>>>(&mut self, content: B) {
+        let mut content = content;
+        while let Err(back) = self.try_send(content) {
+            content = back;
+            wfe();
+        }
+    }
 
     pub async fn async_send<B: Into<Box<T>>>(&mut self, content: B) {
-        struct Send<'a, T> {
-            sender: &'a mut Sender<T>,
-            content: Option<Box<T>>,
+        struct Send<'a, 'b, T> where T: Clone, 'b: 'a {
+            sender: &'a mut Sender<'b, T>,
+            content: Result<(), Box<T>>,
         }
-        impl<T> Future for Send<'_, T> {
+        impl<T> Future for Send<'_, '_, T> where T: Clone {
             type Output = ();
             fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                match self.content.take() {
-                    Some(content) => {
-                        if let Some(content) = self.sender.try_send(content) {
+                match core::mem::replace(&mut self.content, Ok(())) {
+                    Err(content) => {
+                        if let Err(content) = self.sender.try_send(content) {
                             // failure
-                            self.content = Some(content);
+                            self.content = Err(content);
                             cx.waker().wake_by_ref();
                             Poll::Pending
                         } else {
@@ -117,93 +77,61 @@ impl<T> Sender<T> {
                             Poll::Ready(())
                         }
                     }
-                    None => panic!("Send future polled after success"),
+                    Ok(_) => panic!("Send future polled after success"),
                 }
             }
         }
         Send {
            sender: self,
-            content: Some(content.into()),
+            content: Err(content.into()),
         }.await
     }
 }
 
-/// Receiving half of a channel
-pub struct Receiver<T> {
-    channel: Arc<Channel<T>>,
-    pos: usize,
-}
-
-impl<T> Receiver<T> {
-    /// Blocking receive
-    pub fn recv(&mut self) -> Box<T> {
-        let entry = &self.channel[self.pos];
+impl<'a, T> Receiver<'a, T> where T: Clone {
+    pub const fn new(list: &'static [AtomicPtr<T>], write: &'static AtomicUsize, read: &'static AtomicUsize) -> Self {
+        Receiver {list, write, read}
+    }
+
+    pub fn try_recv(&mut self) -> Result<T, ()> {
+        let read = self.read.load(Ordering::Relaxed);
+        if read == self.write.load(Ordering::Acquire) {
+            Err(())
+        } else {
+            let entry = &self.list[read];
+            let data = unsafe {
+                // we cannot deallocate the box
+                Box::leak(Box::from_raw(entry.load(Ordering::Relaxed)))
+            };
+            let result = data.clone();
+            self.read.store((read + 1) % self.list.len(), Ordering::Release);
+            // wake up other core, still idk if the dsb is needed...
+            dsb();
+            sev();
+            Ok(result)
+        }
+    }
+
+    pub fn recv(&mut self) -> T {
         loop {
-            dmb();
-            let ptr = entry.swap(null_mut(), Ordering::Release);
-            if ptr != null_mut() {
-                dsb();
-                // wake power-saving senders
-                sev();
-                let content = unsafe { Box::from_raw(ptr) };
-                // advance
-                self.pos += 1;
-                // wrap
-                if self.pos >= self.channel.len() {
-                    self.pos = 0;
-                }
-                return content;
+            if let Ok(data) = self.try_recv() {
+                return data;
             }
-            // power-saving
             wfe();
         }
     }
 
-    /// Non-blocking receive
-    pub fn try_recv(&mut self) -> Option<Box<T>> {
-        let entry = &self.channel[self.pos];
-        dmb();
-        let ptr = entry.swap(null_mut(), Ordering::Release);
-        if ptr != null_mut() {
-            dsb();
-            // wake power-saving senders
-            sev();
-            let content = unsafe { Box::from_raw(ptr) };
-            // advance
-            self.pos += 1;
-            // wrap
-            if self.pos >= self.channel.len() {
-                self.pos = 0;
-            }
-            Some(content)
-        } else {
-            None
-        }
-    }
-
-    pub async fn async_recv(&mut self) -> Box<T> {
-        struct Recv<'a, T> {
-            receiver: &'a mut Receiver<T>,
+    pub async fn async_recv(&mut self) -> T {
+        struct Recv<'a, 'b, T> where T: Clone, 'b: 'a {
+            receiver: &'a mut Receiver<'b, T>,
         }
-        impl<T> Future for Recv<'_, T> {
-            type Output = Box<T>;
+        impl<T> Future for Recv<'_, '_, T> where T: Clone {
+            type Output = T;
             fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-                if let Some(content) = self.receiver.try_recv() {
+                if let Ok(content) = self.receiver.try_recv() {
                     Poll::Ready(content)
                 } else {
                     cx.waker().wake_by_ref();
@@ -218,10 +146,26 @@ impl<T> Receiver<T> {
     }
 }
 
-impl<T> Iterator for Receiver<T> {
-    type Item = Box<T>;
+impl<'a, T> Iterator for Receiver<'a, T> where T: Clone {
+    type Item = T;
     fn next(&mut self) -> Option<Self::Item> {
         Some(self.recv())
     }
 }
+
+#[macro_export]
+/// Macro for initializing the sync_channel with static buffer and indexes.
+/// Note that this requires `#![feature(const_in_array_repeat_expressions)]`
macro_rules! sync_channel {
+    ($t: ty, $cap: expr) => {
+        {
+            use core::sync::atomic::{AtomicUsize, AtomicPtr};
+            use $crate::sync_channel::{Sender, Receiver};
+            static LIST: [AtomicPtr<$t>; $cap + 1] = [AtomicPtr::new(core::ptr::null_mut()); $cap + 1];
+            static WRITE: AtomicUsize = AtomicUsize::new(0);
+            static READ: AtomicUsize = AtomicUsize::new(0);
+            (Sender::new(&LIST, &WRITE, &READ), Receiver::new(&LIST, &WRITE, &READ))
+        }
+    };
+}
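
The exported macro replaces the old heap-allocated sync_channel(bound) constructor: each invocation expands to a block with its own static ring of cap + 1 AtomicPtr slots plus two static indices, handed by reference to the const constructors. A minimal sketch of the resulting bounded-ring behaviour, assuming the nightly features enabled elsewhere in this commit and the macro in scope; the element type and capacity are arbitrary:

    // Sketch only (not part of the commit): exercising the ring bounds on a
    // single core. Capacity 2 is backed by a 3-slot static array.
    let (mut tx, mut rx) = sync_channel!(u32, 2);
    assert!(tx.try_send(1u32).is_ok());
    assert!(tx.try_send(2u32).is_ok());
    assert!(tx.try_send(3u32).is_err());   // full: (write + 1) % len == read
    assert_eq!(rx.try_recv(), Ok(1));      // values are cloned out, not returned as Box<T>
    assert_eq!(rx.try_recv(), Ok(2));
    assert_eq!(rx.try_recv(), Err(()));    // empty: read == write
    tx.send(4u32);                         // blocking send only spins on wfe() while full
    assert_eq!(rx.recv(), 4);

Note that try_recv clones out of the slot and deliberately leaks the box; the boxed value is only freed when the sender later reuses that slot (the drop_in_place(prev) branch), which appears to be why the channel now requires T: Clone.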