From ab1404488c0e4ee05980197cc90fee09ddef4cc6 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 31 Mar 2020 22:34:32 +0200 Subject: [PATCH] libasync: fix TcpStream, implement recv()+send() --- experiments/src/main.rs | 43 +++++++-- libasync/Cargo.toml | 2 - libasync/src/executor.rs | 2 - libasync/src/smoltcp/mod.rs | 16 +--- libasync/src/smoltcp/tcp_stream.rs | 145 +++++++++++++++++++++++++---- 5 files changed, 168 insertions(+), 40 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 2281a35..54eab76 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -3,13 +3,14 @@ extern crate alloc; -use core::mem::transmute; -use alloc::collections::BTreeMap; +use core::{mem::transmute, task::Poll}; +use alloc::{borrow::ToOwned, collections::BTreeMap, format}; use libcortex_a9::mutex::Mutex; use libboard_zynq::{ print, println, self as zynq, clocks::Clocks, clocks::source::{ClockSource, ArmPll, IoPll}, smoltcp::{ + self, wire::{EthernetAddress, IpAddress, IpCidr}, iface::{NeighborCache, EthernetInterfaceBuilder, Routes}, time::Instant, @@ -199,18 +200,46 @@ pub fn main_core0() { // TODO: compare with ps7_init - println!("Sockets init..."); Sockets::init(32); /// `chargen` const TCP_PORT: u16 = 19; + async fn handle_connection(socket: TcpStream) -> smoltcp::Result<()> { + socket.send("Enter your name: ".bytes()).await?; + let name = socket.recv(|buf| { + if buf.len() > 100 { + // Too much input, consume all + Poll::Ready((buf.len(), None)) + } else { + for (i, b) in buf.iter().enumerate() { + if *b == '\n' as u8 { + return match core::str::from_utf8(&buf[0..i]) { + Ok(name) => + Poll::Ready((i + 1, Some(name.to_owned()))), + Err(_) => + Poll::Ready((i + 1, None)) + }; + } + } + Poll::Pending + } + }).await?; + match name { + Some(name) => + socket.send(format!("Hello {}!\n", name).bytes()).await?, + None => + socket.send("I had trouble reading your name.\n".bytes()).await?, + } + socket.flush().await; + Ok(()) + } + task::spawn(async { println!("listening"); while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await { - println!("got connection"); task::spawn(async { - println!("spawned for connection"); - // while l - drop(socket); + handle_connection(socket) + .await + .map_err(|e| println!("Connection: {:?}", e)); }); } println!("done?"); diff --git a/libasync/Cargo.toml b/libasync/Cargo.toml index 8b0e394..3858977 100644 --- a/libasync/Cargo.toml +++ b/libasync/Cargo.toml @@ -9,8 +9,6 @@ edition = "2018" #futures = { version = "0.3", default-features = false } pin-utils = "0.1.0-alpha.4" libcortex_a9 = { path = "../libcortex_a9" } -# TODO: delete -libboard_zynq = { path = "../libboard_zynq" } [dependencies.smoltcp] version = "0.6" diff --git a/libasync/src/executor.rs b/libasync/src/executor.rs index 4fbd57e..25cd0c6 100644 --- a/libasync/src/executor.rs +++ b/libasync/src/executor.rs @@ -9,8 +9,6 @@ use core::{ use alloc::{boxed::Box, collections::VecDeque as Deque}; //use futures::future::FutureExt; use pin_utils::pin_mut; -// TODO: delete -//use libboard_zynq::println; // NOTE `*const ()` is &AtomicBool static VTABLE: RawWakerVTable = { diff --git a/libasync/src/smoltcp/mod.rs b/libasync/src/smoltcp/mod.rs index e908348..750f13f 100644 --- a/libasync/src/smoltcp/mod.rs +++ b/libasync/src/smoltcp/mod.rs @@ -2,17 +2,13 @@ use core::{ cell::RefCell, task::Waker, }; -use alloc::{vec, vec::Vec}; +use alloc::vec::Vec; use smoltcp::{ iface::EthernetInterface, phy::Device, - socket::{ - SocketSet, SocketHandle, - TcpSocketBuffer, TcpSocket, - }, + socket::SocketSet, time::Instant, }; -use libboard_zynq::println; use crate::task; mod tcp_stream; @@ -27,7 +23,6 @@ pub struct Sockets { impl Sockets { pub fn init(max_sockets: usize) { - println!("initializing {} sockets", max_sockets); let mut sockets_storage = Vec::with_capacity(max_sockets); for _ in 0..max_sockets { sockets_storage.push(None); @@ -40,7 +35,6 @@ impl Sockets { sockets, wakers, }; - println!("sockets initialized"); unsafe { SOCKETS = Some(instance); } } @@ -64,16 +58,13 @@ impl Sockets { let instant = Instant::from_millis(0); let processed = { let mut sockets = self.sockets.borrow_mut(); - let r = iface.poll(&mut sockets, instant); - if r != Ok(false) { println!("poll: {:?}", r); } - match r { + match iface.poll(&mut sockets, instant) { Ok(processed) => processed, Err(_) => true, } }; if processed { let mut wakers = self.wakers.borrow_mut(); - println!("wakeup of {}", wakers.len()); for waker in wakers.drain(..) { waker.wake(); } @@ -83,7 +74,6 @@ impl Sockets { /// TODO: this was called through eg. TcpStream, another poll() /// might want to send packets before sleeping for an interrupt. pub(crate) fn register_waker(waker: Waker) { - println!("register_waker"); Self::instance().wakers.borrow_mut() .push(waker); } diff --git a/libasync/src/smoltcp/tcp_stream.rs b/libasync/src/smoltcp/tcp_stream.rs index cb8645a..37db7be 100644 --- a/libasync/src/smoltcp/tcp_stream.rs +++ b/libasync/src/smoltcp/tcp_stream.rs @@ -1,30 +1,52 @@ +//! async TCP interface +//! +//! TODO: implement futures AsyncRead/AsyncWrite/Stream/Sink interfaces + use core::{ - cell::{RefCell, UnsafeCell}, future::Future, - mem::MaybeUninit, pin::Pin, - sync::atomic::{AtomicBool, Ordering}, task::{Context, Poll}, }; use alloc::vec; use smoltcp::{ - iface::EthernetInterface, - phy::Device, socket::{ - SocketSet, SocketHandle, SocketRef, + SocketHandle, SocketRef, TcpSocketBuffer, TcpSocket, }, - time::Instant, }; -use libboard_zynq::println; use super::Sockets; +/// References a smoltcp TcpSocket pub struct TcpStream { handle: SocketHandle, } +/// Wait while polling a stream +macro_rules! poll_stream { + ($stream: expr, $output: ty, $f: expr) => (async { + struct Adhoc<'a> { + stream: &'a TcpStream, + } + + impl<'a> Future for Adhoc<'a> { + type Output = $output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let result = self.stream.with_socket($f); + if !result.is_ready() { + Sockets::register_waker(cx.waker().clone()); + } + result + } + } + + Adhoc { stream: $stream }.await + }) +} + impl TcpStream { fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self { + // TODO: Uninitialized is faster than zeroed let rx_buffer = TcpSocketBuffer::new(vec![0u8; rx_bufsize]); let tx_buffer = TcpSocketBuffer::new(vec![0u8; tx_bufsize]); let socket = TcpSocket::new(rx_buffer, tx_buffer); @@ -33,16 +55,18 @@ impl TcpStream { TcpStream { handle } } + /// Operate on the referenced TCP socket fn with_socket(&self, f: F) -> R where F: FnOnce(SocketRef) -> R, { - println!("with_socket"); let mut sockets = Sockets::instance().sockets.borrow_mut(); let socket_ref = sockets.get::(self.handle); f(socket_ref) } + /// Listen for the next incoming connection on a TCP + /// port. Succeeds on connection attempt. pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { struct Accept { stream: Option, @@ -55,11 +79,9 @@ impl TcpStream { let is_active = self.stream.as_ref() .map(|s| s.with_socket(|s| s.is_active())) .unwrap_or(true); - println!("is_active={:?}", is_active); if is_active { Poll::Ready(self.stream.take().unwrap()) } else { - println!("register_waker"); Sockets::register_waker(cx.waker().clone()); //asm::sev(); @@ -70,21 +92,112 @@ impl TcpStream { let stream = Self::new(rx_bufsize, tx_bufsize); stream.with_socket(|mut s| s.listen(port)).expect("listen"); - println!("listening on {}", port); Accept { stream: Some(stream), }.await } - pub async fn recv(&self) { - // self.socket(); + /// Probe the receive buffer + /// + /// Instead of handing you the data on the heap all at once, + /// smoltcp's read interface is wrapped so that your callback can + /// just return `Poll::Pending` if there is not enough data + /// yet. Likewise, return the amount of bytes consumed from the + /// buffer in the `Poll::Ready` result. + pub async fn recv(&self, f: F) -> smoltcp::Result + where + F: Fn(&[u8]) -> Poll<(usize, R)>, + { + struct Recv<'a, F: FnOnce(&[u8]) -> Poll<(usize, R)>, R> { + stream: &'a TcpStream, + f: F, + } + + impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> { + type Output = smoltcp::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let result = self.stream.with_socket(|mut socket| { + socket.recv(|buf| match (self.f)(buf) { + Poll::Ready((amount, result)) => (amount, Poll::Ready(Ok(result))), + Poll::Pending => (0, Poll::Pending), + }) + }); + match result { + Ok(result) => { + if !result.is_ready() { + Sockets::register_waker(cx.waker().clone()); + } + result + } + Err(e) => + Poll::Ready(Err(e)), + } + } + } + + Recv { + stream: self, + f, + }.await + } + + /// Wait until there is any space in the socket's send queue + async fn wait_can_send(&self) -> smoltcp::Result<()> { + poll_stream!(self, smoltcp::Result<()>, |socket| { + if !socket.is_active() { + Poll::Ready(Err(smoltcp::Error::Illegal)) + } else if socket.can_send() { + Poll::Ready(Ok(())) + } else { + Poll::Pending + } + }).await + } + + /// Yields to wait for more buffer space + pub async fn send>(&self, data: I) -> Result<(), smoltcp::Error> { + let mut data = data.into_iter(); + let mut done = false; + while !done { + self.wait_can_send().await?; + + self.with_socket(|mut socket| { + socket.send(|buf| { + for i in 0..buf.len() { + if let Some(byte) = data.next() { + buf[i] = byte; + } else { + done = true; + return (i, ()) + } + } + (buf.len(), ()) + }) + })?; + } + + Ok(()) + } + + /// Wait for all queued data to be sent and ACKed + /// + /// **Warning:** this may not work as immediately as expected! The + /// other side may wait until it sends packets to you for + /// piggybacking the ACKs. + pub async fn flush(&self) { + poll_stream!(self, (), |socket| { + if socket.may_send() && socket.send_queue() > 0 { + Poll::Pending + } else { + Poll::Ready(()) + } + }).await } } impl Drop for TcpStream { fn drop(&mut self) { - // TODO: verify - println!("tcpstream drop"); Sockets::instance().sockets.borrow_mut() .remove(self.handle); }