From d2f91eac2576999ff076165c6ed67f65e45ddb0a Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 31 Mar 2020 01:16:58 +0200 Subject: [PATCH] libasync: start smoltcp support --- Cargo.lock | 21 +++++- experiments/src/main.rs | 107 +++++++++++++++++------------ libasync/Cargo.toml | 5 ++ libasync/src/lib.rs | 2 + libasync/src/smoltcp/mod.rs | 90 ++++++++++++++++++++++++ libasync/src/smoltcp/tcp_stream.rs | 91 ++++++++++++++++++++++++ 6 files changed, 269 insertions(+), 47 deletions(-) create mode 100644 libasync/src/smoltcp/mod.rs create mode 100644 libasync/src/smoltcp/tcp_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 8b0d2f4..55d751a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,11 @@ name = "byteorder" version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "compiler_builtins" version = "0.1.26" @@ -38,6 +43,7 @@ dependencies = [ "libboard_zynq 0.0.0", "libcortex_a9 0.0.0", "pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)", + "smoltcp 0.6.0", ] [[package]] @@ -47,7 +53,8 @@ dependencies = [ "bit_field 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)", "libcortex_a9 0.0.0", "libregister 0.0.0", - "smoltcp 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "smoltcp 0.6.0", "volatile-register 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", ] @@ -85,6 +92,14 @@ name = "linked_list_allocator" version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "managed" version = "0.7.1" @@ -103,7 +118,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "smoltcp" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)", @@ -127,11 +141,12 @@ dependencies = [ "checksum bit_field 0.10.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a165d606cf084741d4ac3a28fb6e9b1eb0bd31f6cd999098cfddb0b2ab381dc0" "checksum bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" "checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" +"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" "checksum compiler_builtins 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "036b035e9ebcd705affece16319223d19f229e2358be6e3b7b094e57193312e6" "checksum linked_list_allocator 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "5825aea823c659d0fdcdbe8c9b78baf56f3a10365d783db874f6d360df72626f" +"checksum log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)" = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" "checksum managed 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "fdcec5e97041c7f0f1c5b7d93f12e57293c831c646f4cc7a5db59460c7ea8de6" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum r0 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "bd7a31eed1591dcbc95d92ad7161908e72f4677f8fabf2a32ca49b4237cbf211" -"checksum smoltcp 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0fe46639fd2ec79eadf8fe719f237a7a0bd4dac5d957f1ca5bbdbc1c3c39e53a" "checksum vcell 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "876e32dcadfe563a4289e994f7cb391197f362b6315dc45e8ba4aa6f564a4b3c" "checksum volatile-register 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0d67cb4616d99b940db1d6bd28844ff97108b498a6ca850e5b6191a532063286" diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 990942e..f445cac 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -1,14 +1,17 @@ #![no_std] #![no_main] +extern crate alloc; + use core::mem::transmute; +use alloc::collections::BTreeMap; use libcortex_a9::mutex::Mutex; use libboard_zynq::{ print, println, self as zynq, clocks::Clocks, clocks::source::{ClockSource, ArmPll, IoPll}, smoltcp::{ wire::{EthernetAddress, IpAddress, IpCidr}, - iface::{NeighborCache, EthernetInterfaceBuilder}, + iface::{NeighborCache, EthernetInterfaceBuilder, Routes}, time::Instant, socket::SocketSet, socket::{TcpSocket, TcpSocketBuffer}, @@ -18,7 +21,7 @@ use libsupport_zynq::{ ram, alloc::{vec, vec::Vec}, boot, }; -use libasync::task; +use libasync::{smoltcp::{Sockets, TcpStream}, task}; const HWADDR: [u8; 6] = [0, 0x23, 0xde, 0xea, 0xbe, 0xef]; @@ -152,78 +155,94 @@ pub fn main_core0() { let mut rx_descs = (0..RX_LEN) .map(|_| zynq::eth::rx::DescEntry::zeroed()) .collect::>(); - let mut rx_buffers = vec![[0u8; zynq::eth::MTU]; RX_LEN]; + let mut rx_buffers = vec![zynq::eth::Buffer::new(); RX_LEN]; // Number of transmission buffers (minimum is two because with // one, duplicate packet transmission occurs) const TX_LEN: usize = 8; let mut tx_descs = (0..TX_LEN) .map(|_| zynq::eth::tx::DescEntry::zeroed()) .collect::>(); - let mut tx_buffers = vec![[0u8; zynq::eth::MTU]; TX_LEN]; + let mut tx_buffers = vec![zynq::eth::Buffer::new(); TX_LEN]; let eth = eth.start_rx(&mut rx_descs, &mut rx_buffers); - //let mut eth = eth.start_tx(&mut tx_descs, &mut tx_buffers); + // let mut eth = eth.start_tx(&mut tx_descs, &mut tx_buffers); let mut eth = eth.start_tx( // HACK unsafe { transmute(tx_descs.as_mut_slice()) }, unsafe { transmute(tx_buffers.as_mut_slice()) }, ); + // loop { + // match eth.recv_next() { + // Ok(None) => {}, + // Ok(Some(pkt)) => println!("received {} bytes", pkt.len()), + // Err(e) => println!("e: {:?}", e), + // } + // } + println!("iface..."); let ethernet_addr = EthernetAddress(HWADDR); // IP stack let local_addr = IpAddress::v4(192, 168, 1, 51); let mut ip_addrs = [IpCidr::new(local_addr, 24)]; + let mut routes_storage = vec![None; 4]; + let routes = Routes::new(/*BTreeMap::new()*/ &mut routes_storage[..]); let mut neighbor_storage = vec![None; 256]; let neighbor_cache = NeighborCache::new(&mut neighbor_storage[..]); let mut iface = EthernetInterfaceBuilder::new(&mut eth) .ethernet_addr(ethernet_addr) .ip_addrs(&mut ip_addrs[..]) + .routes(routes) .neighbor_cache(neighbor_cache) .finalize(); - let mut sockets_storage = [ - None, None, None, None, - None, None, None, None - ]; - let mut sockets = SocketSet::new(&mut sockets_storage[..]); - // taken from example code for smoltcp - let mut tcp_server_rx_data = vec![0; 512 * 1024]; - let mut tcp_server_tx_data = vec![0; 512 * 1024]; - let tcp_rx_buffer = TcpSocketBuffer::new(&mut tcp_server_rx_data[..]); - let tcp_tx_buffer = TcpSocketBuffer::new(&mut tcp_server_tx_data[..]); - let tcp_socket = TcpSocket::new(tcp_rx_buffer, tcp_tx_buffer); - let tcp_handle = sockets.add(tcp_socket); + // TODO: compare with ps7_init + + println!("Sockets init..."); + Sockets::init(32); /// `chargen` const TCP_PORT: u16 = 19; - - let mut time = 0u32; - loop { - time += 1; - let timestamp = Instant::from_millis(time); - - match iface.poll(&mut sockets, timestamp) { - Ok(_) => {}, - Err(e) => { - println!("poll error: {}", e); - } + task::spawn(async { + println!("listening"); + while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await { + println!("got connection"); + task::spawn(async { + println!("spawned for connection"); + // while l + drop(socket); + }); } + println!("done?"); + }); - // (mostly) taken from smoltcp example: TCP echo server - let mut socket = sockets.get::(tcp_handle); - if !socket.is_open() { - socket.listen(TCP_PORT).unwrap() - } - if socket.may_recv() && socket.can_send() { - socket.recv(|buf| { - let len = buf.len().min(4096); - let buffer = buf[..len].iter().cloned().collect::>(); - (len, buffer) - }) - .and_then(|buffer| socket.send_slice(&buffer[..])) - .map(|_| {}) - .unwrap_or_else(|e| println!("tcp: {:?}", e)); + Sockets::run(&mut iface); + // let mut time = 0u32; + // loop { + // time += 1; + // let timestamp = Instant::from_millis(time); - } - } + // match iface.poll(&mut sockets, timestamp) { + // Ok(_) => {}, + // Err(e) => { + // println!("poll error: {}", e); + // } + // } + + // // (mostly) taken from smoltcp example: TCP echo server + // let mut socket = sockets.get::(tcp_handle); + // if !socket.is_open() { + // socket.listen(TCP_PORT).unwrap() + // } + // if socket.may_recv() && socket.can_send() { + // socket.recv(|buf| { + // let len = buf.len().min(4096); + // let buffer = buf[..len].iter().cloned().collect::>(); + // (len, buffer) + // }) + // .and_then(|buffer| socket.send_slice(&buffer[..])) + // .map(|_| {}) + // .unwrap_or_else(|e| println!("tcp: {:?}", e)); + + // } + // } // #[allow(unreachable_code)] // drop(tx_descs); diff --git a/libasync/Cargo.toml b/libasync/Cargo.toml index 86f6f22..8b0e394 100644 --- a/libasync/Cargo.toml +++ b/libasync/Cargo.toml @@ -11,3 +11,8 @@ pin-utils = "0.1.0-alpha.4" libcortex_a9 = { path = "../libcortex_a9" } # TODO: delete libboard_zynq = { path = "../libboard_zynq" } + +[dependencies.smoltcp] +version = "0.6" +default-features = false +features = ["alloc"] diff --git a/libasync/src/lib.rs b/libasync/src/lib.rs index 37fee31..0478fb3 100644 --- a/libasync/src/lib.rs +++ b/libasync/src/lib.rs @@ -4,3 +4,5 @@ extern crate alloc; pub mod task; pub mod executor; + +pub mod smoltcp; diff --git a/libasync/src/smoltcp/mod.rs b/libasync/src/smoltcp/mod.rs new file mode 100644 index 0000000..e908348 --- /dev/null +++ b/libasync/src/smoltcp/mod.rs @@ -0,0 +1,90 @@ +use core::{ + cell::RefCell, + task::Waker, +}; +use alloc::{vec, vec::Vec}; +use smoltcp::{ + iface::EthernetInterface, + phy::Device, + socket::{ + SocketSet, SocketHandle, + TcpSocketBuffer, TcpSocket, + }, + time::Instant, +}; +use libboard_zynq::println; +use crate::task; + +mod tcp_stream; +pub use tcp_stream::TcpStream; + +static mut SOCKETS: Option = None; + +pub struct Sockets { + sockets: RefCell>, + wakers: RefCell>, +} + +impl Sockets { + pub fn init(max_sockets: usize) { + println!("initializing {} sockets", max_sockets); + let mut sockets_storage = Vec::with_capacity(max_sockets); + for _ in 0..max_sockets { + sockets_storage.push(None); + } + let sockets = RefCell::new(SocketSet::new(sockets_storage)); + + let wakers = RefCell::new(Vec::new()); + + let instance = Sockets { + sockets, + wakers, + }; + println!("sockets initialized"); + unsafe { SOCKETS = Some(instance); } + } + + /// Block and run executor indefinitely while polling the smoltcp + /// iface + pub fn run<'b, 'c, 'e, D: for<'d> Device<'d>>(iface: &mut EthernetInterface<'b, 'c, 'e, D>) { + task::block_on(async { + loop { + Self::instance().poll(iface); + task::r#yield().await; + } + }); + } + + pub(crate) fn instance() -> &'static Self { + unsafe { SOCKETS.as_ref().expect("Sockets") } + } + + fn poll<'b, 'c, 'e, D: for<'d> Device<'d>>(&self, iface: &mut EthernetInterface<'b, 'c, 'e, D>) { + // TODO: + let instant = Instant::from_millis(0); + let processed = { + let mut sockets = self.sockets.borrow_mut(); + let r = iface.poll(&mut sockets, instant); + if r != Ok(false) { println!("poll: {:?}", r); } + match r { + Ok(processed) => processed, + Err(_) => true, + } + }; + if processed { + let mut wakers = self.wakers.borrow_mut(); + println!("wakeup of {}", wakers.len()); + for waker in wakers.drain(..) { + waker.wake(); + } + } + } + + /// TODO: this was called through eg. TcpStream, another poll() + /// might want to send packets before sleeping for an interrupt. + pub(crate) fn register_waker(waker: Waker) { + println!("register_waker"); + Self::instance().wakers.borrow_mut() + .push(waker); + } +} diff --git a/libasync/src/smoltcp/tcp_stream.rs b/libasync/src/smoltcp/tcp_stream.rs new file mode 100644 index 0000000..cb8645a --- /dev/null +++ b/libasync/src/smoltcp/tcp_stream.rs @@ -0,0 +1,91 @@ +use core::{ + cell::{RefCell, UnsafeCell}, + future::Future, + mem::MaybeUninit, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, + task::{Context, Poll}, +}; +use alloc::vec; +use smoltcp::{ + iface::EthernetInterface, + phy::Device, + socket::{ + SocketSet, SocketHandle, SocketRef, + TcpSocketBuffer, TcpSocket, + }, + time::Instant, +}; +use libboard_zynq::println; +use super::Sockets; + +pub struct TcpStream { + handle: SocketHandle, +} + +impl TcpStream { + fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self { + let rx_buffer = TcpSocketBuffer::new(vec![0u8; rx_bufsize]); + let tx_buffer = TcpSocketBuffer::new(vec![0u8; tx_bufsize]); + let socket = TcpSocket::new(rx_buffer, tx_buffer); + let handle = Sockets::instance().sockets.borrow_mut() + .add(socket); + TcpStream { handle } + } + + fn with_socket(&self, f: F) -> R + where + F: FnOnce(SocketRef) -> R, + { + println!("with_socket"); + let mut sockets = Sockets::instance().sockets.borrow_mut(); + let socket_ref = sockets.get::(self.handle); + f(socket_ref) + } + + pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { + struct Accept { + stream: Option, + } + + impl Future for Accept { + type Output = TcpStream; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let is_active = self.stream.as_ref() + .map(|s| s.with_socket(|s| s.is_active())) + .unwrap_or(true); + println!("is_active={:?}", is_active); + if is_active { + Poll::Ready(self.stream.take().unwrap()) + } else { + println!("register_waker"); + Sockets::register_waker(cx.waker().clone()); + + //asm::sev(); + Poll::Pending + } + } + } + + let stream = Self::new(rx_bufsize, tx_bufsize); + stream.with_socket(|mut s| s.listen(port)).expect("listen"); + println!("listening on {}", port); + Accept { + stream: Some(stream), + }.await + } + + pub async fn recv(&self) { + // self.socket(); + } +} + +impl Drop for TcpStream { + fn drop(&mut self) { + // TODO: verify + println!("tcpstream drop"); + Sockets::instance().sockets.borrow_mut() + .remove(self.handle); + } +}