rpc: make receive async

This commit is contained in:
Sebastien Bourdeauducq 2020-06-07 20:24:41 +08:00
parent ef4bdbb668
commit f0588c49ed
2 changed files with 50 additions and 29 deletions

View File

@ -51,6 +51,25 @@ pub async fn read_i32(stream: &TcpStream) -> Result<i32> {
}).await?) }).await?)
} }
pub async fn read_i64(stream: &TcpStream) -> Result<i64> {
Ok(stream.recv(|buf| {
if buf.len() >= 8 {
let value =
((buf[0] as i64) << 56)
| ((buf[1] as i64) << 48)
| ((buf[2] as i64) << 40)
| ((buf[3] as i64) << 32)
| ((buf[4] as i64) << 24)
| ((buf[5] as i64) << 16)
| ((buf[6] as i64) << 8)
| (buf[7] as i64);
Poll::Ready((8, value))
} else {
Poll::Pending
}
}).await?)
}
pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> { pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> {
let total = destination.len(); let total = destination.len();
let destination = RefCell::new(destination); let destination = RefCell::new(destination);

View File

@ -2,16 +2,17 @@ use core::str;
use cslice::{CSlice, CMutSlice}; use cslice::{CSlice, CMutSlice};
use log::debug; use log::debug;
use core_io::{Read, Write, Error}; use libboard_zynq::smoltcp;
use libasync::smoltcp::TcpStream;
use core_io::{Write, Error};
use crate::proto_core_io::{ProtoRead, ProtoWrite}; use crate::proto_core_io::ProtoWrite;
use crate::proto_async;
use self::tag::{Tag, TagIterator, split_tag}; use self::tag::{Tag, TagIterator, split_tag};
unsafe fn recv_value<R, E>(reader: &mut R, tag: Tag, data: &mut *mut (), async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (),
alloc: &dyn Fn(usize) -> Result<*mut (), E>) alloc: &dyn Fn(usize) -> *mut ())
-> Result<(), E> -> Result<(), smoltcp::Error>
where R: Read + ?Sized,
E: From<Error>
{ {
macro_rules! consume_value { macro_rules! consume_value {
($ty:ty, |$ptr:ident| $map:expr) => ({ ($ty:ty, |$ptr:ident| $map:expr) => ({
@ -24,22 +25,25 @@ unsafe fn recv_value<R, E>(reader: &mut R, tag: Tag, data: &mut *mut (),
match tag { match tag {
Tag::None => Ok(()), Tag::None => Ok(()),
Tag::Bool => Tag::Bool =>
consume_value!(u8, |ptr| { consume_value!(i8, |ptr| {
*ptr = reader.read_u8()?; Ok(()) *ptr = proto_async::read_i8(stream).await?;
Ok(())
}), }),
Tag::Int32 => Tag::Int32 =>
consume_value!(u32, |ptr| { consume_value!(i32, |ptr| {
*ptr = reader.read_u32()?; Ok(()) *ptr = proto_async::read_i32(stream).await?;
Ok(())
}), }),
Tag::Int64 | Tag::Float64 => Tag::Int64 | Tag::Float64 =>
consume_value!(u64, |ptr| { consume_value!(i64, |ptr| {
*ptr = reader.read_u64()?; Ok(()) *ptr = proto_async::read_i64(stream).await?;
Ok(())
}), }),
Tag::String | Tag::Bytes | Tag::ByteArray => { Tag::String | Tag::Bytes | Tag::ByteArray => {
consume_value!(CMutSlice<u8>, |ptr| { consume_value!(CMutSlice<u8>, |ptr| {
let length = reader.read_u32()? as usize; let length = proto_async::read_i32(stream).await? as usize;
*ptr = CMutSlice::new(alloc(length)? as *mut u8, length); *ptr = CMutSlice::new(alloc(length) as *mut u8, length);
reader.read_exact((*ptr).as_mut())?; proto_async::read_chunk(stream, (*ptr).as_mut()).await?;
Ok(()) Ok(())
}) })
} }
@ -47,30 +51,30 @@ unsafe fn recv_value<R, E>(reader: &mut R, tag: Tag, data: &mut *mut (),
let mut it = it.clone(); let mut it = it.clone();
for _ in 0..arity { for _ in 0..arity {
let tag = it.next().expect("truncated tag"); let tag = it.next().expect("truncated tag");
recv_value(reader, tag, data, alloc)? //TODO: recv_value(reader, tag, data, alloc)?
} }
Ok(()) Ok(())
} }
Tag::List(it) | Tag::Array(it) => { Tag::List(it) | Tag::Array(it) => {
struct List { elements: *mut (), length: u32 }; struct List { elements: *mut (), length: u32 };
consume_value!(List, |ptr| { consume_value!(List, |ptr| {
(*ptr).length = reader.read_u32()?; (*ptr).length = proto_async::read_i32(stream).await? as u32;
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
(*ptr).elements = alloc(tag.size() * (*ptr).length as usize)?; (*ptr).elements = alloc(tag.size() * (*ptr).length as usize);
let mut data = (*ptr).elements; let mut data = (*ptr).elements;
for _ in 0..(*ptr).length as usize { for _ in 0..(*ptr).length as usize {
recv_value(reader, tag, &mut data, alloc)? //TODO: recv_value(reader, tag, &mut data, alloc).await?
} }
Ok(()) Ok(())
}) })
} }
Tag::Range(it) => { Tag::Range(it) => {
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
recv_value(reader, tag, data, alloc)?; /*TODO: recv_value(reader, tag, data, alloc).await?;
recv_value(reader, tag, data, alloc)?; recv_value(reader, tag, data, alloc).await?;
recv_value(reader, tag, data, alloc)?; recv_value(reader, tag, data, alloc).await?;*/
Ok(()) Ok(())
} }
Tag::Keyword(_) => unreachable!(), Tag::Keyword(_) => unreachable!(),
@ -78,18 +82,16 @@ unsafe fn recv_value<R, E>(reader: &mut R, tag: Tag, data: &mut *mut (),
} }
} }
pub fn recv_return<R, E>(reader: &mut R, tag_bytes: &[u8], data: *mut (), pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (),
alloc: &dyn Fn(usize) -> Result<*mut (), E>) alloc: &dyn Fn(usize) -> *mut ())
-> Result<(), E> -> Result<(), smoltcp::Error>
where R: Read + ?Sized,
E: From<Error>
{ {
let mut it = TagIterator::new(tag_bytes); let mut it = TagIterator::new(tag_bytes);
debug!("recv ...->{}", it); debug!("recv ...->{}", it);
let tag = it.next().expect("truncated tag"); let tag = it.next().expect("truncated tag");
let mut data = data; let mut data = data;
unsafe { recv_value(reader, tag, &mut data, alloc)? }; unsafe { recv_value(stream, tag, &mut data, alloc).await? };
Ok(()) Ok(())
} }