From e54edbf32dcfe2aa01b73de4a6bd153bb183d1a4 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 9 Apr 2020 02:49:24 +0200 Subject: [PATCH] libcortex_a9: add sync_channel --- experiments/src/main.rs | 77 +++++++++-------------- libcortex_a9/src/lib.rs | 3 + libcortex_a9/src/sync_channel.rs | 104 +++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 48 deletions(-) create mode 100644 libcortex_a9/src/sync_channel.rs diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 9aec44e..dd18d1d 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -5,7 +5,7 @@ extern crate alloc; use core::{mem::transmute, task::Poll}; use alloc::{borrow::ToOwned, collections::BTreeMap, format}; -use libcortex_a9::mutex::Mutex; +use libcortex_a9::{mutex::Mutex, sync_channel::{self, sync_channel}}; use libboard_zynq::{ print, println, self as zynq, clocks::Clocks, clocks::source::{ClockSource, ArmPll, IoPll}, @@ -134,17 +134,22 @@ pub fn main_core0() { println!("{} bytes stack for core1", core1_stack.len()); let core1 = boot::Core1::start(core1_stack); - for _ in 0..0x1000000 { - let mut l = SHARED.lock(); - *l += 1; - } - while !*DONE.lock() { - let x = { *SHARED.lock() }; - println!("shared: {:08X}", x); - } - let x = { *SHARED.lock() }; - println!("done shared: {:08X}", x); + let (tx, mut rx) = sync_channel(1000); + *SHARED.lock() = Some(tx); + let mut i = 0u32; + loop { + let r = rx.recv(); + // println!("Recvd {}", r); + if i != *r { + println!("Expected {}, received {}", i, r); + } + if i % 100000 == 0 { + println!("{} Ok", i); + } + + i += 1; + } core1.reset(); libcortex_a9::asm::dsb(); @@ -246,51 +251,27 @@ pub fn main_core0() { time += 1; Instant::from_millis(time) }); - // loop { - // time += 1; - // let timestamp = Instant::from_millis(time); - - // match iface.poll(&mut sockets, timestamp) { - // Ok(_) => {}, - // Err(e) => { - // println!("poll error: {}", e); - // } - // } - - // // (mostly) taken from smoltcp example: TCP echo server - // let mut socket = sockets.get::(tcp_handle); - // if !socket.is_open() { - // socket.listen(TCP_PORT).unwrap() - // } - // if socket.may_recv() && socket.can_send() { - // socket.recv(|buf| { - // let len = buf.len().min(4096); - // let buffer = buf[..len].iter().cloned().collect::>(); - // (len, buffer) - // }) - // .and_then(|buffer| socket.send_slice(&buffer[..])) - // .map(|_| {}) - // .unwrap_or_else(|e| println!("tcp: {:?}", e)); - - // } - // } - - // #[allow(unreachable_code)] - // drop(tx_descs); - // #[allow(unreachable_code)] - // drop(tx_buffers); } -static SHARED: Mutex = Mutex::new(0); +static SHARED: Mutex>> = Mutex::new(None); static DONE: Mutex = Mutex::new(false); #[no_mangle] pub fn main_core1() { println!("Hello from core1!"); - for _ in 0..0x1000000 { - let mut l = SHARED.lock(); - *l += 1; + + let mut tx = None; + while tx.is_none() { + tx = SHARED.lock().take(); } + println!("Core1 got tx"); + let mut tx = tx.unwrap(); + + for i in 0.. { + // println!("S {}", i); + tx.send(i); + } + println!("core1 done!"); *DONE.lock() = true; diff --git a/libcortex_a9/src/lib.rs b/libcortex_a9/src/lib.rs index 15bca23..8cd9aae 100644 --- a/libcortex_a9/src/lib.rs +++ b/libcortex_a9/src/lib.rs @@ -2,10 +2,13 @@ #![feature(asm, global_asm)] #![feature(never_type)] +extern crate alloc; + pub mod asm; pub mod regs; pub mod cache; pub mod mmu; pub mod mutex; +pub mod sync_channel; global_asm!(include_str!("exceptions.s")); diff --git a/libcortex_a9/src/sync_channel.rs b/libcortex_a9/src/sync_channel.rs new file mode 100644 index 0000000..c6ac8fc --- /dev/null +++ b/libcortex_a9/src/sync_channel.rs @@ -0,0 +1,104 @@ +use core::{ + ptr::null_mut, + sync::atomic::{AtomicPtr, Ordering}, +}; +use alloc::{ + boxed::Box, + sync::Arc, + vec::Vec, +}; +use super::asm::*; + + +type Channel = Vec>; + +/// Create a bounded channel +/// +/// Returns `(tx, rx)` where one should be used one the local core, +/// and the other is to be shared with another core. +pub fn sync_channel(bound: usize) -> (Sender, Receiver) { + // allow for bound=0 + let len = bound + 1; + let mut channel = Vec::with_capacity(len); + for _ in 0..len { + channel.push(AtomicPtr::default()); + } + + let channel = Arc::new(channel); + let sender = Sender { + channel: channel.clone(), + pos: 0, + }; + let receiver = Receiver { + channel: channel, + pos: 0, + }; + (sender, receiver) +} + +/// Sending half of a channel +pub struct Sender { + channel: Arc>, + pos: usize, +} + +impl Sender { + /// Blocking send + pub fn send>>(&mut self, content: B) { + let ptr = Box::into_raw(content.into()); + let entry = &self.channel[self.pos]; + // try to write the new pointer if the current pointer is + // NULL, retrying while it is not NULL + while entry.compare_and_swap(null_mut(), ptr, Ordering::Acquire) != null_mut() { + // power-saving + wfe(); + } + dsb(); + // wake power-saving receivers + sev(); + + // advance + self.pos += 1; + // wrap + if self.pos >= self.channel.len() { + self.pos = 0; + } + } +} + +/// Receiving half of a channel +pub struct Receiver { + channel: Arc>, + pos: usize, +} + +impl Receiver { + /// Blocking receive + pub fn recv(&mut self) -> Box { + let entry = &self.channel[self.pos]; + + loop { + dmb(); + let ptr = entry.swap(null_mut(), Ordering::Release); + if ptr != null_mut() { + dsb(); + // wake power-saving senders + sev(); + + let content = unsafe { Box::from_raw(ptr) }; + + // advance + self.pos += 1; + // wrap + if self.pos >= self.channel.len() { + self.pos = 0; + } + + return content; + } + + // power-saving + wfe(); + } + } +}