From d65df2f45441da8644295396421597b762896dbd Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 3 Jun 2020 11:17:49 +0800 Subject: [PATCH] rpc: revert to libio * Recursive async fns in Rust are a mess and not usable. * When doing ARTIQ async RPCs, it is simpler to encode the buffer on the kernel CPU and pass that to the comms CPU, instead of tracking when kernel CPU memory with the RPC values can be freed. --- src/runtime/src/rpc.rs | 125 +++++++++++++++++++++-------------------- 1 file changed, 63 insertions(+), 62 deletions(-) diff --git a/src/runtime/src/rpc.rs b/src/runtime/src/rpc.rs index f17c041b..915a3483 100644 --- a/src/runtime/src/rpc.rs +++ b/src/runtime/src/rpc.rs @@ -1,17 +1,14 @@ use core::str; use cslice::{CSlice, CMutSlice}; -use log::debug; -use libasync::smoltcp::TcpStream; - -use crate::proto::*; +use io::{ProtoRead, Read, Write, ProtoWrite, Error}; use self::tag::{Tag, TagIterator, split_tag}; -/* TODO: figure out Rust problems with recursive async fns */ - -async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), - alloc: &dyn Fn(usize) -> *mut ()) - -> Result<()> +unsafe fn recv_value(reader: &mut R, tag: Tag, data: &mut *mut (), + alloc: &dyn Fn(usize) -> Result<*mut (), E>) + -> Result<(), E> + where R: Read + ?Sized, + E: From> { macro_rules! consume_value { ($ty:ty, |$ptr:ident| $map:expr) => ({ @@ -24,25 +21,22 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), match tag { Tag::None => Ok(()), Tag::Bool => - consume_value!(i8, |ptr| { - *ptr = read_i8(stream).await?; - Ok(()) + consume_value!(u8, |ptr| { + *ptr = reader.read_u8()?; Ok(()) }), Tag::Int32 => - consume_value!(i32, |ptr| { - *ptr = read_i32(stream).await?; - Ok(()) + consume_value!(u32, |ptr| { + *ptr = reader.read_u32()?; Ok(()) }), Tag::Int64 | Tag::Float64 => - consume_value!(i64, |ptr| { - *ptr = read_i64(stream).await?; - Ok(()) + consume_value!(u64, |ptr| { + *ptr = reader.read_u64()?; Ok(()) }), Tag::String | Tag::Bytes | Tag::ByteArray => { consume_value!(CMutSlice, |ptr| { - let length = read_i32(stream).await? as usize; - *ptr = CMutSlice::new(alloc(length) as *mut u8, length); - read_chunk(stream, (*ptr).as_mut()).await?; + let length = reader.read_u32()? as usize; + *ptr = CMutSlice::new(alloc(length)? as *mut u8, length); + reader.read_exact((*ptr).as_mut())?; Ok(()) }) } @@ -50,30 +44,30 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), let mut it = it.clone(); for _ in 0..arity { let tag = it.next().expect("truncated tag"); - // TODO recv_value(stream, tag, data, alloc).await? + recv_value(reader, tag, data, alloc)? } Ok(()) } Tag::List(it) | Tag::Array(it) => { struct List { elements: *mut (), length: u32 }; consume_value!(List, |ptr| { - (*ptr).length = read_i32(stream).await? as u32; + (*ptr).length = reader.read_u32()?; let tag = it.clone().next().expect("truncated tag"); - (*ptr).elements = alloc(tag.size() * (*ptr).length as usize); + (*ptr).elements = alloc(tag.size() * (*ptr).length as usize)?; let mut data = (*ptr).elements; for _ in 0..(*ptr).length as usize { - // TODO recv_value(stream, tag, &mut data, alloc).await? + recv_value(reader, tag, &mut data, alloc)? } Ok(()) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - // TODO recv_value(stream, tag, data, alloc).await?; - // TODO recv_value(stream, tag, data, alloc).await?; - // TODO recv_value(stream, tag, data, alloc).await?; + recv_value(reader, tag, data, alloc)?; + recv_value(reader, tag, data, alloc)?; + recv_value(reader, tag, data, alloc)?; Ok(()) } Tag::Keyword(_) => unreachable!(), @@ -81,22 +75,26 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), } } -pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (), - alloc: &dyn Fn(usize) -> *mut ()) - -> Result<()> +pub fn recv_return(reader: &mut R, tag_bytes: &[u8], data: *mut (), + alloc: &dyn Fn(usize) -> Result<*mut (), E>) + -> Result<(), E> + where R: Read + ?Sized, + E: From> { let mut it = TagIterator::new(tag_bytes); + #[cfg(feature = "log")] debug!("recv ...->{}", it); let tag = it.next().expect("truncated tag"); let mut data = data; - unsafe { recv_value(stream, tag, &mut data, alloc).await? }; + unsafe { recv_value(reader, tag, &mut data, alloc)? }; Ok(()) } -async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ()) - -> Result<()> +unsafe fn send_value(writer: &mut W, tag: Tag, data: &mut *const ()) + -> Result<(), Error> + where W: Write + ?Sized { macro_rules! consume_value { ($ty:ty, |$ptr:ident| $map:expr) => ({ @@ -106,60 +104,59 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ( }) } - write_i8(stream, tag.as_u8() as i8).await?; + writer.write_u8(tag.as_u8())?; match tag { Tag::None => Ok(()), Tag::Bool => - consume_value!(i8, |ptr| - write_i8(stream, *ptr).await), + consume_value!(u8, |ptr| + writer.write_u8(*ptr)), Tag::Int32 => - consume_value!(i32, |ptr| - write_i32(stream, *ptr).await), + consume_value!(u32, |ptr| + writer.write_u32(*ptr)), Tag::Int64 | Tag::Float64 => - consume_value!(i64, |ptr| - write_i64(stream, *ptr).await), + consume_value!(u64, |ptr| + writer.write_u64(*ptr)), Tag::String => consume_value!(CSlice, |ptr| - write_string(stream, str::from_utf8((*ptr).as_ref()).unwrap()).await), + writer.write_string(str::from_utf8((*ptr).as_ref()).unwrap())), Tag::Bytes | Tag::ByteArray => consume_value!(CSlice, |ptr| - stream.send((*ptr).as_ref().iter().copied()).await), + writer.write_bytes((*ptr).as_ref())), Tag::Tuple(it, arity) => { let mut it = it.clone(); - write_i8(stream, arity as i8).await?; + writer.write_u8(arity)?; for _ in 0..arity { let tag = it.next().expect("truncated tag"); - // TODO send_value(stream, tag, data).await? + send_value(writer, tag, data)? } Ok(()) } Tag::List(it) | Tag::Array(it) => { struct List { elements: *const (), length: u32 }; consume_value!(List, |ptr| { - write_i32(stream, (*ptr).length as i32).await?; + writer.write_u32((*ptr).length)?; let tag = it.clone().next().expect("truncated tag"); let mut data = (*ptr).elements; for _ in 0..(*ptr).length as usize { - // TODO send_value(stream, tag, &mut data).await?; + send_value(writer, tag, &mut data)?; } Ok(()) }) } Tag::Range(it) => { let tag = it.clone().next().expect("truncated tag"); - // TODO send_value(stream, tag, data).await?; - // TODO send_value(stream, tag, data).await?; - // TODO send_value(stream, tag, data).await?; + send_value(writer, tag, data)?; + send_value(writer, tag, data)?; + send_value(writer, tag, data)?; Ok(()) } Tag::Keyword(it) => { struct Keyword<'a> { name: CSlice<'a, u8> }; consume_value!(Keyword, |ptr| { - write_string(stream, str::from_utf8((*ptr).name.as_ref()).unwrap()).await?; + writer.write_string(str::from_utf8((*ptr).name.as_ref()).unwrap())?; let tag = it.clone().next().expect("truncated tag"); let mut data = ptr.offset(1) as *const (); - // TODO send_value(stream, tag, &mut data).await - Ok(()) + send_value(writer, tag, &mut data) }) // Tag::Keyword never appears in composite types, so we don't have // to accurately advance data. @@ -167,31 +164,35 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ( Tag::Object => { struct Object { id: u32 }; consume_value!(*const Object, |ptr| - write_i32(stream, (**ptr).id as i32).await) + writer.write_u32((**ptr).id)) } } } -pub async fn send_args(stream: &TcpStream, service: u32, tag_bytes: &[u8], data: *const *const ()) - -> Result<()> +pub fn send_args(writer: &mut W, service: u32, tag_bytes: &[u8], data: *const *const ()) + -> Result<(), Error> + where W: Write + ?Sized { let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes); let mut args_it = TagIterator::new(arg_tags_bytes); - let return_it = TagIterator::new(return_tag_bytes); - debug!("send<{}>({})->{}", service, args_it, return_it); + #[cfg(feature = "log")] + { + let return_it = TagIterator::new(return_tag_bytes); + debug!("send<{}>({})->{}", service, args_it, return_it); + } - write_i32(stream, service as i32).await?; + writer.write_u32(service)?; for index in 0.. { if let Some(arg_tag) = args_it.next() { let mut data = unsafe { *data.offset(index) }; - unsafe { send_value(stream, arg_tag, &mut data).await? }; + unsafe { send_value(writer, arg_tag, &mut data)? }; } else { break } } - write_i8(stream, 0).await?; - stream.send(return_tag_bytes.iter().copied()).await?; + writer.write_u8(0)?; + writer.write_bytes(return_tag_bytes)?; Ok(()) }