libasync: new stream.recv API

M-Labs/artiq-zynq#40
tcp-recv-fnmut
parent ef88a1313a
commit 7b78bc0494
  1. 36
      experiments/src/main.rs
  2. 23
      libasync/src/smoltcp/tcp_stream.rs

@ -202,40 +202,8 @@ pub fn main_core0() {
ps7_init::report_differences();
Sockets::init(32);
/// `chargen`
const TCP_PORT: u16 = 19;
async fn handle_connection(stream: TcpStream) -> smoltcp::Result<()> {
stream.send("Enter your name: ".bytes()).await?;
let name = stream
.recv(|buf| {
for (i, b) in buf.iter().enumerate() {
if *b == '\n' as u8 {
return match core::str::from_utf8(&buf[0..i]) {
Ok(name) => Poll::Ready((i + 1, Some(name.to_owned()))),
Err(_) => Poll::Ready((i + 1, None)),
};
}
}
if buf.len() > 100 {
// Too much input, consume all
Poll::Ready((buf.len(), None))
} else {
Poll::Pending
}
})
.await?;
match name {
Some(name) => stream.send(format!("Hello {}!\n", name).bytes()).await?,
None => {
stream
.send("I had trouble reading your name.\n".bytes())
.await?
}
}
let _ = stream.close().await;
Ok(())
}
const TCP_PORT: u16 = 19;
// (rx, tx)
let stats = alloc::rc::Rc::new(core::cell::RefCell::new((0, 0)));
let stats_tx = stats.clone();
@ -264,7 +232,7 @@ pub fn main_core0() {
let stats_rx = stats_rx.clone();
task::spawn(async move {
loop {
match stream.recv(|buf| Poll::Ready((buf.len(), buf.len()))).await {
match stream.recv(|buf| (buf.len(), buf.len())).await {
Ok(len) => stats_rx.borrow_mut().0 += len,
Err(e) => {
warn!("rx: {:?}", e);

@ -103,21 +103,19 @@ impl TcpStream {
/// Probe the receive buffer
///
/// Instead of handing you the data on the heap all at once,
/// smoltcp's read interface is wrapped so that your callback can
/// just return `Poll::Pending` if there is not enough data
/// yet. Likewise, return the amount of bytes consumed from the
/// buffer in the `Poll::Ready` result.
/// Your callback will only be called when there is some data available,
/// and it must consume at least one byte. It returns a tuple with the
/// number of bytes it consumed, and a user-defined return value of type R.
pub async fn recv<F, R>(&self, f: F) -> Result<R>
where
F: Fn(&[u8]) -> Poll<(usize, R)>,
F: Fn(&[u8]) -> (usize, R),
{
struct Recv<'a, F: FnOnce(&[u8]) -> Poll<(usize, R)>, R> {
struct Recv<'a, F: FnOnce(&[u8]) -> (usize, R), R> {
stream: &'a TcpStream,
f: F,
}
impl<'a, F: Fn(&[u8]) -> Poll<(usize, R)>, R> Future for Recv<'a, F, R> {
impl<'a, F: Fn(&[u8]) -> (usize, R), R> Future for Recv<'a, F, R> {
type Output = Result<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
@ -128,13 +126,8 @@ impl TcpStream {
socket.recv(|buf| {
if buf.len() > 0 {
match (self.f)(buf) {
Poll::Ready((amount, result)) =>
(amount, Poll::Ready(Ok(result))),
Poll::Pending =>
// 0 bytes consumed
(0, Poll::Pending),
}
let (amount, result) = (self.f)(buf);
(amount, Poll::Ready(Ok(result)))
} else {
(0, Poll::Pending)
}

Loading…
Cancel
Save