From 90e9a7db02ae40070208eff97d2f5ac94bf5aa7e Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 1 Apr 2020 22:55:25 +0200 Subject: [PATCH] libasync: refactor listen() into accept()+listen() with a backlog --- experiments/src/main.rs | 46 +++++++++++++----------------- libasync/src/smoltcp/tcp_stream.rs | 29 +++++++++++++++++-- 2 files changed, 47 insertions(+), 28 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 54eab76..bbe6d1a 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -203,46 +203,40 @@ pub fn main_core0() { Sockets::init(32); /// `chargen` const TCP_PORT: u16 = 19; - async fn handle_connection(socket: TcpStream) -> smoltcp::Result<()> { - socket.send("Enter your name: ".bytes()).await?; - let name = socket.recv(|buf| { + async fn handle_connection(stream: TcpStream) -> smoltcp::Result<()> { + stream.send("Enter your name: ".bytes()).await?; + let name = stream.recv(|buf| { + for (i, b) in buf.iter().enumerate() { + if *b == '\n' as u8 { + return match core::str::from_utf8(&buf[0..i]) { + Ok(name) => + Poll::Ready((i + 1, Some(name.to_owned()))), + Err(_) => + Poll::Ready((i + 1, None)) + }; + } + } if buf.len() > 100 { // Too much input, consume all Poll::Ready((buf.len(), None)) } else { - for (i, b) in buf.iter().enumerate() { - if *b == '\n' as u8 { - return match core::str::from_utf8(&buf[0..i]) { - Ok(name) => - Poll::Ready((i + 1, Some(name.to_owned()))), - Err(_) => - Poll::Ready((i + 1, None)) - }; - } - } Poll::Pending } }).await?; match name { Some(name) => - socket.send(format!("Hello {}!\n", name).bytes()).await?, + stream.send(format!("Hello {}!\n", name).bytes()).await?, None => - socket.send("I had trouble reading your name.\n".bytes()).await?, + stream.send("I had trouble reading your name.\n".bytes()).await?, } - socket.flush().await; + stream.flush().await; Ok(()) } - task::spawn(async { - println!("listening"); - while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await { - task::spawn(async { - handle_connection(socket) - .await - .map_err(|e| println!("Connection: {:?}", e)); - }); - } - println!("done?"); + TcpStream::listen(TCP_PORT, 2048, 2048, 8, |stream| async { + handle_connection(stream) + .await + .map_err(|e| println!("Connection: {:?}", e)); }); Sockets::run(&mut iface); diff --git a/libasync/src/smoltcp/tcp_stream.rs b/libasync/src/smoltcp/tcp_stream.rs index 37db7be..7795188 100644 --- a/libasync/src/smoltcp/tcp_stream.rs +++ b/libasync/src/smoltcp/tcp_stream.rs @@ -7,13 +7,14 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use alloc::vec; +use alloc::{boxed::Box, vec, vec::Vec}; use smoltcp::{ socket::{ SocketHandle, SocketRef, TcpSocketBuffer, TcpSocket, }, }; +use crate::task; use super::Sockets; /// References a smoltcp TcpSocket @@ -65,9 +66,33 @@ impl TcpStream { f(socket_ref) } + /// Spawns `backlog` tasks with listening sockets so that more + /// connections can be accepted while some are still + /// handshaking. Spawns additional tasks for each connection. + pub fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize, backlog: usize, f: F) + where + F: Fn(Self) -> R + Copy + 'static, + R: Future + 'static, + { + for _ in 0..backlog { + task::spawn(async move { + loop { + // Wait for new connection + let stream = TcpStream::accept(port, rx_bufsize, tx_bufsize).await; + // Spawn async task for new connection + task::spawn(f(stream)); + } + }); + } + } + /// Listen for the next incoming connection on a TCP /// port. Succeeds on connection attempt. - pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { + /// + /// Calling this serially in a loop will cause slow/botched + /// connection attempts stall any more new connections. Use + /// `listen()` with a backlog instead. + pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { struct Accept { stream: Option, }