diff --git a/src/runtime/src/rpc.rs b/src/runtime/src/rpc.rs index f17c041b..915a3483 100644 --- a/src/runtime/src/rpc.rs +++ b/src/runtime/src/rpc.rs @@ -1,17 +1,14 @@ use core::str; use cslice::{CSlice, CMutSlice}; -use log::debug; -use libasync::smoltcp::TcpStream; - -use crate::proto::*; +use io::{ProtoRead, Read, Write, ProtoWrite, Error}; use self::tag::{Tag, TagIterator, split_tag}; -/* TODO: figure out Rust problems with recursive async fns */ - -async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), - alloc: &dyn Fn(usize) -> *mut ()) - -> Result<()> +unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), + alloc: &dyn Fn(usize) -> Result<*mut (), E>) + -> Result<(), E> + where R: Read + ?Sized, + E: From> { macro_rules! consume_value { ($ty:ty, |$ptr:ident| $map:expr) => ({ @@ -24,25 +21,22 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), match tag { Tag::None => Ok(()), Tag::Bool => - consume_value!(i8, |ptr| { - *ptr = read_i8(stream).await?; - Ok(()) + consume_value!(u8, |ptr| { + *ptr = reader.read_u8()?; Ok(()) }), Tag::Int32 => - consume_value!(i32, |ptr| { - *ptr = read_i32(stream).await?; - Ok(()) + consume_value!(u32, |ptr| { + *ptr = reader.read_u32()?; Ok(()) }), Tag::Int64 | Tag::Float64 => - consume_value!(i64, |ptr| { - *ptr = read_i64(stream).await?; - Ok(()) + consume_value!(u64, |ptr| { + *ptr = reader.read_u64()?; Ok(()) }), Tag::String | Tag::Bytes | Tag::ByteArray => { consume_value!(CMutSlice, |ptr| { - let length = read_i32(stream).await? as usize; - *ptr = CMutSlice::new(alloc(length) as *mut u8, length); - read_chunk(stream, (*ptr).as_mut()).await?; + let length = reader.read_u32()? as usize; + *ptr = CMutSlice::new(alloc(length)? as *mut u8, length); + reader.read_exact((*ptr).as_mut())?; Ok(()) }) } @@ -50,30 +44,30 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), let mut it = it.clone(); for _ in 0..arity { let tag = it.next().expect("truncated tag"); - // TODO recv_value(stream, tag, data, alloc).await? + recv_value(reader, tag, data, alloc)? } Ok(()) } Tag::List(it) | Tag::Array(it) => { struct List { elements: *mut (), length: u32 }; consume_value!(List, |ptr| { - (*ptr).length = read_i32(stream).await? as u32; + (*ptr).length = reader.read_u32()?; let tag = it.clone().next().expect("truncated tag"); - (*ptr).elements = alloc(tag.size() * (*ptr).length as usize); + (*ptr).elements = alloc(tag.size() * (*ptr).length as usize)?; let mut data = (*ptr).elements; for _ in 0..(*ptr).length as usize { - // TODO recv_value(stream, tag, &mut data, alloc).await? + recv_value(reader, tag, &mut data, alloc)? } Ok(()) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - // TODO recv_value(stream, tag, data, alloc).await?; - // TODO recv_value(stream, tag, data, alloc).await?; - // TODO recv_value(stream, tag, data, alloc).await?; + recv_value(reader, tag, data, alloc)?; + recv_value(reader, tag, data, alloc)?; + recv_value(reader, tag, data, alloc)?; Ok(()) } Tag::Keyword(_) => unreachable!(), @@ -81,22 +75,26 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), } } -pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (), - alloc: &dyn Fn(usize) -> *mut ()) - -> Result<()> +pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), + alloc: &dyn Fn(usize) -> Result<*mut (), E>) + -> Result<(), E> + where R: Read + ?Sized, + E: From> { let mut it = TagIterator::new(tag_bytes); + #[cfg(feature = "log")] debug!("recv ...->{}", it); let tag = it.next().expect("truncated tag"); let mut data = data; - unsafe { recv_value(stream, tag, &mut data, alloc).await? }; + unsafe { recv_value(reader, tag, &mut data, alloc)? }; Ok(()) } -async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ()) - -> Result<()> +unsafe fn send_value(writer: &mut W, tag: Tag, data: &mut *const ()) + -> Result<(), Error> + where W: Write + ?Sized { macro_rules! consume_value { ($ty:ty, |$ptr:ident| $map:expr) => ({ @@ -106,60 +104,59 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ( }) } - write_i8(stream, tag.as_u8() as i8).await?; + writer.write_u8(tag.as_u8())?; match tag { Tag::None => Ok(()), Tag::Bool => - consume_value!(i8, |ptr| - write_i8(stream, *ptr).await), + consume_value!(u8, |ptr| + writer.write_u8(*ptr)), Tag::Int32 => - consume_value!(i32, |ptr| - write_i32(stream, *ptr).await), + consume_value!(u32, |ptr| + writer.write_u32(*ptr)), Tag::Int64 | Tag::Float64 => - consume_value!(i64, |ptr| - write_i64(stream, *ptr).await), + consume_value!(u64, |ptr| + writer.write_u64(*ptr)), Tag::String => consume_value!(CSlice, |ptr| - write_string(stream, str::from_utf8((*ptr).as_ref()).unwrap()).await), + writer.write_string(str::from_utf8((*ptr).as_ref()).unwrap())), Tag::Bytes | Tag::ByteArray => consume_value!(CSlice, |ptr| - stream.send((*ptr).as_ref().iter().copied()).await), + writer.write_bytes((*ptr).as_ref())), Tag::Tuple(it, arity) => { let mut it = it.clone(); - write_i8(stream, arity as i8).await?; + writer.write_u8(arity)?; for _ in 0..arity { let tag = it.next().expect("truncated tag"); - // TODO send_value(stream, tag, data).await? + send_value(writer, tag, data)? } Ok(()) } Tag::List(it) | Tag::Array(it) => { struct List { elements: *const (), length: u32 }; consume_value!(List, |ptr| { - write_i32(stream, (*ptr).length as i32).await?; + writer.write_u32((*ptr).length)?; let tag = it.clone().next().expect("truncated tag"); let mut data = (*ptr).elements; for _ in 0..(*ptr).length as usize { - // TODO send_value(stream, tag, &mut data).await?; + send_value(writer, tag, &mut data)?; } Ok(()) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - // TODO send_value(stream, tag, data).await?; - // TODO send_value(stream, tag, data).await?; - // TODO send_value(stream, tag, data).await?; + send_value(writer, tag, data)?; + send_value(writer, tag, data)?; + send_value(writer, tag, data)?; Ok(()) } Tag::Keyword(it) => { struct Keyword<'a> { name: CSlice<'a, u8> }; consume_value!(Keyword, |ptr| { - write_string(stream, str::from_utf8((*ptr).name.as_ref()).unwrap()).await?; + writer.write_string(str::from_utf8((*ptr).name.as_ref()).unwrap())?; let tag = it.clone().next().expect("truncated tag"); let mut data = ptr.offset(1) as *const (); - // TODO send_value(stream, tag, &mut data).await - Ok(()) + send_value(writer, tag, &mut data) }) // Tag::Keyword never appears in composite types, so we don't have // to accurately advance data. @@ -167,31 +164,35 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ( Tag::Object => { struct Object { id: u32 }; consume_value!(*const Object, |ptr| - write_i32(stream, (**ptr).id as i32).await) + writer.write_u32((**ptr).id)) } } } -pub async fn send_args(stream: &TcpStream, service: u32, tag_bytes: &[u8], data: *const *const ()) - -> Result<()> +pub fn send_args(writer: &mut W, service: u32, tag_bytes: &[u8], data: *const *const ()) + -> Result<(), Error> + where W: Write + ?Sized { let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes); let mut args_it = TagIterator::new(arg_tags_bytes); - let return_it = TagIterator::new(return_tag_bytes); - debug!("send<{}>({})->{}", service, args_it, return_it); + #[cfg(feature = "log")] + { + let return_it = TagIterator::new(return_tag_bytes); + debug!("send<{}>({})->{}", service, args_it, return_it); + } - write_i32(stream, service as i32).await?; + writer.write_u32(service)?; for index in 0.. { if let Some(arg_tag) = args_it.next() { let mut data = unsafe { *data.offset(index) }; - unsafe { send_value(stream, arg_tag, &mut data).await? }; + unsafe { send_value(writer, arg_tag, &mut data)? }; } else { break } } - write_i8(stream, 0).await?; - stream.send(return_tag_bytes.iter().copied()).await?; + writer.write_u8(0)?; + writer.write_bytes(return_tag_bytes)?; Ok(()) }