From b310e068d4cb3e853df5ecbd394d278f5cde2eec Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 24 Apr 2020 14:37:16 +0800 Subject: [PATCH] add moninj skeleton (WIP) --- runtime/src/comms.rs | 72 ++++++--------------------------------- runtime/src/main.rs | 2 ++ runtime/src/moninj.rs | 78 +++++++++++++++++++++++++++++++++++++++++++ runtime/src/proto.rs | 65 ++++++++++++++++++++++++++++++++++++ 4 files changed, 155 insertions(+), 62 deletions(-) create mode 100644 runtime/src/moninj.rs create mode 100644 runtime/src/proto.rs diff --git a/runtime/src/comms.rs b/runtime/src/comms.rs index 8a4034a2..6709293e 100644 --- a/runtime/src/comms.rs +++ b/runtime/src/comms.rs @@ -21,7 +21,9 @@ use libsupport_zynq::alloc::{vec, vec::Vec}; use libasync::{smoltcp::{Sockets, TcpStream}, task}; use alloc::sync::Arc; +use crate::proto::*; use crate::kernel; +use crate::moninj; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -51,54 +53,6 @@ impl From for Error { } -async fn expect(stream: &TcpStream, pattern: &[u8]) -> Result<()> { - stream.recv(|buf| { - for (i, b) in buf.iter().enumerate() { - if *b == pattern[i] { - if i + 1 == pattern.len() { - return Poll::Ready((i + 1, Ok(()))); - } - } else { - return Poll::Ready((i + 1, Err(Error::UnexpectedPattern))); - } - } - Poll::Pending - }).await? -} - -async fn read_i8(stream: &TcpStream) -> Result { - Ok(stream.recv(|buf| { - Poll::Ready((1, buf[0] as i8)) - }).await?) -} - -async fn read_i32(stream: &TcpStream) -> Result { - Ok(stream.recv(|buf| { - if buf.len() >= 4 { - let value = - ((buf[0] as i32) << 24) - | ((buf[1] as i32) << 16) - | ((buf[2] as i32) << 8) - | (buf[3] as i32); - Poll::Ready((4, value)) - } else { - Poll::Pending - } - }).await?) -} - -async fn read_drain(stream: &TcpStream, total: usize) -> Result<()> { - let mut done = 0; - while done < total { - let count = stream.recv(|buf| { - let count = min(total - done, buf.len()); - Poll::Ready((count, count)) - }).await?; - done += count; - } - Ok(()) -} - async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> { let total = destination.len(); let destination = RefCell::new(destination); @@ -115,7 +69,7 @@ async fn read_chunk(stream: &TcpStream, destination: &mut [u8]) -> Result<()> { Ok(()) } -#[derive(FromPrimitive, ToPrimitive)] +#[derive(Debug, FromPrimitive, ToPrimitive)] enum Request { SystemInfo = 3, LoadKernel = 5, @@ -124,7 +78,7 @@ enum Request { RPCException = 8, } -#[derive(FromPrimitive, ToPrimitive)] +#[derive(Debug, FromPrimitive, ToPrimitive)] enum Reply { SystemInfo = 2, LoadCompleted = 5, @@ -137,15 +91,6 @@ enum Reply { ClockFailure = 15, } -async fn write_i32(stream: &TcpStream, value: i32) -> Result<()> { - stream.send([ - (value >> 24) as u8, - (value >> 16) as u8, - (value >> 8) as u8, - value as u8].iter().copied()).await?; - Ok(()) -} - async fn write_chunk(stream: &TcpStream, chunk: &[u8]) -> Result<()> { write_i32(stream, chunk.len() as i32).await?; stream.send(chunk.iter().copied()).await?; @@ -160,7 +105,9 @@ async fn write_header(stream: &TcpStream, reply: Reply) -> Result<()> { async fn handle_connection(stream: &TcpStream, control: Rc>) -> Result<()> { expect(&stream, b"ARTIQ coredev\n").await?; loop { - expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await?; + if !expect(&stream, &[0x5a, 0x5a, 0x5a, 0x5a]).await? { + return Err(Error::UnexpectedPattern) + } let request: Request = FromPrimitive::from_i8(read_i8(&stream).await?) .ok_or(Error::UnrecognizedPacket)?; match request { @@ -242,7 +189,6 @@ pub fn main() { Sockets::init(32); let control: Rc> = Rc::new(RefCell::new(kernel::Control::start(8192))); - task::spawn(async move { loop { let stream = TcpStream::accept(1381, 2048, 2048).await.unwrap(); @@ -250,13 +196,15 @@ pub fn main() { task::spawn(async { let _ = handle_connection(&stream, control) .await - .map_err(|e| warn!("Connection: {}", e)); + .map_err(|e| warn!("connection terminated: {}", e)); let _ = stream.flush().await; let _ = stream.abort().await; }); } }); + moninj::start(); + let mut time = 0u32; Sockets::run(&mut iface, || { time += 1; diff --git a/runtime/src/main.rs b/runtime/src/main.rs index d7a7fdfe..5ec801b1 100644 --- a/runtime/src/main.rs +++ b/runtime/src/main.rs @@ -12,10 +12,12 @@ use libboard_zynq::{ }; use libsupport_zynq::{logger, ram}; +mod proto; mod comms; mod pl; mod rtio; mod kernel; +mod moninj; fn identifier_read(buf: &mut [u8]) -> &str { diff --git a/runtime/src/moninj.rs b/runtime/src/moninj.rs new file mode 100644 index 00000000..6f539a9e --- /dev/null +++ b/runtime/src/moninj.rs @@ -0,0 +1,78 @@ +use core::fmt; +use log::{info, warn}; + +use libboard_zynq::smoltcp; +use libasync::task; +use libasync::smoltcp::TcpStream; + +use num_derive::{FromPrimitive, ToPrimitive}; +use num_traits::{FromPrimitive, ToPrimitive}; + +use crate::proto::*; + + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Error { + NetworkError(smoltcp::Error), + UnexpectedPattern, + UnrecognizedPacket, + +} + +pub type Result = core::result::Result; + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + &Error::NetworkError(error) => write!(f, "network error: {}", error), + &Error::UnexpectedPattern => write!(f, "unexpected pattern"), + &Error::UnrecognizedPacket => write!(f, "unrecognized packet"), + } + } +} + +impl From for Error { + fn from(error: smoltcp::Error) -> Self { + Error::NetworkError(error) + } +} + +#[derive(Debug, FromPrimitive, ToPrimitive)] +enum HostMessage { + MonitorProbe = 0, + MonitorInjection = 3, + Inject = 1, + GetInjectionStatus = 2 +} + +#[derive(Debug, FromPrimitive, ToPrimitive)] +enum DeviceMessage { + MonitorStatus = 0, + InjectionStatus = 1 +} + +async fn handle_connection(stream: &TcpStream) -> Result<()> { + if !expect(&stream, b"ARTIQ moninj\n").await? { + return Err(Error::UnexpectedPattern); + } + loop { + let message: HostMessage = FromPrimitive::from_i8(read_i8(&stream).await?) + .ok_or(Error::UnrecognizedPacket)?; + info!("{:?}", message); + } +} + +pub fn start() { + task::spawn(async move { + loop { + let stream = TcpStream::accept(1383, 2048, 2048).await.unwrap(); + task::spawn(async { + let _ = handle_connection(&stream) + .await + .map_err(|e| warn!("connection terminated: {}", e)); + let _ = stream.flush().await; + let _ = stream.abort().await; + }); + } + }); +} diff --git a/runtime/src/proto.rs b/runtime/src/proto.rs new file mode 100644 index 00000000..30525051 --- /dev/null +++ b/runtime/src/proto.rs @@ -0,0 +1,65 @@ +use core::task::Poll; +use core::cmp::min; + +use libboard_zynq::smoltcp; +use libasync::smoltcp::TcpStream; + + +pub type Result = core::result::Result; + +pub async fn expect(stream: &TcpStream, pattern: &[u8]) -> Result { + stream.recv(|buf| { + for (i, b) in buf.iter().enumerate() { + if *b == pattern[i] { + if i + 1 == pattern.len() { + return Poll::Ready((i + 1, Ok(true))); + } + } else { + return Poll::Ready((i + 1, Ok(false))); + } + } + Poll::Pending + }).await? +} + +pub async fn read_i8(stream: &TcpStream) -> Result { + Ok(stream.recv(|buf| { + Poll::Ready((1, buf[0] as i8)) + }).await?) +} + +pub async fn read_i32(stream: &TcpStream) -> Result { + Ok(stream.recv(|buf| { + if buf.len() >= 4 { + let value = + ((buf[0] as i32) << 24) + | ((buf[1] as i32) << 16) + | ((buf[2] as i32) << 8) + | (buf[3] as i32); + Poll::Ready((4, value)) + } else { + Poll::Pending + } + }).await?) +} + +pub async fn read_drain(stream: &TcpStream, total: usize) -> Result<()> { + let mut done = 0; + while done < total { + let count = stream.recv(|buf| { + let count = min(total - done, buf.len()); + Poll::Ready((count, count)) + }).await?; + done += count; + } + Ok(()) +} + +pub async fn write_i32(stream: &TcpStream, value: i32) -> Result<()> { + stream.send([ + (value >> 24) as u8, + (value >> 16) as u8, + (value >> 8) as u8, + value as u8].iter().copied()).await?; + Ok(()) +}