rpc: make alloc async

This commit is contained in:
Sebastien Bourdeauducq 2020-06-07 21:47:36 +08:00
parent 7df6ab2149
commit 06915ec25d

View File

@ -1,4 +1,5 @@
use core::str; use core::str;
use core::future::Future;
use cslice::{CSlice, CMutSlice}; use cslice::{CSlice, CMutSlice};
use log::debug; use log::debug;
@ -13,9 +14,10 @@ use crate::proto_async;
use self::tag::{Tag, TagIterator, split_tag}; use self::tag::{Tag, TagIterator, split_tag};
#[async_recursion(?Send)] #[async_recursion(?Send)]
async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'async_recursion>, data: &mut *mut (), async unsafe fn recv_value<F>(stream: &TcpStream, tag: Tag<'async_recursion>, data: &mut *mut (),
alloc: &dyn Fn(usize) -> *mut ()) alloc: &(impl Fn(usize) -> F + 'async_recursion))
-> Result<(), smoltcp::Error> -> Result<(), smoltcp::Error>
where F: Future<Output=*mut ()>
{ {
macro_rules! consume_value { macro_rules! consume_value {
($ty:ty, |$ptr:ident| $map:expr) => ({ ($ty:ty, |$ptr:ident| $map:expr) => ({
@ -45,7 +47,7 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'async_recursion>, data:
Tag::String | Tag::Bytes | Tag::ByteArray => { Tag::String | Tag::Bytes | Tag::ByteArray => {
consume_value!(CMutSlice<u8>, |ptr| { consume_value!(CMutSlice<u8>, |ptr| {
let length = proto_async::read_i32(stream).await? as usize; let length = proto_async::read_i32(stream).await? as usize;
*ptr = CMutSlice::new(alloc(length) as *mut u8, length); *ptr = CMutSlice::new(alloc(length).await as *mut u8, length);
proto_async::read_chunk(stream, (*ptr).as_mut()).await?; proto_async::read_chunk(stream, (*ptr).as_mut()).await?;
Ok(()) Ok(())
}) })
@ -64,7 +66,7 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'async_recursion>, data:
(*ptr).length = proto_async::read_i32(stream).await? as u32; (*ptr).length = proto_async::read_i32(stream).await? as u32;
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
(*ptr).elements = alloc(tag.size() * (*ptr).length as usize); (*ptr).elements = alloc(tag.size() * (*ptr).length as usize).await;
let mut data = (*ptr).elements; let mut data = (*ptr).elements;
for _ in 0..(*ptr).length as usize { for _ in 0..(*ptr).length as usize {
@ -85,9 +87,10 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'async_recursion>, data:
} }
} }
pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (), pub async fn recv_return<F>(stream: &TcpStream, tag_bytes: &[u8], data: *mut (),
alloc: &dyn Fn(usize) -> *mut ()) alloc: &impl Fn(usize) -> F)
-> Result<(), smoltcp::Error> -> Result<(), smoltcp::Error>
where F: Future<Output=*mut ()>
{ {
let mut it = TagIterator::new(tag_bytes); let mut it = TagIterator::new(tag_bytes);
debug!("recv ...->{}", it); debug!("recv ...->{}", it);