From c3fc9487142a7548e1b503d6f430b6baf5092399 Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 16 Apr 2020 20:28:40 +0200 Subject: [PATCH] libasync: replace TcpStream::listen() with accept(), make accept() return earlier --- experiments/src/main.rs | 18 +++++-- libasync/src/smoltcp/tcp_stream.rs | 82 +++++++++++++++--------------- 2 files changed, 55 insertions(+), 45 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 72bf6b9..e760054 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -241,10 +241,20 @@ pub fn main_core0() { Ok(()) } - TcpStream::listen(TCP_PORT, 2048, 2048, 8, |stream| async { - handle_connection(stream) - .await - .map_err(|e| println!("Connection: {:?}", e)); + let mut counter = alloc::rc::Rc::new(core::cell::RefCell::new(0)); + task::spawn(async move { + while let stream = TcpStream::accept(TCP_PORT, 2048, 2408).await.unwrap() { + let counter = counter.clone(); + task::spawn(async move { + *counter.borrow_mut() += 1; + println!("Serving {} connections", *counter.borrow()); + handle_connection(stream) + .await + .map_err(|e| println!("Connection: {:?}", e)); + *counter.borrow_mut() -= 1; + println!("Now serving {} connections", *counter.borrow()); + }); + } }); let mut time = 0u32; diff --git a/libasync/src/smoltcp/tcp_stream.rs b/libasync/src/smoltcp/tcp_stream.rs index 7fa176f..c8146ae 100644 --- a/libasync/src/smoltcp/tcp_stream.rs +++ b/libasync/src/smoltcp/tcp_stream.rs @@ -9,9 +9,10 @@ use core::{ }; use alloc::vec::Vec; use smoltcp::{ + Error, Result, socket::{ SocketHandle, SocketRef, - TcpSocketBuffer, TcpSocket, + TcpSocketBuffer, TcpSocket, TcpState, }, }; use crate::task; @@ -77,47 +78,26 @@ impl TcpStream { f(socket_ref) } - /// Spawns `backlog` tasks with listening sockets so that more - /// connections can be accepted while some are still - /// handshaking. Spawns additional tasks for each connection. - pub fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize, backlog: usize, f: F) - where - F: Fn(Self) -> R + Clone + 'static, - R: Future + 'static, - { - for _ in 0..backlog { - let f = f.clone(); - task::spawn(async move { - loop { - // Wait for new connection - let stream = TcpStream::accept(port, rx_bufsize, tx_bufsize).await; - // Spawn async task for new connection - task::spawn(f(stream)); - } - }); - } - } - /// Listen for the next incoming connection on a TCP /// port. Succeeds on connection attempt. /// /// Calling this serially in a loop will cause slow/botched /// connection attempts stall any more new connections. Use /// `listen()` with a backlog instead. - pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { + pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Result { let stream = Self::new(rx_bufsize, tx_bufsize); // Set socket to listen - stream.with_socket(|mut s| s.listen(port)) - .expect("listen"); + stream.with_socket(|mut s| s.listen(port))?; // Wait for a connection poll_stream!(&stream, (), |socket| { - if socket.is_active() { + if socket.state() != TcpState::Listen { Poll::Ready(()) } else { Poll::Pending } }).await; - stream + + Ok(stream) } /// Probe the receive buffer @@ -127,7 +107,7 @@ impl TcpStream { /// just return `Poll::Pending` if there is not enough data /// yet. Likewise, return the amount of bytes consumed from the /// buffer in the `Poll::Ready` result. - pub async fn recv(&self, f: F) -> smoltcp::Result + pub async fn recv(&self, f: F) -> Result where F: Fn(&[u8]) -> Poll<(usize, R)>, { @@ -137,10 +117,14 @@ impl TcpStream { } impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> { - type Output = smoltcp::Result; + type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let result = self.stream.with_socket(|mut socket| { + if socket_is_handhshaking(&socket) { + return Ok(Poll::Pending); + } + socket.recv(|buf| { if buf.len() > 0 { match (self.f)(buf) { @@ -156,10 +140,11 @@ impl TcpStream { }) }); match result { + Ok(Poll::Pending) => { + Sockets::register_waker(cx.waker().clone()); + Poll::Pending + } Ok(result) => { - if !result.is_ready() { - Sockets::register_waker(cx.waker().clone()); - } result } Err(e) => @@ -175,12 +160,14 @@ impl TcpStream { } /// Wait until there is any space in the socket's send queue - async fn wait_can_send(&self) -> smoltcp::Result<()> { - poll_stream!(self, smoltcp::Result<()>, |socket| { - if !socket.is_active() { - Poll::Ready(Err(smoltcp::Error::Illegal)) + async fn wait_can_send(&self) -> Result<()> { + poll_stream!(self, Result<()>, |socket| { + if socket_is_handhshaking(&socket) { + Poll::Pending } else if socket.can_send() { Poll::Ready(Ok(())) + } else if ! socket.may_send() { + Poll::Ready(Err(Error::Truncated)) } else { Poll::Pending } @@ -188,7 +175,7 @@ impl TcpStream { } /// Yields to wait for more buffer space - pub async fn send>(&self, data: I) -> Result<(), smoltcp::Error> { + pub async fn send>(&self, data: I) -> Result<()> { let mut data = data.into_iter(); let mut done = false; while !done { @@ -217,12 +204,16 @@ impl TcpStream { /// **Warning:** this may not work as immediately as expected! The /// other side may wait until it sends packets to you for /// piggybacking the ACKs. - pub async fn flush(&self) { - poll_stream!(self, (), |socket| { - if socket.may_send() && socket.send_queue() > 0 { + pub async fn flush(&self) -> Result<()> { + poll_stream!(self, Result<()>, |socket| { + if socket_is_handhshaking(&socket) { Poll::Pending + } else if socket.may_send() && socket.send_queue() > 0 { + Poll::Pending + } else if socket.may_send() { + Poll::Ready(Ok(())) } else { - Poll::Ready(()) + Poll::Ready(Err(Error::Truncated)) } }).await } @@ -236,3 +227,12 @@ impl Drop for TcpStream { .remove(self.handle); } } + +fn socket_is_handhshaking(socket: &SocketRef) -> bool { + match socket.state() { + TcpState::SynSent | TcpState::SynReceived => + true, + _ => + false, + } +}