rpc: revert to libio

* Recursive async fns in Rust are a mess and not usable.
* When doing ARTIQ async RPCs, it is simpler to encode the buffer on the kernel CPU and pass that to the comms CPU,
  instead of tracking when kernel CPU memory with the RPC values can be freed.
This commit is contained in:
Sebastien Bourdeauducq 2020-06-03 11:17:49 +08:00
parent 9c0cf7e84c
commit d65df2f454

View File

@ -1,17 +1,14 @@
use core::str; use core::str;
use cslice::{CSlice, CMutSlice}; use cslice::{CSlice, CMutSlice};
use log::debug;
use libasync::smoltcp::TcpStream; use io::{ProtoRead, Read, Write, ProtoWrite, Error};
use crate::proto::*;
use self::tag::{Tag, TagIterator, split_tag}; use self::tag::{Tag, TagIterator, split_tag};
/* TODO: figure out Rust problems with recursive async fns */ unsafe fn recv_value<R, E>(reader: &mut R, tag: Tag, data: &mut *mut (),
alloc: &dyn Fn(usize) -> Result<*mut (), E>)
async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (), -> Result<(), E>
alloc: &dyn Fn(usize) -> *mut ()) where R: Read + ?Sized,
-> Result<()> E: From<Error<R::ReadError>>
{ {
macro_rules! consume_value { macro_rules! consume_value {
($ty:ty, |$ptr:ident| $map:expr) => ({ ($ty:ty, |$ptr:ident| $map:expr) => ({
@ -24,25 +21,22 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (),
match tag { match tag {
Tag::None => Ok(()), Tag::None => Ok(()),
Tag::Bool => Tag::Bool =>
consume_value!(i8, |ptr| { consume_value!(u8, |ptr| {
*ptr = read_i8(stream).await?; *ptr = reader.read_u8()?; Ok(())
Ok(())
}), }),
Tag::Int32 => Tag::Int32 =>
consume_value!(i32, |ptr| { consume_value!(u32, |ptr| {
*ptr = read_i32(stream).await?; *ptr = reader.read_u32()?; Ok(())
Ok(())
}), }),
Tag::Int64 | Tag::Float64 => Tag::Int64 | Tag::Float64 =>
consume_value!(i64, |ptr| { consume_value!(u64, |ptr| {
*ptr = read_i64(stream).await?; *ptr = reader.read_u64()?; Ok(())
Ok(())
}), }),
Tag::String | Tag::Bytes | Tag::ByteArray => { Tag::String | Tag::Bytes | Tag::ByteArray => {
consume_value!(CMutSlice<u8>, |ptr| { consume_value!(CMutSlice<u8>, |ptr| {
let length = read_i32(stream).await? as usize; let length = reader.read_u32()? as usize;
*ptr = CMutSlice::new(alloc(length) as *mut u8, length); *ptr = CMutSlice::new(alloc(length)? as *mut u8, length);
read_chunk(stream, (*ptr).as_mut()).await?; reader.read_exact((*ptr).as_mut())?;
Ok(()) Ok(())
}) })
} }
@ -50,30 +44,30 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (),
let mut it = it.clone(); let mut it = it.clone();
for _ in 0..arity { for _ in 0..arity {
let tag = it.next().expect("truncated tag"); let tag = it.next().expect("truncated tag");
// TODO recv_value(stream, tag, data, alloc).await? recv_value(reader, tag, data, alloc)?
} }
Ok(()) Ok(())
} }
Tag::List(it) | Tag::Array(it) => { Tag::List(it) | Tag::Array(it) => {
struct List { elements: *mut (), length: u32 }; struct List { elements: *mut (), length: u32 };
consume_value!(List, |ptr| { consume_value!(List, |ptr| {
(*ptr).length = read_i32(stream).await? as u32; (*ptr).length = reader.read_u32()?;
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
(*ptr).elements = alloc(tag.size() * (*ptr).length as usize); (*ptr).elements = alloc(tag.size() * (*ptr).length as usize)?;
let mut data = (*ptr).elements; let mut data = (*ptr).elements;
for _ in 0..(*ptr).length as usize { for _ in 0..(*ptr).length as usize {
// TODO recv_value(stream, tag, &mut data, alloc).await? recv_value(reader, tag, &mut data, alloc)?
} }
Ok(()) Ok(())
}) })
} }
Tag::Range(it) => { Tag::Range(it) => {
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
// TODO recv_value(stream, tag, data, alloc).await?; recv_value(reader, tag, data, alloc)?;
// TODO recv_value(stream, tag, data, alloc).await?; recv_value(reader, tag, data, alloc)?;
// TODO recv_value(stream, tag, data, alloc).await?; recv_value(reader, tag, data, alloc)?;
Ok(()) Ok(())
} }
Tag::Keyword(_) => unreachable!(), Tag::Keyword(_) => unreachable!(),
@ -81,22 +75,26 @@ async unsafe fn recv_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *mut (),
} }
} }
pub async fn recv_return(stream: &TcpStream, tag_bytes: &[u8], data: *mut (), pub fn recv_return<R, E>(reader: &mut R, tag_bytes: &[u8], data: *mut (),
alloc: &dyn Fn(usize) -> *mut ()) alloc: &dyn Fn(usize) -> Result<*mut (), E>)
-> Result<()> -> Result<(), E>
where R: Read + ?Sized,
E: From<Error<R::ReadError>>
{ {
let mut it = TagIterator::new(tag_bytes); let mut it = TagIterator::new(tag_bytes);
#[cfg(feature = "log")]
debug!("recv ...->{}", it); debug!("recv ...->{}", it);
let tag = it.next().expect("truncated tag"); let tag = it.next().expect("truncated tag");
let mut data = data; let mut data = data;
unsafe { recv_value(stream, tag, &mut data, alloc).await? }; unsafe { recv_value(reader, tag, &mut data, alloc)? };
Ok(()) Ok(())
} }
async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const ()) unsafe fn send_value<W>(writer: &mut W, tag: Tag, data: &mut *const ())
-> Result<()> -> Result<(), Error<W::WriteError>>
where W: Write + ?Sized
{ {
macro_rules! consume_value { macro_rules! consume_value {
($ty:ty, |$ptr:ident| $map:expr) => ({ ($ty:ty, |$ptr:ident| $map:expr) => ({
@ -106,60 +104,59 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const (
}) })
} }
write_i8(stream, tag.as_u8() as i8).await?; writer.write_u8(tag.as_u8())?;
match tag { match tag {
Tag::None => Ok(()), Tag::None => Ok(()),
Tag::Bool => Tag::Bool =>
consume_value!(i8, |ptr| consume_value!(u8, |ptr|
write_i8(stream, *ptr).await), writer.write_u8(*ptr)),
Tag::Int32 => Tag::Int32 =>
consume_value!(i32, |ptr| consume_value!(u32, |ptr|
write_i32(stream, *ptr).await), writer.write_u32(*ptr)),
Tag::Int64 | Tag::Float64 => Tag::Int64 | Tag::Float64 =>
consume_value!(i64, |ptr| consume_value!(u64, |ptr|
write_i64(stream, *ptr).await), writer.write_u64(*ptr)),
Tag::String => Tag::String =>
consume_value!(CSlice<u8>, |ptr| consume_value!(CSlice<u8>, |ptr|
write_string(stream, str::from_utf8((*ptr).as_ref()).unwrap()).await), writer.write_string(str::from_utf8((*ptr).as_ref()).unwrap())),
Tag::Bytes | Tag::ByteArray => Tag::Bytes | Tag::ByteArray =>
consume_value!(CSlice<u8>, |ptr| consume_value!(CSlice<u8>, |ptr|
stream.send((*ptr).as_ref().iter().copied()).await), writer.write_bytes((*ptr).as_ref())),
Tag::Tuple(it, arity) => { Tag::Tuple(it, arity) => {
let mut it = it.clone(); let mut it = it.clone();
write_i8(stream, arity as i8).await?; writer.write_u8(arity)?;
for _ in 0..arity { for _ in 0..arity {
let tag = it.next().expect("truncated tag"); let tag = it.next().expect("truncated tag");
// TODO send_value(stream, tag, data).await? send_value(writer, tag, data)?
} }
Ok(()) Ok(())
} }
Tag::List(it) | Tag::Array(it) => { Tag::List(it) | Tag::Array(it) => {
struct List { elements: *const (), length: u32 }; struct List { elements: *const (), length: u32 };
consume_value!(List, |ptr| { consume_value!(List, |ptr| {
write_i32(stream, (*ptr).length as i32).await?; writer.write_u32((*ptr).length)?;
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
let mut data = (*ptr).elements; let mut data = (*ptr).elements;
for _ in 0..(*ptr).length as usize { for _ in 0..(*ptr).length as usize {
// TODO send_value(stream, tag, &mut data).await?; send_value(writer, tag, &mut data)?;
} }
Ok(()) Ok(())
}) })
} }
Tag::Range(it) => { Tag::Range(it) => {
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
// TODO send_value(stream, tag, data).await?; send_value(writer, tag, data)?;
// TODO send_value(stream, tag, data).await?; send_value(writer, tag, data)?;
// TODO send_value(stream, tag, data).await?; send_value(writer, tag, data)?;
Ok(()) Ok(())
} }
Tag::Keyword(it) => { Tag::Keyword(it) => {
struct Keyword<'a> { name: CSlice<'a, u8> }; struct Keyword<'a> { name: CSlice<'a, u8> };
consume_value!(Keyword, |ptr| { consume_value!(Keyword, |ptr| {
write_string(stream, str::from_utf8((*ptr).name.as_ref()).unwrap()).await?; writer.write_string(str::from_utf8((*ptr).name.as_ref()).unwrap())?;
let tag = it.clone().next().expect("truncated tag"); let tag = it.clone().next().expect("truncated tag");
let mut data = ptr.offset(1) as *const (); let mut data = ptr.offset(1) as *const ();
// TODO send_value(stream, tag, &mut data).await send_value(writer, tag, &mut data)
Ok(())
}) })
// Tag::Keyword never appears in composite types, so we don't have // Tag::Keyword never appears in composite types, so we don't have
// to accurately advance data. // to accurately advance data.
@ -167,31 +164,35 @@ async unsafe fn send_value(stream: &TcpStream, tag: Tag<'_>, data: &mut *const (
Tag::Object => { Tag::Object => {
struct Object { id: u32 }; struct Object { id: u32 };
consume_value!(*const Object, |ptr| consume_value!(*const Object, |ptr|
write_i32(stream, (**ptr).id as i32).await) writer.write_u32((**ptr).id))
} }
} }
} }
pub async fn send_args(stream: &TcpStream, service: u32, tag_bytes: &[u8], data: *const *const ()) pub fn send_args<W>(writer: &mut W, service: u32, tag_bytes: &[u8], data: *const *const ())
-> Result<()> -> Result<(), Error<W::WriteError>>
where W: Write + ?Sized
{ {
let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes); let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes);
let mut args_it = TagIterator::new(arg_tags_bytes); let mut args_it = TagIterator::new(arg_tags_bytes);
#[cfg(feature = "log")]
{
let return_it = TagIterator::new(return_tag_bytes); let return_it = TagIterator::new(return_tag_bytes);
debug!("send<{}>({})->{}", service, args_it, return_it); debug!("send<{}>({})->{}", service, args_it, return_it);
}
write_i32(stream, service as i32).await?; writer.write_u32(service)?;
for index in 0.. { for index in 0.. {
if let Some(arg_tag) = args_it.next() { if let Some(arg_tag) = args_it.next() {
let mut data = unsafe { *data.offset(index) }; let mut data = unsafe { *data.offset(index) };
unsafe { send_value(stream, arg_tag, &mut data).await? }; unsafe { send_value(writer, arg_tag, &mut data)? };
} else { } else {
break break
} }
} }
write_i8(stream, 0).await?; writer.write_u8(0)?;
stream.send(return_tag_bytes.iter().copied()).await?; writer.write_bytes(return_tag_bytes)?;
Ok(()) Ok(())
} }