libasync: fix TcpStream, implement recv()+send()

This commit is contained in:
Astro 2020-03-31 22:34:32 +02:00
parent 8bc721826c
commit ab1404488c
5 changed files with 168 additions and 40 deletions

View File

@ -3,13 +3,14 @@
extern crate alloc; extern crate alloc;
use core::mem::transmute; use core::{mem::transmute, task::Poll};
use alloc::collections::BTreeMap; use alloc::{borrow::ToOwned, collections::BTreeMap, format};
use libcortex_a9::mutex::Mutex; use libcortex_a9::mutex::Mutex;
use libboard_zynq::{ use libboard_zynq::{
print, println, print, println,
self as zynq, clocks::Clocks, clocks::source::{ClockSource, ArmPll, IoPll}, self as zynq, clocks::Clocks, clocks::source::{ClockSource, ArmPll, IoPll},
smoltcp::{ smoltcp::{
self,
wire::{EthernetAddress, IpAddress, IpCidr}, wire::{EthernetAddress, IpAddress, IpCidr},
iface::{NeighborCache, EthernetInterfaceBuilder, Routes}, iface::{NeighborCache, EthernetInterfaceBuilder, Routes},
time::Instant, time::Instant,
@ -199,18 +200,46 @@ pub fn main_core0() {
// TODO: compare with ps7_init // TODO: compare with ps7_init
println!("Sockets init...");
Sockets::init(32); Sockets::init(32);
/// `chargen` /// `chargen`
const TCP_PORT: u16 = 19; const TCP_PORT: u16 = 19;
async fn handle_connection(socket: TcpStream) -> smoltcp::Result<()> {
socket.send("Enter your name: ".bytes()).await?;
let name = socket.recv(|buf| {
if buf.len() > 100 {
// Too much input, consume all
Poll::Ready((buf.len(), None))
} else {
for (i, b) in buf.iter().enumerate() {
if *b == '\n' as u8 {
return match core::str::from_utf8(&buf[0..i]) {
Ok(name) =>
Poll::Ready((i + 1, Some(name.to_owned()))),
Err(_) =>
Poll::Ready((i + 1, None))
};
}
}
Poll::Pending
}
}).await?;
match name {
Some(name) =>
socket.send(format!("Hello {}!\n", name).bytes()).await?,
None =>
socket.send("I had trouble reading your name.\n".bytes()).await?,
}
socket.flush().await;
Ok(())
}
task::spawn(async { task::spawn(async {
println!("listening"); println!("listening");
while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await { while let socket = TcpStream::listen(TCP_PORT, 2048, 2048).await {
println!("got connection");
task::spawn(async { task::spawn(async {
println!("spawned for connection"); handle_connection(socket)
// while l .await
drop(socket); .map_err(|e| println!("Connection: {:?}", e));
}); });
} }
println!("done?"); println!("done?");

View File

@ -9,8 +9,6 @@ edition = "2018"
#futures = { version = "0.3", default-features = false } #futures = { version = "0.3", default-features = false }
pin-utils = "0.1.0-alpha.4" pin-utils = "0.1.0-alpha.4"
libcortex_a9 = { path = "../libcortex_a9" } libcortex_a9 = { path = "../libcortex_a9" }
# TODO: delete
libboard_zynq = { path = "../libboard_zynq" }
[dependencies.smoltcp] [dependencies.smoltcp]
version = "0.6" version = "0.6"

View File

@ -9,8 +9,6 @@ use core::{
use alloc::{boxed::Box, collections::VecDeque as Deque}; use alloc::{boxed::Box, collections::VecDeque as Deque};
//use futures::future::FutureExt; //use futures::future::FutureExt;
use pin_utils::pin_mut; use pin_utils::pin_mut;
// TODO: delete
//use libboard_zynq::println;
// NOTE `*const ()` is &AtomicBool // NOTE `*const ()` is &AtomicBool
static VTABLE: RawWakerVTable = { static VTABLE: RawWakerVTable = {

View File

@ -2,17 +2,13 @@ use core::{
cell::RefCell, cell::RefCell,
task::Waker, task::Waker,
}; };
use alloc::{vec, vec::Vec}; use alloc::vec::Vec;
use smoltcp::{ use smoltcp::{
iface::EthernetInterface, iface::EthernetInterface,
phy::Device, phy::Device,
socket::{ socket::SocketSet,
SocketSet, SocketHandle,
TcpSocketBuffer, TcpSocket,
},
time::Instant, time::Instant,
}; };
use libboard_zynq::println;
use crate::task; use crate::task;
mod tcp_stream; mod tcp_stream;
@ -27,7 +23,6 @@ pub struct Sockets {
impl Sockets { impl Sockets {
pub fn init(max_sockets: usize) { pub fn init(max_sockets: usize) {
println!("initializing {} sockets", max_sockets);
let mut sockets_storage = Vec::with_capacity(max_sockets); let mut sockets_storage = Vec::with_capacity(max_sockets);
for _ in 0..max_sockets { for _ in 0..max_sockets {
sockets_storage.push(None); sockets_storage.push(None);
@ -40,7 +35,6 @@ impl Sockets {
sockets, sockets,
wakers, wakers,
}; };
println!("sockets initialized");
unsafe { SOCKETS = Some(instance); } unsafe { SOCKETS = Some(instance); }
} }
@ -64,16 +58,13 @@ impl Sockets {
let instant = Instant::from_millis(0); let instant = Instant::from_millis(0);
let processed = { let processed = {
let mut sockets = self.sockets.borrow_mut(); let mut sockets = self.sockets.borrow_mut();
let r = iface.poll(&mut sockets, instant); match iface.poll(&mut sockets, instant) {
if r != Ok(false) { println!("poll: {:?}", r); }
match r {
Ok(processed) => processed, Ok(processed) => processed,
Err(_) => true, Err(_) => true,
} }
}; };
if processed { if processed {
let mut wakers = self.wakers.borrow_mut(); let mut wakers = self.wakers.borrow_mut();
println!("wakeup of {}", wakers.len());
for waker in wakers.drain(..) { for waker in wakers.drain(..) {
waker.wake(); waker.wake();
} }
@ -83,7 +74,6 @@ impl Sockets {
/// TODO: this was called through eg. TcpStream, another poll() /// TODO: this was called through eg. TcpStream, another poll()
/// might want to send packets before sleeping for an interrupt. /// might want to send packets before sleeping for an interrupt.
pub(crate) fn register_waker(waker: Waker) { pub(crate) fn register_waker(waker: Waker) {
println!("register_waker");
Self::instance().wakers.borrow_mut() Self::instance().wakers.borrow_mut()
.push(waker); .push(waker);
} }

View File

@ -1,30 +1,52 @@
//! async TCP interface
//!
//! TODO: implement futures AsyncRead/AsyncWrite/Stream/Sink interfaces
use core::{ use core::{
cell::{RefCell, UnsafeCell},
future::Future, future::Future,
mem::MaybeUninit,
pin::Pin, pin::Pin,
sync::atomic::{AtomicBool, Ordering},
task::{Context, Poll}, task::{Context, Poll},
}; };
use alloc::vec; use alloc::vec;
use smoltcp::{ use smoltcp::{
iface::EthernetInterface,
phy::Device,
socket::{ socket::{
SocketSet, SocketHandle, SocketRef, SocketHandle, SocketRef,
TcpSocketBuffer, TcpSocket, TcpSocketBuffer, TcpSocket,
}, },
time::Instant,
}; };
use libboard_zynq::println;
use super::Sockets; use super::Sockets;
/// References a smoltcp TcpSocket
pub struct TcpStream { pub struct TcpStream {
handle: SocketHandle, handle: SocketHandle,
} }
/// Wait while polling a stream
macro_rules! poll_stream {
($stream: expr, $output: ty, $f: expr) => (async {
struct Adhoc<'a> {
stream: &'a TcpStream,
}
impl<'a> Future for Adhoc<'a> {
type Output = $output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.stream.with_socket($f);
if !result.is_ready() {
Sockets::register_waker(cx.waker().clone());
}
result
}
}
Adhoc { stream: $stream }.await
})
}
impl TcpStream { impl TcpStream {
fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self { fn new(rx_bufsize: usize, tx_bufsize: usize) -> Self {
// TODO: Uninitialized is faster than zeroed
let rx_buffer = TcpSocketBuffer::new(vec![0u8; rx_bufsize]); let rx_buffer = TcpSocketBuffer::new(vec![0u8; rx_bufsize]);
let tx_buffer = TcpSocketBuffer::new(vec![0u8; tx_bufsize]); let tx_buffer = TcpSocketBuffer::new(vec![0u8; tx_bufsize]);
let socket = TcpSocket::new(rx_buffer, tx_buffer); let socket = TcpSocket::new(rx_buffer, tx_buffer);
@ -33,16 +55,18 @@ impl TcpStream {
TcpStream { handle } TcpStream { handle }
} }
/// Operate on the referenced TCP socket
fn with_socket<F, R>(&self, f: F) -> R fn with_socket<F, R>(&self, f: F) -> R
where where
F: FnOnce(SocketRef<TcpSocket>) -> R, F: FnOnce(SocketRef<TcpSocket>) -> R,
{ {
println!("with_socket");
let mut sockets = Sockets::instance().sockets.borrow_mut(); let mut sockets = Sockets::instance().sockets.borrow_mut();
let socket_ref = sockets.get::<TcpSocket>(self.handle); let socket_ref = sockets.get::<TcpSocket>(self.handle);
f(socket_ref) f(socket_ref)
} }
/// Listen for the next incoming connection on a TCP
/// port. Succeeds on connection attempt.
pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self { pub async fn listen(port: u16, rx_bufsize: usize, tx_bufsize: usize) -> Self {
struct Accept { struct Accept {
stream: Option<TcpStream>, stream: Option<TcpStream>,
@ -55,11 +79,9 @@ impl TcpStream {
let is_active = self.stream.as_ref() let is_active = self.stream.as_ref()
.map(|s| s.with_socket(|s| s.is_active())) .map(|s| s.with_socket(|s| s.is_active()))
.unwrap_or(true); .unwrap_or(true);
println!("is_active={:?}", is_active);
if is_active { if is_active {
Poll::Ready(self.stream.take().unwrap()) Poll::Ready(self.stream.take().unwrap())
} else { } else {
println!("register_waker");
Sockets::register_waker(cx.waker().clone()); Sockets::register_waker(cx.waker().clone());
//asm::sev(); //asm::sev();
@ -70,21 +92,112 @@ impl TcpStream {
let stream = Self::new(rx_bufsize, tx_bufsize); let stream = Self::new(rx_bufsize, tx_bufsize);
stream.with_socket(|mut s| s.listen(port)).expect("listen"); stream.with_socket(|mut s| s.listen(port)).expect("listen");
println!("listening on {}", port);
Accept { Accept {
stream: Some(stream), stream: Some(stream),
}.await }.await
} }
pub async fn recv(&self) { /// Probe the receive buffer
// self.socket(); ///
/// Instead of handing you the data on the heap all at once,
/// smoltcp's read interface is wrapped so that your callback can
/// just return `Poll::Pending` if there is not enough data
/// yet. Likewise, return the amount of bytes consumed from the
/// buffer in the `Poll::Ready` result.
pub async fn recv<F, R>(&self, f: F) -> smoltcp::Result<R>
where
F: Fn(&[u8]) -> Poll<(usize, R)>,
{
struct Recv<'a, F: FnOnce(&[u8]) -> Poll<(usize, R)>, R> {
stream: &'a TcpStream,
f: F,
}
impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> {
type Output = smoltcp::Result<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let result = self.stream.with_socket(|mut socket| {
socket.recv(|buf| match (self.f)(buf) {
Poll::Ready((amount, result)) => (amount, Poll::Ready(Ok(result))),
Poll::Pending => (0, Poll::Pending),
})
});
match result {
Ok(result) => {
if !result.is_ready() {
Sockets::register_waker(cx.waker().clone());
}
result
}
Err(e) =>
Poll::Ready(Err(e)),
}
}
}
Recv {
stream: self,
f,
}.await
}
/// Wait until there is any space in the socket's send queue
async fn wait_can_send(&self) -> smoltcp::Result<()> {
poll_stream!(self, smoltcp::Result<()>, |socket| {
if !socket.is_active() {
Poll::Ready(Err(smoltcp::Error::Illegal))
} else if socket.can_send() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}).await
}
/// Yields to wait for more buffer space
pub async fn send<I: IntoIterator<Item = u8>>(&self, data: I) -> Result<(), smoltcp::Error> {
let mut data = data.into_iter();
let mut done = false;
while !done {
self.wait_can_send().await?;
self.with_socket(|mut socket| {
socket.send(|buf| {
for i in 0..buf.len() {
if let Some(byte) = data.next() {
buf[i] = byte;
} else {
done = true;
return (i, ())
}
}
(buf.len(), ())
})
})?;
}
Ok(())
}
/// Wait for all queued data to be sent and ACKed
///
/// **Warning:** this may not work as immediately as expected! The
/// other side may wait until it sends packets to you for
/// piggybacking the ACKs.
pub async fn flush(&self) {
poll_stream!(self, (), |socket| {
if socket.may_send() && socket.send_queue() > 0 {
Poll::Pending
} else {
Poll::Ready(())
}
}).await
} }
} }
impl Drop for TcpStream { impl Drop for TcpStream {
fn drop(&mut self) { fn drop(&mut self) {
// TODO: verify
println!("tcpstream drop");
Sockets::instance().sockets.borrow_mut() Sockets::instance().sockets.borrow_mut()
.remove(self.handle); .remove(self.handle);
} }