From bc81fa2e48cbe6a71c08ee6b9e72f869eca5660b Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 9 Jun 2020 13:03:08 +0800 Subject: [PATCH] handle recursive RPC return values --- src/runtime/src/comms.rs | 47 ++++++++++++++++++++++------------------ src/runtime/src/main.rs | 1 - 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index e707804..05a77f5 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -125,10 +125,10 @@ async fn read_string(stream: &TcpStream, max_length: usize) -> Result { Ok(String::from_utf8(bytes).map_err(|err| Error::Utf8Error(err.utf8_error()))?) } -async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) -> Result<()> { - control.tx.async_send(kernel::Message::StartRequest).await; +async fn handle_run_kernel(stream: &TcpStream, control: &Rc>) -> Result<()> { + control.borrow_mut().tx.async_send(kernel::Message::StartRequest).await; loop { - let reply = control.rx.async_recv().await; + let reply = control.borrow_mut().rx.async_recv().await; match *reply { kernel::Message::RpcSend { is_async, data } => { write_header(stream, Reply::RPCRequest).await?; @@ -139,29 +139,34 @@ async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) -> match host_request { Request::RPCReply => { let tag = read_bytes(stream, 512).await?; - let rpc_recv_request = control.rx.async_recv().await; - let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request { - slot - } else { - panic!( - "expected root value slot from core1, not {:?}", rpc_recv_request); + let slot = match *control.borrow_mut().rx.async_recv().await { + kernel::Message::RpcRecvRequest(slot) => slot, + other => panic!("expected root value slot from core1, not {:?}", other), }; - rpc::recv_return(stream, &tag, slot, &async move |size| -> *mut () { - if size == 0 { - // Don't try to allocate zero-length values, as RpcRecvReply(0) is - // used to terminate the kernel-side receive loop. - return 0 as *mut () + rpc::recv_return(stream, &tag, slot, &|size| { + let control = control.clone(); + async move { + if size == 0 { + // Don't try to allocate zero-length values, as RpcRecvReply(0) is + // used to terminate the kernel-side receive loop. + 0 as *mut () + } else { + let mut control = control.borrow_mut(); + control.tx.async_send(kernel::Message::RpcRecvReply(Ok(size))).await; + match *control.rx.async_recv().await { + kernel::Message::RpcRecvRequest(slot) => slot, + other => panic!("expected nested value slot from kernel CPU, not {:?}", other), + } + } } - unimplemented!(); }).await?; - control.tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await; + control.borrow_mut().tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await; }, Request::RPCException => { - let rpc_recv_request = control.rx.async_recv().await; - match *rpc_recv_request { + let mut control = control.borrow_mut(); + match *control.rx.async_recv().await { kernel::Message::RpcRecvRequest(_) => (), - _ => panic!( - "expected (ignored) root value slot from kernel CPU, not {:?}", rpc_recv_request), + other => panic!("expected (ignored) root value slot from kernel CPU, not {:?}", other), } let name = read_string(stream, 16384).await?; let message = read_string(stream, 16384).await?; @@ -227,7 +232,7 @@ async fn handle_connection(stream: &TcpStream, control: Rc { - handle_run_kernel(stream, &mut control.borrow_mut()).await?; + handle_run_kernel(stream, &control).await?; }, _ => { error!("unexpected request from host: {:?}", request); diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index f950819..48522df 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -2,7 +2,6 @@ #![no_main] #![recursion_limit="1024"] // for futures_util::select! #![feature(llvm_asm)] -#![feature(async_closure)] extern crate alloc;