From f0588c49ed4aa3df00be04318010589056d5ce98 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 7 Jun 2020 20:24:41 +0800 Subject: [PATCH] rpc: make receive async --- src/runtime/src/proto_async.rs | 19 +++++++++++ src/runtime/src/rpc.rs | 60 ++++++++++++++++++---------------- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/src/runtime/src/proto_async.rs b/src/runtime/src/proto_async.rs index 3a35d7c6..e9ee00b8 100644 --- a/src/runtime/src/proto_async.rs +++ b/src/runtime/src/proto_async.rs @@ -51,6 +51,25 @@ pub async fn read_i32(stream: &TcpStream) -> Result { }).await?) } +pub async fn read_i64(stream: &TcpStream) -> Result { + Ok(stream.recv(|buf| { + if buf.len() >= 8 { + let value = + ((buf[0] as i64) << 56) + | ((buf[1] as i64) << 48) + | ((buf[2] as i64) << 40) + | ((buf[3] as i64) << 32) + | ((buf[4] as i64) << 24) + | ((buf[5] as i64) << 16) + | ((buf[6] as i64) << 8) + | (buf[7] as i64); + Poll::Ready((8, value)) + } else { + Poll::Pending + } + }).await?) +} + pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> { let total = destination.len(); let destination = RefCell::new(destination); diff --git a/src/runtime/src/rpc.rs b/src/runtime/src/rpc.rs index 7fed3ed8..fbaa209c 100644 --- a/src/runtime/src/rpc.rs +++ b/src/runtime/src/rpc.rs @@ -2,16 +2,17 @@ use core::str; use cslice::{CSlice, CMutSlice}; use log::debug; -use core_io::{Read, Write, Error}; +use libboard_zynq::smoltcp; +use libasync::smoltcp::TcpStream; +use core_io::{Write, Error}; -use crate::proto_core_io::{ProtoRead, ProtoWrite}; +use crate::proto_core_io::ProtoWrite; +use crate::proto_async; use self::tag::{Tag, TagIterator, split_tag}; -unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), - alloc: &dyn Fn(usize) -> Result<*mut (), E>) - -> Result<(), E> - where R: Read + ?Sized, - E: From +async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), + alloc: &dyn Fn(usize) -> *mut ()) + -> Result<(), smoltcp::Error> { macro_rules! consume_value { ($ty:ty, |$ptr:ident| $map:expr) => ({ @@ -24,22 +25,25 @@ unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), match tag { Tag::None => Ok(()), Tag::Bool => - consume_value!(u8, |ptr| { - *ptr = reader.read_u8()?; Ok(()) + consume_value!(i8, |ptr| { + *ptr = proto_async::read_i8(stream).await?; + Ok(()) }), Tag::Int32 => - consume_value!(u32, |ptr| { - *ptr = reader.read_u32()?; Ok(()) + consume_value!(i32, |ptr| { + *ptr = proto_async::read_i32(stream).await?; + Ok(()) }), Tag::Int64 | Tag::Float64 => - consume_value!(u64, |ptr| { - *ptr = reader.read_u64()?; Ok(()) + consume_value!(i64, |ptr| { + *ptr = proto_async::read_i64(stream).await?; + Ok(()) }), Tag::String | Tag::Bytes | Tag::ByteArray => { consume_value!(CMutSlice, |ptr| { - let length = reader.read_u32()? as usize; - *ptr = CMutSlice::new(alloc(length)? as *mut u8, length); - reader.read_exact((*ptr).as_mut())?; + let length = proto_async::read_i32(stream).await? as usize; + *ptr = CMutSlice::new(alloc(length) as *mut u8, length); + proto_async::read_chunk(stream, (*ptr).as_mut()).await?; Ok(()) }) } @@ -47,30 +51,30 @@ unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), let mut it = it.clone(); for _ in 0..arity { let tag = it.next().expect("truncated tag"); - recv_value(reader, tag, data, alloc)? + //TODO: recv_value(reader, tag, data, alloc)? } Ok(()) } Tag::List(it) | Tag::Array(it) => { struct List { elements: *mut (), length: u32 }; consume_value!(List, |ptr| { - (*ptr).length = reader.read_u32()?; + (*ptr).length = proto_async::read_i32(stream).await? as u32; let tag = it.clone().next().expect("truncated tag"); - (*ptr).elements = alloc(tag.size() * (*ptr).length as usize)?; + (*ptr).elements = alloc(tag.size() * (*ptr).length as usize); let mut data = (*ptr).elements; for _ in 0..(*ptr).length as usize { - recv_value(reader, tag, &mut data, alloc)? + //TODO: recv_value(reader, tag, &mut data, alloc).await? } Ok(()) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - recv_value(reader, tag, data, alloc)?; - recv_value(reader, tag, data, alloc)?; - recv_value(reader, tag, data, alloc)?; + /*TODO: recv_value(reader, tag, data, alloc).await?; + recv_value(reader, tag, data, alloc).await?; + recv_value(reader, tag, data, alloc).await?;*/ Ok(()) } Tag::Keyword(_) => unreachable!(), @@ -78,18 +82,16 @@ unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), } } -pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), - alloc: &dyn Fn(usize) -> Result<*mut (), E>) - -> Result<(), E> - where R: Read + ?Sized, - E: From +pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (), + alloc: &dyn Fn(usize) -> *mut ()) + -> Result<(), smoltcp::Error> { let mut it = TagIterator::new(tag_bytes); debug!("recv ...->{}", it); let tag = it.next().expect("truncated tag"); let mut data = data; - unsafe { recv_value(reader, tag, &mut data, alloc)? }; + unsafe { recv_value(stream, tag, &mut data, alloc).await? }; Ok(()) }