forked from M-Labs/artiq
Rust: add basic RPC support.
This commit is contained in:
parent
0a29c00fcc
commit
4cfc4e89b9
|
@ -38,6 +38,7 @@ pub enum Message<'a> {
|
||||||
|
|
||||||
RpcSend {
|
RpcSend {
|
||||||
service: u32,
|
service: u32,
|
||||||
|
batch: bool,
|
||||||
tag: &'a [u8],
|
tag: &'a [u8],
|
||||||
data: *const *const ()
|
data: *const *const ()
|
||||||
},
|
},
|
||||||
|
@ -182,10 +183,11 @@ impl<'a> Message<'a> {
|
||||||
Message::WatchdogClear { id: (*msg).id as usize }
|
Message::WatchdogClear { id: (*msg).id as usize }
|
||||||
}
|
}
|
||||||
|
|
||||||
c::Type::RpcSend => {
|
c::Type::RpcSend | c::Type::RpcBatch => {
|
||||||
let msg = ptr as *const c::RpcSend;
|
let msg = ptr as *const c::RpcSend;
|
||||||
Message::RpcSend {
|
Message::RpcSend {
|
||||||
service: (*msg).service as _,
|
service: (*msg).service as _,
|
||||||
|
batch: (*msg).ty == c::Type::RpcBatch,
|
||||||
tag: slice::from_raw_parts((*msg).tag as *const _,
|
tag: slice::from_raw_parts((*msg).tag as *const _,
|
||||||
c::strlen((*msg).tag) as usize),
|
c::strlen((*msg).tag) as usize),
|
||||||
data: (*msg).data as *const _
|
data: (*msg).data as *const _
|
||||||
|
@ -248,7 +250,7 @@ mod c {
|
||||||
extern { pub fn strlen(ptr: *const c_char) -> size_t; }
|
extern { pub fn strlen(ptr: *const c_char) -> size_t; }
|
||||||
|
|
||||||
#[repr(u32)]
|
#[repr(u32)]
|
||||||
#[derive(Debug)]
|
#[derive(Debug, PartialEq, Eq)]
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
pub enum Type {
|
pub enum Type {
|
||||||
LoadRequest,
|
LoadRequest,
|
||||||
|
|
|
@ -31,6 +31,7 @@ mod moninj_proto;
|
||||||
mod analyzer_proto;
|
mod analyzer_proto;
|
||||||
|
|
||||||
mod kernel;
|
mod kernel;
|
||||||
|
mod rpc;
|
||||||
mod session;
|
mod session;
|
||||||
mod moninj;
|
mod moninj;
|
||||||
#[cfg(has_rtio_analyzer)]
|
#[cfg(has_rtio_analyzer)]
|
||||||
|
@ -57,7 +58,7 @@ pub unsafe extern fn rust_main() {
|
||||||
network_init();
|
network_init();
|
||||||
|
|
||||||
let mut scheduler = sched::Scheduler::new();
|
let mut scheduler = sched::Scheduler::new();
|
||||||
scheduler.spawner().spawn(8192, session::thread);
|
scheduler.spawner().spawn(16384, session::thread);
|
||||||
scheduler.spawner().spawn(4096, moninj::thread);
|
scheduler.spawner().spawn(4096, moninj::thread);
|
||||||
#[cfg(has_rtio_analyzer)]
|
#[cfg(has_rtio_analyzer)]
|
||||||
scheduler.spawner().spawn(4096, analyzer::thread);
|
scheduler.spawner().spawn(4096, analyzer::thread);
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
|
use std::vec::Vec;
|
||||||
use byteorder::{ByteOrder, NetworkEndian};
|
use byteorder::{ByteOrder, NetworkEndian};
|
||||||
|
|
||||||
// FIXME: replace these with byteorder core io traits once those are in
|
// FIXME: replace these with byteorder core io traits once those are in
|
||||||
|
@ -50,3 +51,15 @@ pub fn write_u64(writer: &mut Write, value: u64) -> io::Result<()> {
|
||||||
NetworkEndian::write_u64(&mut bytes, value);
|
NetworkEndian::write_u64(&mut bytes, value);
|
||||||
writer.write_all(&bytes)
|
writer.write_all(&bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn read_bytes(reader: &mut Read) -> io::Result<Vec<u8>> {
|
||||||
|
let length = try!(read_u32(reader));
|
||||||
|
let mut value = vec![0; length as usize];
|
||||||
|
try!(reader.read_exact(&mut value));
|
||||||
|
Ok(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn write_bytes(writer: &mut Write, value: &[u8]) -> io::Result<()> {
|
||||||
|
try!(write_u32(writer, value.len() as u32));
|
||||||
|
writer.write_all(value)
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,187 @@
|
||||||
|
use std::io::{self, Read, Write};
|
||||||
|
use proto::{write_u8, write_bytes};
|
||||||
|
use self::tag::{Tag, TagIterator, split_tag};
|
||||||
|
|
||||||
|
fn recv_value(reader: &mut Read, tag: Tag, data: &mut *const ()) -> io::Result<()> {
|
||||||
|
match tag {
|
||||||
|
Tag::None => Ok(()),
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn recv_return(reader: &mut Read, tag_bytes: &[u8], data: *const ()) -> io::Result<()> {
|
||||||
|
let mut it = TagIterator::new(tag_bytes);
|
||||||
|
trace!("recv ...->{}", it);
|
||||||
|
|
||||||
|
let mut data = data;
|
||||||
|
try!(recv_value(reader, it.next().expect("RPC without a return value"), &mut data));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_value(writer: &mut Write, tag: Tag, data: &mut *const ()) -> io::Result<()> {
|
||||||
|
match tag {
|
||||||
|
Tag::None => write_u8(writer, b'n'),
|
||||||
|
_ => unreachable!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send_args(writer: &mut Write, tag_bytes: &[u8],
|
||||||
|
data: *const *const ()) -> io::Result<()> {
|
||||||
|
let (arg_tags_bytes, return_tag_bytes) = split_tag(tag_bytes);
|
||||||
|
|
||||||
|
let mut args_it = TagIterator::new(arg_tags_bytes);
|
||||||
|
let return_it = TagIterator::new(return_tag_bytes);
|
||||||
|
trace!("send ({})->{}", args_it, return_it);
|
||||||
|
|
||||||
|
for index in 0.. {
|
||||||
|
if let Some(arg_tag) = args_it.next() {
|
||||||
|
let mut data = unsafe { *data.offset(index) };
|
||||||
|
try!(send_value(writer, arg_tag, &mut data));
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
try!(write_u8(writer, 0));
|
||||||
|
try!(write_bytes(writer, return_tag_bytes));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
mod tag {
|
||||||
|
use core::fmt;
|
||||||
|
|
||||||
|
pub fn split_tag(tag_bytes: &[u8]) -> (&[u8], &[u8]) {
|
||||||
|
let tag_separator =
|
||||||
|
tag_bytes.iter()
|
||||||
|
.position(|&b| b == b':')
|
||||||
|
.expect("tag without a return separator");
|
||||||
|
let (arg_tags_bytes, rest) = tag_bytes.split_at(tag_separator);
|
||||||
|
let return_tag_bytes = &rest[1..];
|
||||||
|
(arg_tags_bytes, return_tag_bytes)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Tag<'a> {
|
||||||
|
None,
|
||||||
|
Bool,
|
||||||
|
Int32,
|
||||||
|
Int64,
|
||||||
|
Float64,
|
||||||
|
String,
|
||||||
|
Tuple(TagIterator<'a>, u8),
|
||||||
|
List(TagIterator<'a>),
|
||||||
|
Array(TagIterator<'a>),
|
||||||
|
Range(TagIterator<'a>),
|
||||||
|
Keyword(TagIterator<'a>),
|
||||||
|
Object
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub struct TagIterator<'a> {
|
||||||
|
data: &'a [u8]
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> TagIterator<'a> {
|
||||||
|
pub fn new(data: &'a [u8]) -> TagIterator<'a> {
|
||||||
|
TagIterator { data: data }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn next(&mut self) -> Option<Tag<'a>> {
|
||||||
|
if self.data.len() == 0 {
|
||||||
|
return None
|
||||||
|
}
|
||||||
|
|
||||||
|
let tag_byte = self.data[0];
|
||||||
|
self.data = &self.data[1..];
|
||||||
|
Some(match tag_byte {
|
||||||
|
b'n' => Tag::None,
|
||||||
|
b'b' => Tag::Bool,
|
||||||
|
b'i' => Tag::Int32,
|
||||||
|
b'I' => Tag::Int64,
|
||||||
|
b'f' => Tag::Float64,
|
||||||
|
b's' => Tag::String,
|
||||||
|
b't' => {
|
||||||
|
let count = self.data[0];
|
||||||
|
self.data = &self.data[1..];
|
||||||
|
Tag::Tuple(self.skip(count), count)
|
||||||
|
}
|
||||||
|
b'l' => Tag::List(self.skip(1)),
|
||||||
|
b'a' => Tag::Array(self.skip(1)),
|
||||||
|
b'r' => Tag::Range(self.skip(1)),
|
||||||
|
b'k' => Tag::Keyword(self.skip(1)),
|
||||||
|
b'O' => Tag::Object,
|
||||||
|
_ => unreachable!()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn skip(&mut self, count: u8) -> TagIterator<'a> {
|
||||||
|
let origin = self.clone();
|
||||||
|
for _ in 0..count {
|
||||||
|
self.next().expect("truncated tag");
|
||||||
|
}
|
||||||
|
origin
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> fmt::Display for TagIterator<'a> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
let mut it = self.clone();
|
||||||
|
let mut first = true;
|
||||||
|
while let Some(tag) = it.next() {
|
||||||
|
if first {
|
||||||
|
first = false
|
||||||
|
} else {
|
||||||
|
try!(write!(f, ", "))
|
||||||
|
}
|
||||||
|
|
||||||
|
match tag {
|
||||||
|
Tag::None =>
|
||||||
|
try!(write!(f, "None")),
|
||||||
|
Tag::Bool =>
|
||||||
|
try!(write!(f, "Bool")),
|
||||||
|
Tag::Int32 =>
|
||||||
|
try!(write!(f, "Int32")),
|
||||||
|
Tag::Int64 =>
|
||||||
|
try!(write!(f, "Int64")),
|
||||||
|
Tag::Float64 =>
|
||||||
|
try!(write!(f, "Float64")),
|
||||||
|
Tag::String =>
|
||||||
|
try!(write!(f, "String")),
|
||||||
|
Tag::Tuple(it, cnt) => {
|
||||||
|
try!(write!(f, "Tuple("));
|
||||||
|
for i in 0..cnt {
|
||||||
|
try!(it.fmt(f));
|
||||||
|
if i != cnt - 1 { try!(write!(f, ", ")) }
|
||||||
|
}
|
||||||
|
try!(write!(f, ")"))
|
||||||
|
}
|
||||||
|
Tag::List(it) => {
|
||||||
|
try!(write!(f, "List("));
|
||||||
|
try!(it.fmt(f));
|
||||||
|
try!(write!(f, ")"))
|
||||||
|
}
|
||||||
|
Tag::Array(it) => {
|
||||||
|
try!(write!(f, "Array("));
|
||||||
|
try!(it.fmt(f));
|
||||||
|
try!(write!(f, ")"))
|
||||||
|
}
|
||||||
|
Tag::Range(it) => {
|
||||||
|
try!(write!(f, "Range("));
|
||||||
|
try!(it.fmt(f));
|
||||||
|
try!(write!(f, ")"))
|
||||||
|
}
|
||||||
|
Tag::Keyword(it) => {
|
||||||
|
try!(write!(f, "Keyword("));
|
||||||
|
try!(it.fmt(f));
|
||||||
|
try!(write!(f, ")"))
|
||||||
|
}
|
||||||
|
Tag::Object =>
|
||||||
|
try!(write!(f, "Object"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ use urc::Urc;
|
||||||
use sched::{ThreadHandle, Waiter, Spawner};
|
use sched::{ThreadHandle, Waiter, Spawner};
|
||||||
use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY};
|
use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY};
|
||||||
|
|
||||||
|
use rpc;
|
||||||
use session_proto as host;
|
use session_proto as host;
|
||||||
use kernel_proto as kern;
|
use kernel_proto as kern;
|
||||||
|
|
||||||
|
@ -230,11 +231,33 @@ fn process_host_message(waiter: Waiter,
|
||||||
Err(_) => host_write(stream, host::Reply::KernelStartupFailed)
|
Err(_) => host_write(stream, host::Reply::KernelStartupFailed)
|
||||||
},
|
},
|
||||||
|
|
||||||
|
host::Request::RpcReply { tag, data } => {
|
||||||
|
if session.kernel_state != KernelState::RpcWait {
|
||||||
|
unexpected!("unsolicited RPC reply")
|
||||||
|
}
|
||||||
|
|
||||||
|
try!(kern_recv(waiter, |reply| {
|
||||||
|
match reply {
|
||||||
|
kern::RpcRecvRequest { slot } => {
|
||||||
|
let mut data = io::Cursor::new(data);
|
||||||
|
rpc::recv_return(&mut data, &tag, slot)
|
||||||
|
}
|
||||||
|
other =>
|
||||||
|
unexpected!("unexpected reply from kernel CPU: {:?}", other)
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
try!(kern_send(waiter, kern::RpcRecvReply { alloc_size: 0, exception: None }));
|
||||||
|
|
||||||
|
session.kernel_state = KernelState::Running;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
request => unexpected!("unexpected request {:?} from host machine", request)
|
request => unexpected!("unexpected request {:?} from host machine", request)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_kern_message(waiter: Waiter,
|
fn process_kern_message(waiter: Waiter,
|
||||||
|
mut stream: Option<&mut TcpStream>,
|
||||||
session: &mut Session) -> io::Result<bool> {
|
session: &mut Session) -> io::Result<bool> {
|
||||||
kern::Message::wait_and_receive(waiter, |request| {
|
kern::Message::wait_and_receive(waiter, |request| {
|
||||||
match (&request, session.kernel_state) {
|
match (&request, session.kernel_state) {
|
||||||
|
@ -276,6 +299,24 @@ fn process_kern_message(waiter: Waiter,
|
||||||
kern_acknowledge()
|
kern_acknowledge()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
kern::RpcSend { service, batch, tag, data } => {
|
||||||
|
match stream {
|
||||||
|
None => unexpected!("unexpected RPC in flash kernel"),
|
||||||
|
Some(ref mut stream) => {
|
||||||
|
let mut buf = Vec::new();
|
||||||
|
try!(rpc::send_args(&mut buf, tag, data));
|
||||||
|
try!(host_write(stream, host::Reply::RpcRequest {
|
||||||
|
service: service,
|
||||||
|
data: &buf[..]
|
||||||
|
}));
|
||||||
|
if !batch {
|
||||||
|
session.kernel_state = KernelState::RpcWait
|
||||||
|
}
|
||||||
|
kern_acknowledge()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
kern::CacheGetRequest { key } => {
|
kern::CacheGetRequest { key } => {
|
||||||
let value = session.congress.cache.get(key);
|
let value = session.congress.cache.get(key);
|
||||||
kern_send(waiter, kern::CacheGetReply {
|
kern_send(waiter, kern::CacheGetReply {
|
||||||
|
@ -289,6 +330,7 @@ fn process_kern_message(waiter: Waiter,
|
||||||
}
|
}
|
||||||
|
|
||||||
kern::RunFinished => {
|
kern::RunFinished => {
|
||||||
|
try!(kern_acknowledge());
|
||||||
session.kernel_state = KernelState::Absent;
|
session.kernel_state = KernelState::Absent;
|
||||||
return Ok(true)
|
return Ok(true)
|
||||||
}
|
}
|
||||||
|
@ -309,7 +351,7 @@ fn host_kernel_worker(waiter: Waiter,
|
||||||
}
|
}
|
||||||
|
|
||||||
if mailbox::receive() != 0 {
|
if mailbox::receive() != 0 {
|
||||||
if try!(process_kern_message(waiter, &mut session)) {
|
if try!(process_kern_message(waiter, Some(stream), &mut session)) {
|
||||||
try!(host_write(stream, host::Reply::KernelFinished))
|
try!(host_write(stream, host::Reply::KernelFinished))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -345,7 +387,7 @@ fn flash_kernel_worker(waiter: Waiter,
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if mailbox::receive() != 0 {
|
if mailbox::receive() != 0 {
|
||||||
if try!(process_kern_message(waiter, &mut session)) {
|
if try!(process_kern_message(waiter, None, &mut session)) {
|
||||||
return Ok(())
|
return Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -376,7 +418,7 @@ fn respawn<F>(spawner: Spawner, waiter: Waiter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*handle = Some(spawner.spawn(8192, f))
|
*handle = Some(spawner.spawn(16384, f))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn thread(waiter: Waiter, spawner: Spawner) {
|
pub fn thread(waiter: Waiter, spawner: Spawner) {
|
||||||
|
|
|
@ -2,18 +2,6 @@ use std::prelude::v1::*;
|
||||||
use std::io::{self, Read, Write};
|
use std::io::{self, Read, Write};
|
||||||
use proto::*;
|
use proto::*;
|
||||||
|
|
||||||
fn read_bytes(reader: &mut Read) -> io::Result<Vec<u8>> {
|
|
||||||
let length = try!(read_u32(reader));
|
|
||||||
let mut value = vec![0; length as usize];
|
|
||||||
try!(reader.read_exact(&mut value));
|
|
||||||
Ok(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write_bytes(writer: &mut Write, value: &[u8]) -> io::Result<()> {
|
|
||||||
try!(write_u32(writer, value.len() as u32));
|
|
||||||
writer.write_all(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn read_string(reader: &mut Read) -> io::Result<String> {
|
fn read_string(reader: &mut Read) -> io::Result<String> {
|
||||||
let mut bytes = try!(read_bytes(reader));
|
let mut bytes = try!(read_bytes(reader));
|
||||||
let len = bytes.len() - 1; // length without trailing \0
|
let len = bytes.len() - 1; // length without trailing \0
|
||||||
|
@ -89,7 +77,7 @@ pub enum Request {
|
||||||
LoadKernel(Vec<u8>),
|
LoadKernel(Vec<u8>),
|
||||||
RunKernel,
|
RunKernel,
|
||||||
|
|
||||||
RpcReply { tag: String }, // FIXME
|
RpcReply { tag: Vec<u8>, data: Vec<u8> },
|
||||||
RpcException(Exception),
|
RpcException(Exception),
|
||||||
|
|
||||||
FlashRead { key: String },
|
FlashRead { key: String },
|
||||||
|
@ -115,11 +103,14 @@ impl Request {
|
||||||
let mut code = vec![0; length - HEADER_SIZE];
|
let mut code = vec![0; length - HEADER_SIZE];
|
||||||
try!(reader.read_exact(&mut code));
|
try!(reader.read_exact(&mut code));
|
||||||
Request::LoadKernel(code)
|
Request::LoadKernel(code)
|
||||||
},
|
}
|
||||||
6 => Request::RunKernel,
|
6 => Request::RunKernel,
|
||||||
7 => Request::RpcReply {
|
7 => {
|
||||||
tag: try!(read_string(reader))
|
let tag = try!(read_bytes(reader));
|
||||||
},
|
let mut data = vec![0; length - HEADER_SIZE - 4 - tag.len()];
|
||||||
|
try!(reader.read_exact(&mut data));
|
||||||
|
Request::RpcReply { tag: tag, data: data }
|
||||||
|
}
|
||||||
8 => Request::RpcException(try!(Exception::read_from(reader))),
|
8 => Request::RpcException(try!(Exception::read_from(reader))),
|
||||||
9 => Request::FlashRead {
|
9 => Request::FlashRead {
|
||||||
key: try!(read_string(reader))
|
key: try!(read_string(reader))
|
||||||
|
@ -152,7 +143,7 @@ pub enum Reply<'a> {
|
||||||
KernelStartupFailed,
|
KernelStartupFailed,
|
||||||
KernelException(Exception),
|
KernelException(Exception),
|
||||||
|
|
||||||
RpcRequest { service: u32 },
|
RpcRequest { service: u32, data: &'a [u8] },
|
||||||
|
|
||||||
FlashRead(&'a [u8]),
|
FlashRead(&'a [u8]),
|
||||||
FlashOk,
|
FlashOk,
|
||||||
|
@ -204,9 +195,10 @@ impl<'a> Reply<'a> {
|
||||||
try!(exception.write_to(writer));
|
try!(exception.write_to(writer));
|
||||||
},
|
},
|
||||||
|
|
||||||
Reply::RpcRequest { service } => {
|
Reply::RpcRequest { service, data } => {
|
||||||
try!(write_u8(&mut buf, 10));
|
try!(write_u8(&mut buf, 10));
|
||||||
try!(write_u32(&mut buf, service));
|
try!(write_u32(&mut buf, service));
|
||||||
|
try!(buf.write(data));
|
||||||
},
|
},
|
||||||
|
|
||||||
Reply::FlashRead(ref bytes) => {
|
Reply::FlashRead(ref bytes) => {
|
||||||
|
|
Loading…
Reference in New Issue