diff --git a/src/runtime/src/comms.rs b/src/runtime/src/comms.rs index 8dfdfda..33a4e1b 100644 --- a/src/runtime/src/comms.rs +++ b/src/runtime/src/comms.rs @@ -2,6 +2,8 @@ use core::mem::transmute; use core::fmt; use core::cell::RefCell; use alloc::rc::Rc; +use alloc::sync::Arc; +use alloc::{vec, vec::Vec}; use log::{debug, warn, error}; use num_derive::{FromPrimitive, ToPrimitive}; @@ -17,12 +19,11 @@ use libboard_zynq::{ }, timer::GlobalTimer, }; -use libsupport_zynq::alloc::{vec, vec::Vec}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; -use alloc::sync::Arc; use crate::proto_async::*; use crate::kernel; +use crate::rpc; use crate::moninj; @@ -85,79 +86,117 @@ async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> { Ok(()) } +async fn read_request(stream: &TcpStream, allow_close: bool) -> Result> { + match expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await { + Ok(true) => {} + Ok(false) => + return Err(Error::UnexpectedPattern), + Err(smoltcp::Error::Illegal) => { + if allow_close { + debug!("peer closed connection"); + return Ok(None); + } else { + error!("peer unexpectedly closed connection"); + return Err(smoltcp::Error::Illegal)?; + } + }, + Err(e) => + return Err(e)?, + } + Ok(Some(FromPrimitive::from_i8(read_i8(&stream).await?).ok_or(Error::UnrecognizedPacket)?)) +} + +async fn handle_run_kernel(stream: &TcpStream, control: &mut kernel::Control) -> Result<()> { + control.tx.async_send(kernel::Message::StartRequest).await; + loop { + let reply = control.rx.async_recv().await; + match *reply { + kernel::Message::RpcSend { is_async, data } => { + write_header(&stream, Reply::RPCRequest).await?; + write_bool(&stream, is_async).await?; + stream.send(data.iter().copied()).await?; + if !is_async { + let host_request = read_request(stream, false).await?.unwrap(); + match host_request { + Request::RPCReply => { + let tag = read_bytes(&stream, 512).await?; + let rpc_recv_request = control.rx.async_recv().await; + let slot = if let kernel::Message::RpcRecvRequest(slot) = *rpc_recv_request { + slot + } else { + panic!( + "expected root value slot from core1, not {:?}", rpc_recv_request); + }; + rpc::recv_return(stream, &tag, slot, &async move |size| -> *mut () { + if size == 0 { + // Don't try to allocate zero-length values, as RpcRecvReply(0) is + // used to terminate the kernel-side receive loop. + return 0 as *mut () + } + unimplemented!(); + }).await?; + control.tx.async_send(kernel::Message::RpcRecvReply(Ok(0))).await; + }, + Request::RPCException => unimplemented!(), + _ => { + error!("unexpected RPC request from host: {:?}", host_request); + return Err(Error::UnrecognizedPacket) + } + } + } + }, + kernel::Message::KernelFinished => { + write_header(&stream, Reply::KernelFinished).await?; + break; + }, + _ => { + panic!("unexpected message from core1 while kernel was running: {:?}", reply); + } + } + } + Ok(()) +} + async fn handle_connection(stream: &TcpStream, control: Rc>) -> Result<()> { expect(&stream, b"ARTIQ coredev\n").await?; debug!("received connection"); loop { - match expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await { - Ok(true) => {} - Ok(false) => - return Err(Error::UnexpectedPattern), - // Peer has closed the connection. - // Closing here is a normal condition so do not report an error. - // An error is still reported if the connection is unexpectedly closed elsewhere. - Err(smoltcp::Error::Illegal) => - return Ok(()), - Err(e) => - return Err(e)?, + let request = read_request(stream, true).await?; + if request.is_none() { + return Ok(()); } - let request: Request = FromPrimitive::from_i8(read_i8(&stream).await?) - .ok_or(Error::UnrecognizedPacket)?; + let request = request.unwrap(); match request { Request::SystemInfo => { write_header(&stream, Reply::SystemInfo).await?; stream.send("ARZQ".bytes()).await?; }, Request::LoadKernel => { - let length = read_i32(&stream).await? as usize; - if length < 1024*1024 { - let mut buffer = vec![0; length]; - read_chunk(&stream, &mut buffer).await?; - - let mut control = control.borrow_mut(); - control.restart(); - control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await; - let reply = control.rx.async_recv().await; - match *reply { - kernel::Message::LoadCompleted => write_header(&stream, Reply::LoadCompleted).await?, - kernel::Message::LoadFailed => { - write_header(&stream, Reply::LoadFailed).await?; - write_chunk(&stream, b"core1 failed to process data").await?; - }, - _ => { - error!("received unexpected message from core1: {:?}", reply); - write_header(&stream, Reply::LoadFailed).await?; - write_chunk(&stream, b"core1 sent unexpected reply").await?; - } + let buffer = read_bytes(&stream, 1024*1024).await?; + let mut control = control.borrow_mut(); + control.restart(); + control.tx.async_send(kernel::Message::LoadRequest(Arc::new(buffer))).await; + let reply = control.rx.async_recv().await; + match *reply { + kernel::Message::LoadCompleted => write_header(&stream, Reply::LoadCompleted).await?, + kernel::Message::LoadFailed => { + write_header(&stream, Reply::LoadFailed).await?; + write_chunk(&stream, b"core1 failed to process data").await?; + }, + _ => { + error!("unexpected message from core1: {:?}", reply); + write_header(&stream, Reply::LoadFailed).await?; + write_chunk(&stream, b"core1 sent unexpected reply").await?; } - } else { - read_drain(&stream, length).await?; - write_header(&stream, Reply::LoadFailed).await?; - write_chunk(&stream, b"kernel is too large").await?; } }, Request::RunKernel => { - let mut control = control.borrow_mut(); - control.tx.async_send(kernel::Message::StartRequest).await; - loop { - let reply = control.rx.async_recv().await; - match *reply { - kernel::Message::KernelFinished => { - write_header(&stream, Reply::KernelFinished).await?; - break; - }, - kernel::Message::RpcSend { is_async, data } => { - write_header(&stream, Reply::RPCRequest).await?; - write_bool(&stream, is_async).await?; - stream.send(data.iter().copied()).await?; - }, - _ => { - error!("received unexpected message from core1 while kernel was running: {:?}", reply); - } - } - } + handle_run_kernel(stream, &mut control.borrow_mut()).await?; + }, + _ => { + error!("unexpected request from host: {:?}", request); + return Err(Error::UnrecognizedPacket) } - _ => return Err(Error::UnrecognizedPacket) } } } diff --git a/src/runtime/src/kernel.rs b/src/runtime/src/kernel.rs index 4ff27d4..3234ccb 100644 --- a/src/runtime/src/kernel.rs +++ b/src/runtime/src/kernel.rs @@ -19,6 +19,8 @@ pub enum Message { StartRequest, KernelFinished, RpcSend { is_async: bool, data: Arc> }, + RpcRecvRequest(*mut ()), + RpcRecvReply(Result), } static CHANNEL_0TO1: Mutex>> = Mutex::new(None); @@ -64,23 +66,31 @@ impl Control { static mut KERNEL_CHANNEL_0TO1: *mut () = ptr::null_mut(); static mut KERNEL_CHANNEL_1TO0: *mut () = ptr::null_mut(); -extern fn rpc_send(service: u32, tag: &CSlice, data: *const *const ()) { - let core1_rx: &mut sync_channel::Receiver = unsafe { mem::transmute(KERNEL_CHANNEL_0TO1) }; +fn rpc_send_common(is_async: bool, service: u32, tag: &CSlice, data: *const *const ()) { let core1_tx: &mut sync_channel::Sender = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) }; let mut buffer = Vec::::new(); rpc::send_args(&mut buffer, service, tag.as_ref(), data).expect("RPC encoding failed"); - core1_tx.send(Message::RpcSend { is_async: false, data: Arc::new(buffer) }) + core1_tx.send(Message::RpcSend { is_async: is_async, data: Arc::new(buffer) }); +} + +extern fn rpc_send(service: u32, tag: &CSlice, data: *const *const ()) { + rpc_send_common(false, service, tag, data); } extern fn rpc_send_async(service: u32, tag: &CSlice, data: *const *const ()) { - let core1_tx: &mut sync_channel::Sender = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) }; - let mut buffer = Vec::::new(); - rpc::send_args(&mut buffer, service, tag.as_ref(), data).expect("RPC encoding failed"); - core1_tx.send(Message::RpcSend { is_async: true, data: Arc::new(buffer) }) + rpc_send_common(true, service, tag, data); } extern fn rpc_recv(slot: *mut ()) -> usize { - unimplemented!(); + let core1_rx: &mut sync_channel::Receiver = unsafe { mem::transmute(KERNEL_CHANNEL_0TO1) }; + let core1_tx: &mut sync_channel::Sender = unsafe { mem::transmute(KERNEL_CHANNEL_1TO0) }; + core1_tx.send(Message::RpcRecvRequest(slot)); + let reply = core1_rx.recv(); + match *reply { + Message::RpcRecvReply(Ok(alloc_size)) => alloc_size, + Message::RpcRecvReply(Err(_)) => unimplemented!(), + _ => panic!("received unexpected reply to RpcRecvRequest: {:?}", reply) + } } macro_rules! api { diff --git a/src/runtime/src/main.rs b/src/runtime/src/main.rs index 48522df..f950819 100644 --- a/src/runtime/src/main.rs +++ b/src/runtime/src/main.rs @@ -2,6 +2,7 @@ #![no_main] #![recursion_limit="1024"] // for futures_util::select! #![feature(llvm_asm)] +#![feature(async_closure)] extern crate alloc; diff --git a/src/runtime/src/proto_async.rs b/src/runtime/src/proto_async.rs index e9ee00b..bd53d07 100644 --- a/src/runtime/src/proto_async.rs +++ b/src/runtime/src/proto_async.rs @@ -1,6 +1,7 @@ use core::task::Poll; use core::cmp::min; use core::cell::RefCell; +use alloc::{vec, vec::Vec}; use libboard_zynq::smoltcp; use libasync::smoltcp::TcpStream; @@ -86,16 +87,14 @@ pub async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<() Ok(()) } -pub async fn read_drain(stream: &TcpStream, total: usize) -> Result<()> { - let mut done = 0; - while done < total { - let count = stream.recv(|buf| { - let count = min(total - done, buf.len()); - Poll::Ready((count, count)) - }).await?; - done += count; +pub async fn read_bytes(stream: &TcpStream, max_length: usize) -> Result> { + let length = read_i32(&stream).await? as usize; + if length > max_length { + return Err(smoltcp::Error::Exhausted); } - Ok(()) + let mut buffer = vec![0; length]; + read_chunk(&stream, &mut buffer).await?; + Ok(buffer) } pub async fn write_i8(stream: &TcpStream, value: i8) -> Result<()> {