process RPC replies (WIP)

pull/10/head
Sebastien Bourdeauducq 2020-06-08 13:11:09 +08:00
parent de2618045a
commit b1161a0f71
4 changed files with 125 additions and 76 deletions

View File

@ -2,6 +2,8 @@ use core::mem::transmute;
use core::fmt;
use core::cell::RefCell;
use alloc::rc::Rc;
use alloc::sync::Arc;
use alloc::{vec, vec::Vec};
use log::{debug, warn, error};
use num_derive::{FromPrimitive, ToPrimitive};
@ -17,12 +19,11 @@ use libboard_zynq::{
},
timer::GlobalTimer,
};
use libsupport_zynq::alloc::{vec, vec::Vec};
use libasync::{smoltcp::{Sockets, TcpStream}, task};
use alloc::sync::Arc;
use crate::proto_async::*;
use crate::kernel;
use crate::rpc;
use crate::moninj;
@ -85,79 +86,117 @@ async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> {
Ok(())
}
async fn read_request(stream: &TcpStream, allow_close: bool) -> Result<Option<Request>> {
match expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await {
Ok(true) => {}
Ok(false) =>
return Err(Error::UnexpectedPattern),
Err(smoltcp::Error::Illegal) => {
if allow_close {
debug!("peer closed connection");
return Ok(None);
} else {
error!("peer unexpectedly closed connection");
return Err(smoltcp::Error::Illegal)?;
}
},
Err(e) =>
return Err(e)?,
}
Ok(Some(FromPrimitive::from_i8(read_i8(&stream).await?).ok_or(Error::UnrecognizedPacket)?))
}
async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) -> Result<()> {
control.tx.async_send(kernel::Message::StartRequest).await;
loop {
let reply = control.rx.async_recv().await;
match *reply {
kernel::Message::RpcSend { is_async, data } => {
write_header(&stream, Reply::RPCRequest).await?;
write_bool(&stream, is_async).await?;
stream.send(data.iter().copied()).await?;
if !is_async {
let host_request = read_request(stream, false).await?.unwrap();
match host_request {
Request::RPCReply => {
let tag = read_bytes(&stream, 512).await?;
let rpc_recv_request = control.rx.async_recv().await;
let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request {
slot
} else {
panic!(
"expected root value slot from core1, not {:?}", rpc_recv_request);
};
rpc::recv_return(stream, &tag, slot, &async move |size| -> *mut () {
if size == 0 {
// Don't try to allocate zero-length values, as RpcRecvReply(0) is
// used to terminate the kernel-side receive loop.
return 0 as *mut ()
}
unimplemented!();
}).await?;
control.tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await;
},
Request::RPCException => unimplemented!(),
_ => {
error!("unexpected RPC request from host: {:?}", host_request);
return Err(Error::UnrecognizedPacket)
}
}
}
},
kernel::Message::KernelFinished => {
write_header(&stream, Reply::KernelFinished).await?;
break;
},
_ => {
panic!("unexpected message from core1 while kernel was running: {:?}", reply);
}
}
}
Ok(())
}
async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Control>>) -> Result<()> {
expect(&stream, b"ARTIQ coredev\n").await?;
debug!("received connection");
loop {
match expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await {
Ok(true) => {}
Ok(false) =>
return Err(Error::UnexpectedPattern),
// Peer has closed the connection.
// Closing here is a normal condition so do not report an error.
// An error is still reported if the connection is unexpectedly closed elsewhere.
Err(smoltcp::Error::Illegal) =>
return Ok(()),
Err(e) =>
return Err(e)?,
let request = read_request(stream, true).await?;
if request.is_none() {
return Ok(());
}
let request: Request = FromPrimitive::from_i8(read_i8(&stream).await?)
.ok_or(Error::UnrecognizedPacket)?;
let request = request.unwrap();
match request {
Request::SystemInfo => {
write_header(&stream, Reply::SystemInfo).await?;
stream.send("ARZQ".bytes()).await?;
},
Request::LoadKernel => {
let length = read_i32(&stream).await? as usize;
if length < 1024*1024 {
let mut buffer = vec![0; length];
read_chunk(&stream, &mut buffer).await?;
let mut control = control.borrow_mut();
control.restart();
control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await;
let reply = control.rx.async_recv().await;
match *reply {
kernel::Message::LoadCompleted => write_header(&stream, Reply::LoadCompleted).await?,
kernel::Message::LoadFailed => {
write_header(&stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 failed to process data").await?;
},
_ => {
error!("received unexpected message from core1: {:?}", reply);
write_header(&stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 sent unexpected reply").await?;
}
let buffer = read_bytes(&stream, 1024*1024).await?;
let mut control = control.borrow_mut();
control.restart();
control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await;
let reply = control.rx.async_recv().await;
match *reply {
kernel::Message::LoadCompleted => write_header(&stream, Reply::LoadCompleted).await?,
kernel::Message::LoadFailed => {
write_header(&stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 failed to process data").await?;
},
_ => {
error!("unexpected message from core1: {:?}", reply);
write_header(&stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"core1 sent unexpected reply").await?;
}
} else {
read_drain(&stream, length).await?;
write_header(&stream, Reply::LoadFailed).await?;
write_chunk(&stream, b"kernel is too large").await?;
}
},
Request::RunKernel => {
let mut control = control.borrow_mut();
control.tx.async_send(kernel::Message::StartRequest).await;
loop {
let reply = control.rx.async_recv().await;
match *reply {
kernel::Message::KernelFinished => {
write_header(&stream, Reply::KernelFinished).await?;
break;
},
kernel::Message::RpcSend { is_async, data } => {
write_header(&stream, Reply::RPCRequest).await?;
write_bool(&stream, is_async).await?;
stream.send(data.iter().copied()).await?;
},
_ => {
error!("received unexpected message from core1 while kernel was running: {:?}", reply);
}
}
}
handle_run_kernel(stream, &mut control.borrow_mut()).await?;
},
_ => {
error!("unexpected request from host: {:?}", request);
return Err(Error::UnrecognizedPacket)
}
_ => return Err(Error::UnrecognizedPacket)
}
}
}

View File

@ -19,6 +19,8 @@ pub enum Message {
StartRequest,
KernelFinished,
RpcSend { is_async: bool, data: Arc<Vec<u8>> },
RpcRecvRequest(*mut ()),
RpcRecvReply(Result<usize, ()>),
}
static CHANNEL_0TO1: Mutex<Option<sync_channel::Receiver<Message>>> = Mutex::new(None);
@ -64,23 +66,31 @@ impl Control {
static mut KERNEL_CHANNEL_0TO1: *mut () = ptr::null_mut();
static mut KERNEL_CHANNEL_1TO0: *mut () = ptr::null_mut();
extern fn rpc_send(service: u32, tag: &CSlice<u8>, data: *const *const ()) {
let core1_rx: &mut sync_channel::Receiver<Message> = unsafe { mem::transmute(KERNEL_CHANNEL_0TO1) };
fn rpc_send_common(is_async: bool, service: u32, tag: &CSlice<u8>, data: *const *const ()) {
let core1_tx: &mut sync_channel::Sender<Message> = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) };
let mut buffer = Vec::<u8>::new();
rpc::send_args(&mut buffer, service, tag.as_ref(), data).expect("RPC encoding failed");
core1_tx.send(Message::RpcSend { is_async: false, data: Arc::new(buffer) })
core1_tx.send(Message::RpcSend { is_async: is_async, data: Arc::new(buffer) });
}
extern fn rpc_send(service: u32, tag: &CSlice<u8>, data: *const *const ()) {
rpc_send_common(false, service, tag, data);
}
extern fn rpc_send_async(service: u32, tag: &CSlice<u8>, data: *const *const ()) {
let core1_tx: &mut sync_channel::Sender<Message> = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) };
let mut buffer = Vec::<u8>::new();
rpc::send_args(&mut buffer, service, tag.as_ref(), data).expect("RPC encoding failed");
core1_tx.send(Message::RpcSend { is_async: true, data: Arc::new(buffer) })
rpc_send_common(true, service, tag, data);
}
extern fn rpc_recv(slot: *mut ()) -> usize {
unimplemented!();
let core1_rx: &mut sync_channel::Receiver<Message> = unsafe { mem::transmute(KERNEL_CHANNEL_0TO1) };
let core1_tx: &mut sync_channel::Sender<Message> = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) };
core1_tx.send(Message::RpcRecvRequest(slot));
let reply = core1_rx.recv();
match *reply {
Message::RpcRecvReply(Ok(alloc_size)) => alloc_size,
Message::RpcRecvReply(Err(_)) => unimplemented!(),
_ => panic!("received unexpected reply to RpcRecvRequest: {:?}", reply)
}
}
macro_rules! api {

View File

@ -2,6 +2,7 @@
#![no_main]
#![recursion_limit="1024"] // for futures_util::select!
#![feature(llvm_asm)]
#![feature(async_closure)]
extern crate alloc;

View File

@ -1,6 +1,7 @@
use core::task::Poll;
use core::cmp::min;
use core::cell::RefCell;
use alloc::{vec, vec::Vec};
use libboard_zynq::smoltcp;
use libasync::smoltcp::TcpStream;
@ -86,16 +87,14 @@ pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()
Ok(())
}
pub async fn read_drain(stream: &TcpStream, total: usize) -> Result<()> {
let mut done = 0;
while done < total {
let count = stream.recv(|buf| {
let count = min(total - done, buf.len());
Poll::Ready((count, count))
}).await?;
done += count;
pub async fn read_bytes(stream: &TcpStream, max_length: usize) -> Result<Vec<u8>> {
let length = read_i32(&stream).await? as usize;
if length > max_length {
return Err(smoltcp::Error::Exhausted);
}
Ok(())
let mut buffer = vec![0; length];
read_chunk(&stream, &mut buffer).await?;
Ok(buffer)
}
pub async fn write_i8(stream: &TcpStream, value: i8) -> Result<()> {