From 0a29c00fcc24d93044b42812ef159f1568c49293 Mon Sep 17 00:00:00 2001 From: whitequark Date: Wed, 5 Oct 2016 05:59:38 +0000 Subject: [PATCH] Rust: implement analyzer. --- artiq/runtime.rs/liblwip/lib.rs | 56 ++++++++++--- artiq/runtime.rs/src/analyzer.rs | 109 +++++++++++++++++++++++++ artiq/runtime.rs/src/analyzer_proto.rs | 22 +++++ artiq/runtime.rs/src/board.rs | 1 + artiq/runtime.rs/src/lib.rs | 7 +- artiq/runtime.rs/src/sched.rs | 4 +- artiq/runtime.rs/src/session.rs | 2 +- 7 files changed, 185 insertions(+), 16 deletions(-) create mode 100644 artiq/runtime.rs/src/analyzer.rs create mode 100644 artiq/runtime.rs/src/analyzer_proto.rs diff --git a/artiq/runtime.rs/liblwip/lib.rs b/artiq/runtime.rs/liblwip/lib.rs index 81c874bc6..f94b65dac 100644 --- a/artiq/runtime.rs/liblwip/lib.rs +++ b/artiq/runtime.rs/liblwip/lib.rs @@ -8,6 +8,7 @@ extern crate lwip_sys; extern crate std_artiq as std; use core::marker::PhantomData; +use core::ptr; use core::cell::RefCell; use core::fmt; use alloc::boxed::Box; @@ -33,6 +34,8 @@ pub enum Error { ConnectionReset, ConnectionClosed, IllegalArgument, + // Not used by lwip; added for building blocking interfaces. + Interrupted } impl Error { @@ -54,6 +57,7 @@ impl Error { Error::ConnectionReset => "connection reset", Error::ConnectionClosed => "connection closed", Error::IllegalArgument => "illegal argument", + Error::Interrupted => "interrupted" } } } @@ -72,7 +76,12 @@ impl error::Error for Error { impl From for std::io::Error { fn from(lower: Error) -> std::io::Error { - std::io::Error::new(std::io::ErrorKind::Other, lower) + use std::io; + + match lower { + Error::Interrupted => io::Error::new(io::ErrorKind::Interrupted, "interrupted"), + err => io::Error::new(io::ErrorKind::Other, err) + } } } @@ -472,7 +481,8 @@ pub enum Shutdown { #[derive(Debug)] pub struct TcpStreamState { recv_buffer: LinkedList>>, - send_avail: usize + send_avail: usize, + total_sent: usize } impl TcpStreamState { @@ -511,10 +521,12 @@ impl TcpStream { } extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb, - _len: u16) -> lwip_sys::err { + len: u16) -> lwip_sys::err { unsafe { let state = arg as *mut RefCell; - (*state).borrow_mut().send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; + let mut state = (*state).borrow_mut(); + state.send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; + state.total_sent = state.total_sent.wrapping_add(len as usize); } lwip_sys::ERR_OK } @@ -529,7 +541,8 @@ impl TcpStream { unsafe { let mut state = Box::new(RefCell::new(TcpStreamState { recv_buffer: LinkedList::new(), - send_avail: lwip_sys::tcp_sndbuf_(raw) as usize + send_avail: lwip_sys::tcp_sndbuf_(raw) as usize, + total_sent: 0 })); let arg = &mut *state as *mut RefCell as *mut _; lwip_sys::tcp_arg(raw, arg); @@ -544,22 +557,39 @@ impl TcpStream { &*self.state } - pub fn write(&self, data: &[u8]) -> Result { - let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; + unsafe fn write_common(&self, data: &[u8], copy: bool) -> Result { + let sndbuf = lwip_sys::tcp_sndbuf_(self.raw) as usize; let len = if data.len() < sndbuf { data.len() } else { sndbuf }; - let result = result_from(unsafe { + let result = result_from({ lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16, - lwip_sys::TCP_WRITE_FLAG_COPY | - lwip_sys::TCP_WRITE_FLAG_MORE) + lwip_sys::TCP_WRITE_FLAG_MORE | + if copy { lwip_sys::TCP_WRITE_FLAG_COPY } else { 0 }) }, || len); - self.state.borrow_mut().send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; + self.state.borrow_mut().send_avail = lwip_sys::tcp_sndbuf_(self.raw) as usize; result } + pub fn write(&self, data: &[u8]) -> Result { + unsafe { self.write_common(data, true) } + } + + pub fn write_in_place(&self, data: &[u8], mut relinquish: F) -> Result + where F: FnMut() -> Result<()> { + let cursor = self.state.borrow().total_sent; + let written = try!(unsafe { self.write_common(data, false) }); + loop { + let cursor_now = self.state.borrow().total_sent; + if cursor_now >= cursor.wrapping_add(written) { + return Ok(written) + } else { + try!(relinquish()) + } + } + } + pub fn flush(&self) -> Result<()> { - const EMPTY_DATA: [u8; 0] = []; result_from(unsafe { - lwip_sys::tcp_write(self.raw, &EMPTY_DATA as *const [u8] as *const _, 0, 0) + lwip_sys::tcp_write(self.raw, ptr::null(), 0, 0) }, || ()) } diff --git a/artiq/runtime.rs/src/analyzer.rs b/artiq/runtime.rs/src/analyzer.rs new file mode 100644 index 000000000..c16bf1f95 --- /dev/null +++ b/artiq/runtime.rs/src/analyzer.rs @@ -0,0 +1,109 @@ +use std::io::{self, Read, Write}; +use board::{self, csr}; +use sched::{Waiter, Spawner}; +use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY}; +use analyzer_proto::*; + +const BUFFER_SIZE: usize = 512 * 1024; + +// hack until https://github.com/rust-lang/rust/issues/33626 is fixed +#[repr(simd)] +struct Align64(u64, u64, u64, u64, u64, u64, u64, u64); + +struct Buffer { + data: [u8; BUFFER_SIZE], + __alignment: [Align64; 0] +} + +static mut BUFFER: Buffer = Buffer { + data: [0; BUFFER_SIZE], + __alignment: [] +}; + +fn arm() { + unsafe { + let base_addr = &mut BUFFER.data[0] as *mut _ as usize; + let last_addr = &mut BUFFER.data[BUFFER_SIZE - 1] as *mut _ as usize; + csr::rtio_analyzer::message_encoder_overflow_reset_write(1); + csr::rtio_analyzer::dma_base_address_write(base_addr as u64); + csr::rtio_analyzer::dma_last_address_write(last_addr as u64); + csr::rtio_analyzer::dma_reset_write(1); + csr::rtio_analyzer::enable_write(1); + } +} + +fn disarm() { + unsafe { + csr::rtio_analyzer::enable_write(0); + while csr::rtio_analyzer::busy_read() != 0 {} + board::flush_cpu_dcache(); + board::flush_l2_cache(); + } +} + +fn worker(mut stream: TcpStream) -> io::Result<()> { + let data = unsafe { &BUFFER.data[..] }; + let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 }; + let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() }; + let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize; + let wraparound = total_byte_count >= BUFFER_SIZE as u64; + + let header = Header { + total_byte_count: total_byte_count, + sent_bytes: if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 }, + overflow_occurred: overflow_occurred, + log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8, + dds_onehot_sel: csr::CONFIG_DDS_ONEHOT_SEL != 0 + }; + trace!("{:?}", header); + + try!(header.write_to(&mut stream)); + if wraparound { + try!(stream.write(&data[pointer..])); + try!(stream.write(&data[..pointer])); + } else { + try!(stream.write(&data[..pointer])); + } + + Ok(()) +} + +// TODO: remove this, it's pointless in analyzer +fn check_magic(stream: &mut TcpStream) -> io::Result<()> { + const MAGIC: &'static [u8] = b"ARTIQ coredev\n"; + + let mut magic: [u8; 14] = [0; 14]; + try!(stream.read_exact(&mut magic)); + if magic != MAGIC { + Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic")) + } else { + Ok(()) + } +} + +pub fn thread(waiter: Waiter, _spawner: Spawner) { + // verify that the hack above works + assert!(::core::mem::align_of::() == 64); + + let addr = SocketAddr::new(IP_ANY, 1382); + let listener = TcpListener::bind(waiter, addr).expect("cannot bind socket"); + listener.set_keepalive(true); + + loop { + arm(); + + let (mut stream, addr) = listener.accept().expect("cannot accept client"); + match check_magic(&mut stream) { + Ok(()) => (), + Err(_) => continue + } + info!("connection from {}", addr); + + disarm(); + + match worker(stream) { + Ok(()) => (), + Err(err) => error!("analyzer aborted: {}", err) + } + } +} diff --git a/artiq/runtime.rs/src/analyzer_proto.rs b/artiq/runtime.rs/src/analyzer_proto.rs new file mode 100644 index 000000000..344882d3b --- /dev/null +++ b/artiq/runtime.rs/src/analyzer_proto.rs @@ -0,0 +1,22 @@ +use std::io::{self, Write}; +use proto::*; + +#[derive(Debug)] +pub struct Header { + pub sent_bytes: u32, + pub total_byte_count: u64, + pub overflow_occurred: bool, + pub log_channel: u8, + pub dds_onehot_sel: bool +} + +impl Header { + pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { + try!(write_u32(writer, self.sent_bytes)); + try!(write_u64(writer, self.total_byte_count)); + try!(write_u8(writer, self.overflow_occurred as u8)); + try!(write_u8(writer, self.log_channel)); + try!(write_u8(writer, self.dds_onehot_sel as u8)); + Ok(()) + } +} diff --git a/artiq/runtime.rs/src/board.rs b/artiq/runtime.rs/src/board.rs index 94ad10291..21c8d3d85 100644 --- a/artiq/runtime.rs/src/board.rs +++ b/artiq/runtime.rs/src/board.rs @@ -5,6 +5,7 @@ include!(concat!(env!("BUILDINC_DIRECTORY"), "/generated/csr.rs")); extern { pub fn flush_cpu_dcache(); + pub fn flush_l2_cache(); } pub fn ident(buf: &mut [u8]) -> &str { diff --git a/artiq/runtime.rs/src/lib.rs b/artiq/runtime.rs/src/lib.rs index 0e1860f48..68a0ff269 100644 --- a/artiq/runtime.rs/src/lib.rs +++ b/artiq/runtime.rs/src/lib.rs @@ -1,5 +1,5 @@ #![no_std] -#![feature(libc, const_fn, try_borrow, stmt_expr_attributes)] +#![feature(libc, const_fn, try_borrow, stmt_expr_attributes, repr_simd)] #[macro_use] extern crate std_artiq as std; @@ -28,10 +28,13 @@ mod proto; mod kernel_proto; mod session_proto; mod moninj_proto; +mod analyzer_proto; mod kernel; mod session; mod moninj; +#[cfg(has_rtio_analyzer)] +mod analyzer; extern { fn network_init(); @@ -56,6 +59,8 @@ pub unsafe extern fn rust_main() { let mut scheduler = sched::Scheduler::new(); scheduler.spawner().spawn(8192, session::thread); scheduler.spawner().spawn(4096, moninj::thread); + #[cfg(has_rtio_analyzer)] + scheduler.spawner().spawn(4096, analyzer::thread); loop { scheduler.run(); diff --git a/artiq/runtime.rs/src/sched.rs b/artiq/runtime.rs/src/sched.rs index ef4c8876f..1e84cd73e 100644 --- a/artiq/runtime.rs/src/sched.rs +++ b/artiq/runtime.rs/src/sched.rs @@ -460,7 +460,9 @@ impl<'a> Read for TcpStream<'a> { impl<'a> Write for TcpStream<'a> { fn write(&mut self, buf: &[u8]) -> Result { try!(self.waiter.tcp_writeable(&self.lower)); - Ok(try!(self.lower.write(buf))) + Ok(try!(self.lower.write_in_place(buf, + || self.waiter.relinquish() + .map_err(|_| lwip::Error::Interrupted)))) } fn flush(&mut self) -> Result<()> { diff --git a/artiq/runtime.rs/src/session.rs b/artiq/runtime.rs/src/session.rs index 879ad159d..42f180e44 100644 --- a/artiq/runtime.rs/src/session.rs +++ b/artiq/runtime.rs/src/session.rs @@ -407,7 +407,7 @@ pub fn thread(waiter: Waiter, spawner: Spawner) { Ok(()) => (), Err(_) => continue } - info!("new connection from {:?}", addr); + info!("new connection from {}", addr); let stream = stream.into_lower(); let congress = congress.clone();