Compare commits

...

4 Commits

2 changed files with 81 additions and 60 deletions

View File

@ -203,46 +203,40 @@ pub fn main_core0() {
Sockets::init(32); Sockets::init(32);
/// `chargen` /// `chargen`
const TCP_PORT: u16 = 19; const TCP_PORT: u16 = 19;
async fn handle_connection(socket: TcpStream) -> smoltcp::Result<()> { async fn handle_connection(stream: TcpStream) -> smoltcp::Result<()> {
socket.send("Enter your name: ".bytes()).await?; stream.send("Enter your name: ".bytes()).await?;
let name = socket.recv(|buf| { let name = stream.recv(|buf| {
for (i, b) in buf.iter().enumerate() {
if *b == '\n' as u8 {
return match core::str::from_utf8(&buf[0..i]) {
Ok(name) =>
Poll::Ready((i + 1, Some(name.to_owned()))),
Err(_) =>
Poll::Ready((i + 1, None))
};
}
}
if buf.len() > 100 { if buf.len() > 100 {
// Too much input, consume all // Too much input, consume all
Poll::Ready((buf.len(), None)) Poll::Ready((buf.len(), None))
} else { } else {
for (i, b) in buf.iter().enumerate() {
if *b == '\n' as u8 {
return match core::str::from_utf8(&buf[0..i]) {
Ok(name) =>
Poll::Ready((i + 1, Some(name.to_owned()))),
Err(_) =>
Poll::Ready((i + 1, None))
};
}
}
Poll::Pending Poll::Pending
} }
}).await?; }).await?;
match name { match name {
Some(name) => Some(name) =>
socket.send(format!("Hello {}!\n", name).bytes()).await?, stream.send(format!("Hello {}!\n", name).bytes()).await?,
None => None =>
socket.send("I had trouble reading your name.\n".bytes()).await?, stream.send("I had trouble reading your name.\n".bytes()).await?,
} }
socket.flush().await; stream.flush().await;
Ok(()) Ok(())
} }
task::spawn(async { TcpStream::listen(TCP_PORT, 2048, 2048, 8, |stream| async {
println!("listening"); handle_connection(stream)
while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await { .await
task::spawn(async { .map_err(|e| println!("Connection: {:?}", e));
handle_connection(socket)
.await
.map_err(|e| println!("Connection: {:?}", e));
});
}
println!("done?");
}); });
Sockets::run(&mut iface); Sockets::run(&mut iface);

View File

@ -7,13 +7,14 @@ use core::{
pin::Pin, pin::Pin,
task::{Context, Poll}, task::{Context, Poll},
}; };
use alloc::vec; use alloc::vec::Vec;
use smoltcp::{ use smoltcp::{
socket::{ socket::{
SocketHandle, SocketRef, SocketHandle, SocketRef,
TcpSocketBuffer, TcpSocket, TcpSocketBuffer, TcpSocket,
}, },
}; };
use crate::task;
use super::Sockets; use super::Sockets;
/// References a smoltcp TcpSocket /// References a smoltcp TcpSocket
@ -21,7 +22,7 @@ pub struct TcpStream {
handle: SocketHandle, handle: SocketHandle,
} }
/// Wait while polling a stream /// Wait while letting `$f()` poll a stream's socket
macro_rules! poll_stream { macro_rules! poll_stream {
($stream: expr, $output: ty, $f: expr) => (async { ($stream: expr, $output: ty, $f: expr) => (async {
struct Adhoc<'a> { struct Adhoc<'a> {
@ -45,10 +46,21 @@ macro_rules! poll_stream {
} }
impl TcpStream { impl TcpStream {
/// Allocates sockets and its buffers, registers it in the
/// SocketSet.
///
/// Not `pub` as the result can not yet be used. Use `listen()` or
/// `connect()` to obtain a valid TcpStream.
fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self { fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self {
// TODO: Uninitialized is faster than zeroed fn uninit_vec<T>(size: usize) -> Vec<T> {
let rx_buffer = TcpSocketBuffer::new(vec![0u8; rx_bufsize]); let mut result = Vec::with_capacity(size);
let tx_buffer = TcpSocketBuffer::new(vec![0u8; tx_bufsize]); unsafe {
result.set_len(size);
}
result
}
let rx_buffer = TcpSocketBuffer::new(uninit_vec(rx_bufsize));
let tx_buffer = TcpSocketBuffer::new(uninit_vec(tx_bufsize));
let socket = TcpSocket::new(rx_buffer, tx_buffer); let socket = TcpSocket::new(rx_buffer, tx_buffer);
let handle = Sockets::instance().sockets.borrow_mut() let handle = Sockets::instance().sockets.borrow_mut()
.add(socket); .add(socket);
@ -65,36 +77,46 @@ impl TcpStream {
f(socket_ref) f(socket_ref)
} }
/// Spawns `backlog` tasks with listening sockets so that more
/// connections can be accepted while some are still
/// handshaking. Spawns additional tasks for each connection.
pub fn listen<F, R, T>(port: u16, rx_bufsize: usize, tx_bufsize: usize, backlog: usize, f: F)
where
F: Fn(Self) -> R + Copy + 'static,
R: Future<Output = T> + 'static,
{
for _ in 0..backlog {
task::spawn(async move {
loop {
// Wait for new connection
let stream = TcpStream::accept(port, rx_bufsize, tx_bufsize).await;
// Spawn async task for new connection
task::spawn(f(stream));
}
});
}
}
/// Listen for the next incoming connection on a TCP /// Listen for the next incoming connection on a TCP
/// port. Succeeds on connection attempt. /// port. Succeeds on connection attempt.
pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { ///
struct Accept { /// Calling this serially in a loop will cause slow/botched
stream: Option<TcpStream>, /// connection attempts stall any more new connections. Use
} /// `listen()` with a backlog instead.
pub async fn accept(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self {
impl Future for Accept {
type Output = TcpStream;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let is_active = self.stream.as_ref()
.map(|s| s.with_socket(|s| s.is_active()))
.unwrap_or(true);
if is_active {
Poll::Ready(self.stream.take().unwrap())
} else {
Sockets::register_waker(cx.waker().clone());
//asm::sev();
Poll::Pending
}
}
}
let stream = Self::new(rx_bufsize, tx_bufsize); let stream = Self::new(rx_bufsize, tx_bufsize);
stream.with_socket(|mut s| s.listen(port)).expect("listen"); // Set socket to listen
Accept { stream.with_socket(|mut s| s.listen(port))
stream: Some(stream), .expect("listen");
}.await // Wait for a connection
poll_stream!(&stream, (), |socket| {
if socket.is_active() {
Poll::Ready(())
} else {
Poll::Pending
}
}).await;
stream
} }
/// Probe the receive buffer /// Probe the receive buffer
@ -119,8 +141,11 @@ impl TcpStream {
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.stream.with_socket(|mut socket| { let result = self.stream.with_socket(|mut socket| {
socket.recv(|buf| match (self.f)(buf) { socket.recv(|buf| match (self.f)(buf) {
Poll::Ready((amount, result)) => (amount, Poll::Ready(Ok(result))), Poll::Ready((amount, result)) =>
Poll::Pending => (0, Poll::Pending), (amount, Poll::Ready(Ok(result))),
Poll::Pending =>
// 0 bytes consumed
(0, Poll::Pending),
}) })
}); });
match result { match result {
@ -197,6 +222,8 @@ impl TcpStream {
} }
impl Drop for TcpStream { impl Drop for TcpStream {
/// Free item in the socket set, which leads to deallocation of
/// the rx/tx buffers associated with this socket.
fn drop(&mut self) { fn drop(&mut self) {
Sockets::instance().sockets.borrow_mut() Sockets::instance().sockets.borrow_mut()
.remove(self.handle); .remove(self.handle);