From 4cfc4e89b983b209989603ba5ec8dbc3fdcc028a Mon Sep 17 00:00:00 2001 From: whitequark Date: Wed, 5 Oct 2016 14:15:53 +0000 Subject: [PATCH] Rust: add basic RPC support. --- artiq/runtime.rs/src/kernel_proto.rs | 6 +- artiq/runtime.rs/src/lib.rs | 3 +- artiq/runtime.rs/src/proto.rs | 13 ++ artiq/runtime.rs/src/rpc.rs | 187 ++++++++++++++++++++++++++ artiq/runtime.rs/src/session.rs | 48 ++++++- artiq/runtime.rs/src/session_proto.rs | 30 ++--- 6 files changed, 262 insertions(+), 25 deletions(-) create mode 100644 artiq/runtime.rs/src/rpc.rs diff --git a/artiq/runtime.rs/src/kernel_proto.rs b/artiq/runtime.rs/src/kernel_proto.rs index e7c76a508..eb8fb7c2a 100644 --- a/artiq/runtime.rs/src/kernel_proto.rs +++ b/artiq/runtime.rs/src/kernel_proto.rs @@ -38,6 +38,7 @@ pub enum Message<'a> { RpcSend { service: u32, + batch: bool, tag: &'a [u8], data: *const *const () }, @@ -182,10 +183,11 @@ impl<'a> Message<'a> { Message::WatchdogClear { id: (*msg).id as usize } } - c::Type::RpcSend => { + c::Type::RpcSend | c::Type::RpcBatch => { let msg = ptr as *const c::RpcSend; Message::RpcSend { service: (*msg).service as _, + batch: (*msg).ty == c::Type::RpcBatch, tag: slice::from_raw_parts((*msg).tag as *const _, c::strlen((*msg).tag) as usize), data: (*msg).data as *const _ @@ -248,7 +250,7 @@ mod c { extern { pub fn strlen(ptr: *const c_char) -> size_t; } #[repr(u32)] - #[derive(Debug)] + #[derive(Debug, PartialEq, Eq)] #[allow(dead_code)] pub enum Type { LoadRequest, diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 68a0ff269..a9da393ec 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -31,6 +31,7 @@ mod moninj_proto; mod analyzer_proto; mod kernel; +mod rpc; mod session; mod moninj; #[cfg(has_rtio_analyzer)] @@ -57,7 +58,7 @@ pub unsafe extern fn rust_main() { network_init(); let mut scheduler = sched::Scheduler::new(); - scheduler.spawner().spawn(8192, session::thread); + scheduler.spawner().spawn(16384, session::thread); scheduler.spawner().spawn(4096, moninj::thread); #[cfg(has_rtio_analyzer)] scheduler.spawner().spawn(4096, analyzer::thread); diff --git a/artiq/runtime.rs/src/proto.rs b/artiq/runtime.rs/src/proto.rs index acb22998b..d7df40b8c 100644 --- a/artiq/runtime.rs/src/proto.rs +++ b/artiq/runtime.rs/src/proto.rs @@ -1,6 +1,7 @@ #![allow(dead_code)] use std::io::{self, Read, Write}; +use std::vec::Vec; use byteorder::{ByteOrder, NetworkEndian}; // FIXME: replace these with byteorder core io traits once those are in @@ -50,3 +51,15 @@ pub fn write_u64(writer: &mut Write, value: u64) -> io::Result<()> { NetworkEndian::write_u64(&mut bytes, value); writer.write_all(&bytes) } + +pub fn read_bytes(reader: &mut Read) -> io::Result> { + let length = try!(read_u32(reader)); + let mut value = vec![0; length as usize]; + try!(reader.read_exact(&mut value)); + Ok(value) +} + +pub fn write_bytes(writer: &mut Write, value: &[u8]) -> io::Result<()> { + try!(write_u32(writer, value.len() as u32)); + writer.write_all(value) +} diff --git a/artiq/runtime.rs/src/rpc.rs b/artiq/runtime.rs/src/rpc.rs new file mode 100644 index 000000000..5507a98e0 --- /dev/null +++ b/artiq/runtime.rs/src/rpc.rs @@ -0,0 +1,187 @@ +use std::io::{self, Read, Write}; +use proto::{write_u8, write_bytes}; +use self::tag::{Tag, TagIterator, split_tag}; + +fn recv_value(reader: &mut Read, tag: Tag, data: &mut *const ()) -> io::Result<()> { + match tag { + Tag::None => Ok(()), + _ => unreachable!() + } +} + +pub fn recv_return(reader: &mut Read, tag_bytes: &[u8], data: *const ()) -> io::Result<()> { + let mut it = TagIterator::new(tag_bytes); + trace!("recv ...->{}", it); + + let mut data = data; + try!(recv_value(reader, it.next().expect("RPC without a return value"), &mut data)); + + Ok(()) +} + +fn send_value(writer: &mut Write, tag: Tag, data: &mut *const ()) -> io::Result<()> { + match tag { + Tag::None => write_u8(writer, b'n'), + _ => unreachable!() + } +} + +pub fn send_args(writer: &mut Write, tag_bytes: &[u8], + data: *const *const ()) -> io::Result<()> { + let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes); + + let mut args_it = TagIterator::new(arg_tags_bytes); + let return_it = TagIterator::new(return_tag_bytes); + trace!("send ({})->{}", args_it, return_it); + + for index in 0.. { + if let Some(arg_tag) = args_it.next() { + let mut data = unsafe { *data.offset(index) }; + try!(send_value(writer, arg_tag, &mut data)); + } else { + break + } + } + try!(write_u8(writer, 0)); + try!(write_bytes(writer, return_tag_bytes)); + + Ok(()) +} + +mod tag { + use core::fmt; + + pub fn split_tag(tag_bytes: &[u8]) -> (&[u8], &[u8]) { + let tag_separator = + tag_bytes.iter() + .position(|&b| b == b':') + .expect("tag without a return separator"); + let (arg_tags_bytes, rest) = tag_bytes.split_at(tag_separator); + let return_tag_bytes = &rest[1..]; + (arg_tags_bytes, return_tag_bytes) + } + + #[derive(Debug)] + pub enum Tag<'a> { + None, + Bool, + Int32, + Int64, + Float64, + String, + Tuple(TagIterator<'a>, u8), + List(TagIterator<'a>), + Array(TagIterator<'a>), + Range(TagIterator<'a>), + Keyword(TagIterator<'a>), + Object + } + + #[derive(Debug, Clone, Copy)] + pub struct TagIterator<'a> { + data: &'a [u8] + } + + impl<'a> TagIterator<'a> { + pub fn new(data: &'a [u8]) -> TagIterator<'a> { + TagIterator { data: data } + } + + pub fn next(&mut self) -> Option> { + if self.data.len() == 0 { + return None + } + + let tag_byte = self.data[0]; + self.data = &self.data[1..]; + Some(match tag_byte { + b'n' => Tag::None, + b'b' => Tag::Bool, + b'i' => Tag::Int32, + b'I' => Tag::Int64, + b'f' => Tag::Float64, + b's' => Tag::String, + b't' => { + let count = self.data[0]; + self.data = &self.data[1..]; + Tag::Tuple(self.skip(count), count) + } + b'l' => Tag::List(self.skip(1)), + b'a' => Tag::Array(self.skip(1)), + b'r' => Tag::Range(self.skip(1)), + b'k' => Tag::Keyword(self.skip(1)), + b'O' => Tag::Object, + _ => unreachable!() + }) + } + + fn skip(&mut self, count: u8) -> TagIterator<'a> { + let origin = self.clone(); + for _ in 0..count { + self.next().expect("truncated tag"); + } + origin + } + } + + impl<'a> fmt::Display for TagIterator<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + let mut it = self.clone(); + let mut first = true; + while let Some(tag) = it.next() { + if first { + first = false + } else { + try!(write!(f, ", ")) + } + + match tag { + Tag::None => + try!(write!(f, "None")), + Tag::Bool => + try!(write!(f, "Bool")), + Tag::Int32 => + try!(write!(f, "Int32")), + Tag::Int64 => + try!(write!(f, "Int64")), + Tag::Float64 => + try!(write!(f, "Float64")), + Tag::String => + try!(write!(f, "String")), + Tag::Tuple(it, cnt) => { + try!(write!(f, "Tuple(")); + for i in 0..cnt { + try!(it.fmt(f)); + if i != cnt - 1 { try!(write!(f, ", ")) } + } + try!(write!(f, ")")) + } + Tag::List(it) => { + try!(write!(f, "List(")); + try!(it.fmt(f)); + try!(write!(f, ")")) + } + Tag::Array(it) => { + try!(write!(f, "Array(")); + try!(it.fmt(f)); + try!(write!(f, ")")) + } + Tag::Range(it) => { + try!(write!(f, "Range(")); + try!(it.fmt(f)); + try!(write!(f, ")")) + } + Tag::Keyword(it) => { + try!(write!(f, "Keyword(")); + try!(it.fmt(f)); + try!(write!(f, ")")) + } + Tag::Object => + try!(write!(f, "Object")) + } + } + + Ok(()) + } + } +} diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 42f180e44..d4b85ead6 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -9,6 +9,7 @@ use urc::Urc; use sched::{ThreadHandle, Waiter, Spawner}; use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY}; +use rpc; use session_proto as host; use kernel_proto as kern; @@ -230,11 +231,33 @@ fn process_host_message(waiter: Waiter, Err(_) => host_write(stream, host::Reply::KernelStartupFailed) }, + host::Request::RpcReply { tag, data } => { + if session.kernel_state != KernelState::RpcWait { + unexpected!("unsolicited RPC reply") + } + + try!(kern_recv(waiter, |reply| { + match reply { + kern::RpcRecvRequest { slot } => { + let mut data = io::Cursor::new(data); + rpc::recv_return(&mut data, &tag, slot) + } + other => + unexpected!("unexpected reply from kernel CPU: {:?}", other) + } + })); + try!(kern_send(waiter, kern::RpcRecvReply { alloc_size: 0, exception: None })); + + session.kernel_state = KernelState::Running; + Ok(()) + } + request => unexpected!("unexpected request {:?} from host machine", request) } } fn process_kern_message(waiter: Waiter, + mut stream: Option<&mut TcpStream>, session: &mut Session) -> io::Result { kern::Message::wait_and_receive(waiter, |request| { match (&request, session.kernel_state) { @@ -276,6 +299,24 @@ fn process_kern_message(waiter: Waiter, kern_acknowledge() } + kern::RpcSend { service, batch, tag, data } => { + match stream { + None => unexpected!("unexpected RPC in flash kernel"), + Some(ref mut stream) => { + let mut buf = Vec::new(); + try!(rpc::send_args(&mut buf, tag, data)); + try!(host_write(stream, host::Reply::RpcRequest { + service: service, + data: &buf[..] + })); + if !batch { + session.kernel_state = KernelState::RpcWait + } + kern_acknowledge() + } + } + } + kern::CacheGetRequest { key } => { let value = session.congress.cache.get(key); kern_send(waiter, kern::CacheGetReply { @@ -289,6 +330,7 @@ fn process_kern_message(waiter: Waiter, } kern::RunFinished => { + try!(kern_acknowledge()); session.kernel_state = KernelState::Absent; return Ok(true) } @@ -309,7 +351,7 @@ fn host_kernel_worker(waiter: Waiter, } if mailbox::receive() != 0 { - if try!(process_kern_message(waiter, &mut session)) { + if try!(process_kern_message(waiter, Some(stream), &mut session)) { try!(host_write(stream, host::Reply::KernelFinished)) } } @@ -345,7 +387,7 @@ fn flash_kernel_worker(waiter: Waiter, loop { if mailbox::receive() != 0 { - if try!(process_kern_message(waiter, &mut session)) { + if try!(process_kern_message(waiter, None, &mut session)) { return Ok(()) } } @@ -376,7 +418,7 @@ fn respawn(spawner: Spawner, waiter: Waiter, } } - *handle = Some(spawner.spawn(8192, f)) + *handle = Some(spawner.spawn(16384, f)) } pub fn thread(waiter: Waiter, spawner: Spawner) { diff --git a/artiq/runtime.rs/src/session_proto.rs b/artiq/runtime.rs/src/session_proto.rs index d2118f911..ba32fbc5a 100644 --- a/artiq/runtime.rs/src/session_proto.rs +++ b/artiq/runtime.rs/src/session_proto.rs @@ -2,18 +2,6 @@ use std::prelude::v1::*; use std::io::{self, Read, Write}; use proto::*; -fn read_bytes(reader: &mut Read) -> io::Result> { - let length = try!(read_u32(reader)); - let mut value = vec![0; length as usize]; - try!(reader.read_exact(&mut value)); - Ok(value) -} - -fn write_bytes(writer: &mut Write, value: &[u8]) -> io::Result<()> { - try!(write_u32(writer, value.len() as u32)); - writer.write_all(value) -} - fn read_string(reader: &mut Read) -> io::Result { let mut bytes = try!(read_bytes(reader)); let len = bytes.len() - 1; // length without trailing \0 @@ -89,7 +77,7 @@ pub enum Request { LoadKernel(Vec), RunKernel, - RpcReply { tag: String }, // FIXME + RpcReply { tag: Vec, data: Vec }, RpcException(Exception), FlashRead { key: String }, @@ -115,11 +103,14 @@ impl Request { let mut code = vec![0; length - HEADER_SIZE]; try!(reader.read_exact(&mut code)); Request::LoadKernel(code) - }, + } 6 => Request::RunKernel, - 7 => Request::RpcReply { - tag: try!(read_string(reader)) - }, + 7 => { + let tag = try!(read_bytes(reader)); + let mut data = vec![0; length - HEADER_SIZE - 4 - tag.len()]; + try!(reader.read_exact(&mut data)); + Request::RpcReply { tag: tag, data: data } + } 8 => Request::RpcException(try!(Exception::read_from(reader))), 9 => Request::FlashRead { key: try!(read_string(reader)) @@ -152,7 +143,7 @@ pub enum Reply<'a> { KernelStartupFailed, KernelException(Exception), - RpcRequest { service: u32 }, + RpcRequest { service: u32, data: &'a [u8] }, FlashRead(&'a [u8]), FlashOk, @@ -204,9 +195,10 @@ impl<'a> Reply<'a> { try!(exception.write_to(writer)); }, - Reply::RpcRequest { service } => { + Reply::RpcRequest { service, data } => { try!(write_u8(&mut buf, 10)); try!(write_u32(&mut buf, service)); + try!(buf.write(data)); }, Reply::FlashRead(ref bytes) => {