handle recursive RPC return values

core0-buffer
Sebastien Bourdeauducq 2020-06-09 13:03:08 +08:00
parent 7387e42306
commit bc81fa2e48
2 changed files with 26 additions and 22 deletions

View File

@ -125,10 +125,10 @@ async fn read_string(stream: &TcpStream, max_length: usize) -> Result<String> {
Ok(String::from_utf8(bytes).map_err(|err| Error::Utf8Error(err.utf8_error()))?) Ok(String::from_utf8(bytes).map_err(|err| Error::Utf8Error(err.utf8_error()))?)
} }
async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) -> Result<()> { async fn handle_run_kernel(stream: &TcpStream, control: &Rc<RefCell<kernel::Control>>) -> Result<()> {
control.tx.async_send(kernel::Message::StartRequest).await; control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await;
loop { loop {
let reply = control.rx.async_recv().await; let reply = control.borrow_mut().rx.async_recv().await;
match *reply { match *reply {
kernel::Message::RpcSend { is_async, data } => { kernel::Message::RpcSend { is_async, data } => {
write_header(stream, Reply::RPCRequest).await?; write_header(stream, Reply::RPCRequest).await?;
@ -139,29 +139,34 @@ async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) ->
match host_request { match host_request {
Request::RPCReply => { Request::RPCReply => {
let tag = read_bytes(stream, 512).await?; let tag = read_bytes(stream, 512).await?;
let rpc_recv_request = control.rx.async_recv().await; let slot = match *control.borrow_mut().rx.async_recv().await {
let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request { kernel::Message::RpcRecvRequest(slot) => slot,
slot other => panic!("expected root value slot from core1, not {:?}", other),
} else {
panic!(
"expected root value slot from core1, not {:?}", rpc_recv_request);
}; };
rpc::recv_return(stream, &tag, slot, &async move |size| -> *mut () { rpc::recv_return(stream, &tag, slot, &|size| {
if size == 0 { let control = control.clone();
// Don't try to allocate zero-length values, as RpcRecvReply(0) is async move {
// used to terminate the kernel-side receive loop. if size == 0 {
return 0 as *mut () // Don't try to allocate zero-length values, as RpcRecvReply(0) is
// used to terminate the kernel-side receive loop.
0 as *mut ()
} else {
let mut control = control.borrow_mut();
control.tx.async_send(kernel::Message::RpcRecvReply(Ok(size))).await;
match *control.rx.async_recv().await {
kernel::Message::RpcRecvRequest(slot) => slot,
other => panic!("expected nested value slot from kernel CPU, not {:?}", other),
}
}
} }
unimplemented!();
}).await?; }).await?;
control.tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await; control.borrow_mut().tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await;
}, },
Request::RPCException => { Request::RPCException => {
let rpc_recv_request = control.rx.async_recv().await; let mut control = control.borrow_mut();
match *rpc_recv_request { match *control.rx.async_recv().await {
kernel::Message::RpcRecvRequest(_) => (), kernel::Message::RpcRecvRequest(_) => (),
_ => panic!( other => panic!("expected (ignored) root value slot from kernel CPU, not {:?}", other),
"expected (ignored) root value slot from kernel CPU, not {:?}", rpc_recv_request),
} }
let name = read_string(stream, 16384).await?; let name = read_string(stream, 16384).await?;
let message = read_string(stream, 16384).await?; let message = read_string(stream, 16384).await?;
@ -227,7 +232,7 @@ async fn handle_connection(stream: &TcpStream, control: Rc<RefCell<kernel::Contr
} }
}, },
Request::RunKernel => { Request::RunKernel => {
handle_run_kernel(stream, &mut control.borrow_mut()).await?; handle_run_kernel(stream, &control).await?;
}, },
_ => { _ => {
error!("unexpected request from host: {:?}", request); error!("unexpected request from host: {:?}", request);

View File

@ -2,7 +2,6 @@
#![no_main] #![no_main]
#![recursion_limit="1024"] // for futures_util::select! #![recursion_limit="1024"] // for futures_util::select!
#![feature(llvm_asm)] #![feature(llvm_asm)]
#![feature(async_closure)]
extern crate alloc; extern crate alloc;