proto_async: always consume one byte in recv

This commit is contained in:
Sebastien Bourdeauducq 2020-07-19 16:07:55 +08:00
parent 523524c319
commit 3ec9788eb1
1 changed files with 70 additions and 36 deletions

View File

@ -5,23 +5,39 @@ use core::cell::RefCell;
use libboard_zynq::smoltcp; use libboard_zynq::smoltcp;
use libasync::smoltcp::TcpStream; use libasync::smoltcp::TcpStream;
// TODO: use byteorder, make it more like libio
type Result<T> = core::result::Result<T, smoltcp::Error>; type Result<T> = core::result::Result<T, smoltcp::Error>;
enum RecvState<T> {
NeedsMore(usize, T), // bytes consumed so far, partial result
Completed(T), // final result
}
pub async fn expect(stream: &TcpStream, pattern: &[u8]) -> Result<bool> { pub async fn expect(stream: &TcpStream, pattern: &[u8]) -> Result<bool> {
stream.recv(|buf| { let mut state = RecvState::NeedsMore(0, true);
for (i, b) in buf.iter().enumerate() { loop {
if *b == pattern[i] { state = stream.recv(|buf| {
if i + 1 == pattern.len() { let mut consumed = 0;
return Poll::Ready((i + 1, Ok(true))); if let RecvState::NeedsMore(mut cur_index, _) = state {
for b in buf.iter() {
consumed += 1;
if *b == pattern[cur_index] {
if cur_index + 1 == pattern.len() {
return Poll::Ready((consumed, RecvState::Completed(true)));
} }
} else { } else {
return Poll::Ready((i + 1, Ok(false))); return Poll::Ready((consumed, RecvState::Completed(false)));
}
cur_index += 1;
}
Poll::Ready((consumed, RecvState::NeedsMore(cur_index, true)))
} else {
unreachable!();
}
}).await?;
if let RecvState::Completed(result) = state {
return Ok(result);
} }
} }
Poll::Pending
}).await?
} }
pub async fn read_bool(stream: &TcpStream) -> Result<bool> { pub async fn read_bool(stream: &TcpStream) -> Result<bool> {
@ -37,37 +53,55 @@ pub async fn read_i8(stream: &TcpStream) -> Result<i8> {
} }
pub async fn read_i32(stream: &TcpStream) -> Result<i32> { pub async fn read_i32(stream: &TcpStream) -> Result<i32> {
Ok(stream.recv(|buf| { let mut state = RecvState::NeedsMore(0, 0);
if buf.len() >= 4 { loop {
let value = state = stream.recv(|buf| {
((buf[0] as i32) << 24) let mut consumed = 0;
| ((buf[1] as i32) << 16) if let RecvState::NeedsMore(mut cur_index, mut cur_value) = state {
| ((buf[2] as i32) << 8) for b in buf.iter() {
| (buf[3] as i32); consumed += 1;
Poll::Ready((4, value)) cur_index += 1;
} else { cur_value <<= 8;
Poll::Pending cur_value |= *b as i32;
if cur_index == 4 {
return Poll::Ready((consumed, RecvState::Completed(cur_value)));
}
}
Poll::Ready((consumed, RecvState::NeedsMore(cur_index, cur_value)))
} else {
unreachable!();
}
}).await?;
if let RecvState::Completed(result) = state {
return Ok(result);
}
} }
}).await?)
} }
pub async fn read_i64(stream: &TcpStream) -> Result<i64> { pub async fn read_i64(stream: &TcpStream) -> Result<i64> {
Ok(stream.recv(|buf| { let mut state = RecvState::NeedsMore(0, 0);
if buf.len() >= 8 { loop {
let value = state = stream.recv(|buf| {
((buf[0] as i64) << 56) let mut consumed = 0;
| ((buf[1] as i64) << 48) if let RecvState::NeedsMore(mut cur_index, mut cur_value) = state {
| ((buf[2] as i64) << 40) for b in buf.iter() {
| ((buf[3] as i64) << 32) consumed += 1;
| ((buf[4] as i64) << 24) cur_index += 1;
| ((buf[5] as i64) << 16) cur_value <<= 8;
| ((buf[6] as i64) << 8) cur_value |= *b as i64;
| (buf[7] as i64); if cur_index == 8 {
Poll::Ready((8, value)) return Poll::Ready((consumed, RecvState::Completed(cur_value)));
} else { }
Poll::Pending }
Poll::Ready((consumed, RecvState::NeedsMore(cur_index, cur_value)))
} else {
unreachable!();
}
}).await?;
if let RecvState::Completed(result) = state {
return Ok(result);
}
} }
}).await?)
} }
pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> { pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> {