libasync: replace TcpStream::listen() with accept(), make accept() return earlier

tcp-recv-fnmut
Astro 2020-04-16 20:28:40 +02:00
parent be35be8d38
commit c3fc948714
2 changed files with 55 additions and 45 deletions

View File

@ -241,10 +241,20 @@ pub fn main_core0() {
Ok(()) Ok(())
} }
TcpStream::listen(TCP_PORT, 2048, 2048, 8, |stream| async { let mut counter = alloc::rc::Rc::new(core::cell::RefCell::new(0));
handle_connection(stream) task::spawn(async move {
.await while let stream = TcpStream::accept(TCP_PORT, 2048, 2408).await.unwrap() {
.map_err(|e| println!("Connection: {:?}", e)); let counter = counter.clone();
task::spawn(async move {
*counter.borrow_mut() += 1;
println!("Serving {} connections", *counter.borrow());
handle_connection(stream)
.await
.map_err(|e| println!("Connection: {:?}", e));
*counter.borrow_mut() -= 1;
println!("Now serving {} connections", *counter.borrow());
});
}
}); });
let mut time = 0u32; let mut time = 0u32;

View File

@ -9,9 +9,10 @@ use core::{
}; };
use alloc::vec::Vec; use alloc::vec::Vec;
use smoltcp::{ use smoltcp::{
Error, Result,
socket::{ socket::{
SocketHandle, SocketRef, SocketHandle, SocketRef,
TcpSocketBuffer, TcpSocket, TcpSocketBuffer, TcpSocket, TcpState,
}, },
}; };
use crate::task; use crate::task;
@ -77,47 +78,26 @@ impl TcpStream {
f(socket_ref) f(socket_ref)
} }
/// Spawns `backlog` tasks with listening sockets so that more
/// connections can be accepted while some are still
/// handshaking. Spawns additional tasks for each connection.
pub fn listen<F, R, T>(port: u16, rx_bufsize: usize, tx_bufsize: usize, backlog: usize, f: F)
where
F: Fn(Self) -> R + Clone + 'static,
R: Future<Output = T> + 'static,
{
for _ in 0..backlog {
let f = f.clone();
task::spawn(async move {
loop {
// Wait for new connection
let stream = TcpStream::accept(port, rx_bufsize, tx_bufsize).await;
// Spawn async task for new connection
task::spawn(f(stream));
}
});
}
}
/// Listen for the next incoming connection on a TCP /// Listen for the next incoming connection on a TCP
/// port. Succeeds on connection attempt. /// port. Succeeds on connection attempt.
/// ///
/// Calling this serially in a loop will cause slow/botched /// Calling this serially in a loop will cause slow/botched
/// connection attempts stall any more new connections. Use /// connection attempts stall any more new connections. Use
/// `listen()` with a backlog instead. /// `listen()` with a backlog instead.
pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Result<Self> {
let stream = Self::new(rx_bufsize, tx_bufsize); let stream = Self::new(rx_bufsize, tx_bufsize);
// Set socket to listen // Set socket to listen
stream.with_socket(|mut s| s.listen(port)) stream.with_socket(|mut s| s.listen(port))?;
.expect("listen");
// Wait for a connection // Wait for a connection
poll_stream!(&stream, (), |socket| { poll_stream!(&stream, (), |socket| {
if socket.is_active() { if socket.state() != TcpState::Listen {
Poll::Ready(()) Poll::Ready(())
} else { } else {
Poll::Pending Poll::Pending
} }
}).await; }).await;
stream
Ok(stream)
} }
/// Probe the receive buffer /// Probe the receive buffer
@ -127,7 +107,7 @@ impl TcpStream {
/// just return `Poll::Pending` if there is not enough data /// just return `Poll::Pending` if there is not enough data
/// yet. Likewise, return the amount of bytes consumed from the /// yet. Likewise, return the amount of bytes consumed from the
/// buffer in the `Poll::Ready` result. /// buffer in the `Poll::Ready` result.
pub async fn recv<F, R>(&self, f: F) -> smoltcp::Result<R> pub async fn recv<F, R>(&self, f: F) -> Result<R>
where where
F: Fn(&[u8]) -> Poll<(usize, R)>, F: Fn(&[u8]) -> Poll<(usize, R)>,
{ {
@ -137,10 +117,14 @@ impl TcpStream {
} }
impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> { impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> {
type Output = smoltcp::Result<R>; type Output = Result<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.stream.with_socket(|mut socket| { let result = self.stream.with_socket(|mut socket| {
if socket_is_handhshaking(&socket) {
return Ok(Poll::Pending);
}
socket.recv(|buf| { socket.recv(|buf| {
if buf.len() > 0 { if buf.len() > 0 {
match (self.f)(buf) { match (self.f)(buf) {
@ -156,10 +140,11 @@ impl TcpStream {
}) })
}); });
match result { match result {
Ok(Poll::Pending) => {
Sockets::register_waker(cx.waker().clone());
Poll::Pending
}
Ok(result) => { Ok(result) => {
if !result.is_ready() {
Sockets::register_waker(cx.waker().clone());
}
result result
} }
Err(e) => Err(e) =>
@ -175,12 +160,14 @@ impl TcpStream {
} }
/// Wait until there is any space in the socket's send queue /// Wait until there is any space in the socket's send queue
async fn wait_can_send(&self) -> smoltcp::Result<()> { async fn wait_can_send(&self) -> Result<()> {
poll_stream!(self, smoltcp::Result<()>, |socket| { poll_stream!(self, Result<()>, |socket| {
if !socket.is_active() { if socket_is_handhshaking(&socket) {
Poll::Ready(Err(smoltcp::Error::Illegal)) Poll::Pending
} else if socket.can_send() { } else if socket.can_send() {
Poll::Ready(Ok(())) Poll::Ready(Ok(()))
} else if ! socket.may_send() {
Poll::Ready(Err(Error::Truncated))
} else { } else {
Poll::Pending Poll::Pending
} }
@ -188,7 +175,7 @@ impl TcpStream {
} }
/// Yields to wait for more buffer space /// Yields to wait for more buffer space
pub async fn send<I: IntoIterator<Item = u8>>(&self, data: I) -> Result<(), smoltcp::Error> { pub async fn send<I: IntoIterator<Item = u8>>(&self, data: I) -> Result<()> {
let mut data = data.into_iter(); let mut data = data.into_iter();
let mut done = false; let mut done = false;
while !done { while !done {
@ -217,12 +204,16 @@ impl TcpStream {
/// **Warning:** this may not work as immediately as expected! The /// **Warning:** this may not work as immediately as expected! The
/// other side may wait until it sends packets to you for /// other side may wait until it sends packets to you for
/// piggybacking the ACKs. /// piggybacking the ACKs.
pub async fn flush(&self) { pub async fn flush(&self) -> Result<()> {
poll_stream!(self, (), |socket| { poll_stream!(self, Result<()>, |socket| {
if socket.may_send() && socket.send_queue() > 0 { if socket_is_handhshaking(&socket) {
Poll::Pending Poll::Pending
} else if socket.may_send() && socket.send_queue() > 0 {
Poll::Pending
} else if socket.may_send() {
Poll::Ready(Ok(()))
} else { } else {
Poll::Ready(()) Poll::Ready(Err(Error::Truncated))
} }
}).await }).await
} }
@ -236,3 +227,12 @@ impl Drop for TcpStream {
.remove(self.handle); .remove(self.handle);
} }
} }
fn socket_is_handhshaking(socket: &SocketRef<TcpSocket>) -> bool {
match socket.state() {
TcpState::SynSent | TcpState::SynReceived =>
true,
_ =>
false,
}
}