From 7b78bc04947484eb8f86b8fc90b9f2a3260027ef Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 19 Jul 2020 15:34:32 +0800 Subject: [PATCH] libasync: new stream.recv API https://git.m-labs.hk/M-Labs/artiq-zynq/issues/40#issuecomment-593 --- experiments/src/main.rs | 36 ++---------------------------- libasync/src/smoltcp/tcp_stream.rs | 23 +++++++------------ 2 files changed, 10 insertions(+), 49 deletions(-) diff --git a/experiments/src/main.rs b/experiments/src/main.rs index 7143a34..21d3c98 100644 --- a/experiments/src/main.rs +++ b/experiments/src/main.rs @@ -202,40 +202,8 @@ pub fn main_core0() { ps7_init::report_differences(); Sockets::init(32); - /// `chargen` - const TCP_PORT: u16 = 19; - async fn handle_connection(stream: TcpStream) -> smoltcp::Result<()> { - stream.send("Enter your name: ".bytes()).await?; - let name = stream - .recv(|buf| { - for (i, b) in buf.iter().enumerate() { - if *b == '\n' as u8 { - return match core::str::from_utf8(&buf[0..i]) { - Ok(name) => Poll::Ready((i + 1, Some(name.to_owned()))), - Err(_) => Poll::Ready((i + 1, None)), - }; - } - } - if buf.len() > 100 { - // Too much input, consume all - Poll::Ready((buf.len(), None)) - } else { - Poll::Pending - } - }) - .await?; - match name { - Some(name) => stream.send(format!("Hello {}!\n", name).bytes()).await?, - None => { - stream - .send("I had trouble reading your name.\n".bytes()) - .await? - } - } - let _ = stream.close().await; - Ok(()) - } + const TCP_PORT: u16 = 19; // (rx, tx) let stats = alloc::rc::Rc::new(core::cell::RefCell::new((0, 0))); let stats_tx = stats.clone(); @@ -264,7 +232,7 @@ pub fn main_core0() { let stats_rx = stats_rx.clone(); task::spawn(async move { loop { - match stream.recv(|buf| Poll::Ready((buf.len(), buf.len()))).await { + match stream.recv(|buf| (buf.len(), buf.len())).await { Ok(len) => stats_rx.borrow_mut().0 += len, Err(e) => { warn!("rx: {:?}", e); diff --git a/libasync/src/smoltcp/tcp_stream.rs b/libasync/src/smoltcp/tcp_stream.rs index 2a26fa1..c5bf991 100644 --- a/libasync/src/smoltcp/tcp_stream.rs +++ b/libasync/src/smoltcp/tcp_stream.rs @@ -103,21 +103,19 @@ impl TcpStream { /// Probe the receive buffer /// - /// Instead of handing you the data on the heap all at once, - /// smoltcp's read interface is wrapped so that your callback can - /// just return `Poll::Pending` if there is not enough data - /// yet. Likewise, return the amount of bytes consumed from the - /// buffer in the `Poll::Ready` result. + /// Your callback will only be called when there is some data available, + /// and it must consume at least one byte. It returns a tuple with the + /// number of bytes it consumed, and a user-defined return value of type R. pub async fn recv(&self, f: F) -> Result where - F: Fn(&[u8]) -> Poll<(usize, R)>, + F: Fn(&[u8]) -> (usize, R), { - struct Recv<'a, F: FnOnce(&[u8]) -> Poll<(usize, R)>, R> { + struct Recv<'a, F: FnOnce(&[u8]) -> (usize, R), R> { stream: &'a TcpStream, f: F, } - impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> { + impl<'a, F: Fn(&[u8]) -> (usize, R), R> Future for Recv<'a, F, R> { type Output = Result; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -128,13 +126,8 @@ impl TcpStream { socket.recv(|buf| { if buf.len() > 0 { - match (self.f)(buf) { - Poll::Ready((amount, result)) => - (amount, Poll::Ready(Ok(result))), - Poll::Pending => - // 0 bytes consumed - (0, Poll::Pending), - } + let (amount, result) = (self.f)(buf); + (amount, Poll::Ready(Ok(result))) } else { (0, Poll::Pending) }