Rust: implement analyzer.

This commit is contained in:
whitequark 2016-10-05 05:59:38 +00:00
parent 2fefd0ad4a
commit 0a29c00fcc
7 changed files with 185 additions and 16 deletions

View File

@ -8,6 +8,7 @@ extern crate lwip_sys;
extern crate std_artiq as std; extern crate std_artiq as std;
use core::marker::PhantomData; use core::marker::PhantomData;
use core::ptr;
use core::cell::RefCell; use core::cell::RefCell;
use core::fmt; use core::fmt;
use alloc::boxed::Box; use alloc::boxed::Box;
@ -33,6 +34,8 @@ pub enum Error {
ConnectionReset, ConnectionReset,
ConnectionClosed, ConnectionClosed,
IllegalArgument, IllegalArgument,
// Not used by lwip; added for building blocking interfaces.
Interrupted
} }
impl Error { impl Error {
@ -54,6 +57,7 @@ impl Error {
Error::ConnectionReset => "connection reset", Error::ConnectionReset => "connection reset",
Error::ConnectionClosed => "connection closed", Error::ConnectionClosed => "connection closed",
Error::IllegalArgument => "illegal argument", Error::IllegalArgument => "illegal argument",
Error::Interrupted => "interrupted"
} }
} }
} }
@ -72,7 +76,12 @@ impl error::Error for Error {
impl From<Error> for std::io::Error { impl From<Error> for std::io::Error {
fn from(lower: Error) -> std::io::Error { fn from(lower: Error) -> std::io::Error {
std::io::Error::new(std::io::ErrorKind::Other, lower) use std::io;
match lower {
Error::Interrupted => io::Error::new(io::ErrorKind::Interrupted, "interrupted"),
err => io::Error::new(io::ErrorKind::Other, err)
}
} }
} }
@ -472,7 +481,8 @@ pub enum Shutdown {
#[derive(Debug)] #[derive(Debug)]
pub struct TcpStreamState { pub struct TcpStreamState {
recv_buffer: LinkedList<Result<Pbuf<'static>>>, recv_buffer: LinkedList<Result<Pbuf<'static>>>,
send_avail: usize send_avail: usize,
total_sent: usize
} }
impl TcpStreamState { impl TcpStreamState {
@ -511,10 +521,12 @@ impl TcpStream {
} }
extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb, extern fn sent(arg: *mut c_void, raw: *mut lwip_sys::tcp_pcb,
_len: u16) -> lwip_sys::err { len: u16) -> lwip_sys::err {
unsafe { unsafe {
let state = arg as *mut RefCell<TcpStreamState>; let state = arg as *mut RefCell<TcpStreamState>;
(*state).borrow_mut().send_avail = lwip_sys::tcp_sndbuf_(raw) as usize; let mut state = (*state).borrow_mut();
state.send_avail = lwip_sys::tcp_sndbuf_(raw) as usize;
state.total_sent = state.total_sent.wrapping_add(len as usize);
} }
lwip_sys::ERR_OK lwip_sys::ERR_OK
} }
@ -529,7 +541,8 @@ impl TcpStream {
unsafe { unsafe {
let mut state = Box::new(RefCell::new(TcpStreamState { let mut state = Box::new(RefCell::new(TcpStreamState {
recv_buffer: LinkedList::new(), recv_buffer: LinkedList::new(),
send_avail: lwip_sys::tcp_sndbuf_(raw) as usize send_avail: lwip_sys::tcp_sndbuf_(raw) as usize,
total_sent: 0
})); }));
let arg = &mut *state as *mut RefCell<TcpStreamState> as *mut _; let arg = &mut *state as *mut RefCell<TcpStreamState> as *mut _;
lwip_sys::tcp_arg(raw, arg); lwip_sys::tcp_arg(raw, arg);
@ -544,22 +557,39 @@ impl TcpStream {
&*self.state &*self.state
} }
pub fn write(&self, data: &[u8]) -> Result<usize> { unsafe fn write_common(&self, data: &[u8], copy: bool) -> Result<usize> {
let sndbuf = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; let sndbuf = lwip_sys::tcp_sndbuf_(self.raw) as usize;
let len = if data.len() < sndbuf { data.len() } else { sndbuf }; let len = if data.len() < sndbuf { data.len() } else { sndbuf };
let result = result_from(unsafe { let result = result_from({
lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16, lwip_sys::tcp_write(self.raw, data as *const [u8] as *const _, len as u16,
lwip_sys::TCP_WRITE_FLAG_COPY | lwip_sys::TCP_WRITE_FLAG_MORE |
lwip_sys::TCP_WRITE_FLAG_MORE) if copy { lwip_sys::TCP_WRITE_FLAG_COPY } else { 0 })
}, || len); }, || len);
self.state.borrow_mut().send_avail = unsafe { lwip_sys::tcp_sndbuf_(self.raw) } as usize; self.state.borrow_mut().send_avail = lwip_sys::tcp_sndbuf_(self.raw) as usize;
result result
} }
pub fn write(&self, data: &[u8]) -> Result<usize> {
unsafe { self.write_common(data, true) }
}
pub fn write_in_place<F>(&self, data: &[u8], mut relinquish: F) -> Result<usize>
where F: FnMut() -> Result<()> {
let cursor = self.state.borrow().total_sent;
let written = try!(unsafe { self.write_common(data, false) });
loop {
let cursor_now = self.state.borrow().total_sent;
if cursor_now >= cursor.wrapping_add(written) {
return Ok(written)
} else {
try!(relinquish())
}
}
}
pub fn flush(&self) -> Result<()> { pub fn flush(&self) -> Result<()> {
const EMPTY_DATA: [u8; 0] = [];
result_from(unsafe { result_from(unsafe {
lwip_sys::tcp_write(self.raw, &EMPTY_DATA as *const [u8] as *const _, 0, 0) lwip_sys::tcp_write(self.raw, ptr::null(), 0, 0)
}, || ()) }, || ())
} }

View File

@ -0,0 +1,109 @@
use std::io::{self, Read, Write};
use board::{self, csr};
use sched::{Waiter, Spawner};
use sched::{TcpListener, TcpStream, SocketAddr, IP_ANY};
use analyzer_proto::*;
const BUFFER_SIZE: usize = 512 * 1024;
// hack until https://github.com/rust-lang/rust/issues/33626 is fixed
#[repr(simd)]
struct Align64(u64, u64, u64, u64, u64, u64, u64, u64);
struct Buffer {
data: [u8; BUFFER_SIZE],
__alignment: [Align64; 0]
}
static mut BUFFER: Buffer = Buffer {
data: [0; BUFFER_SIZE],
__alignment: []
};
fn arm() {
unsafe {
let base_addr = &mut BUFFER.data[0] as *mut _ as usize;
let last_addr = &mut BUFFER.data[BUFFER_SIZE - 1] as *mut _ as usize;
csr::rtio_analyzer::message_encoder_overflow_reset_write(1);
csr::rtio_analyzer::dma_base_address_write(base_addr as u64);
csr::rtio_analyzer::dma_last_address_write(last_addr as u64);
csr::rtio_analyzer::dma_reset_write(1);
csr::rtio_analyzer::enable_write(1);
}
}
fn disarm() {
unsafe {
csr::rtio_analyzer::enable_write(0);
while csr::rtio_analyzer::busy_read() != 0 {}
board::flush_cpu_dcache();
board::flush_l2_cache();
}
}
fn worker(mut stream: TcpStream) -> io::Result<()> {
let data = unsafe { &BUFFER.data[..] };
let overflow_occurred = unsafe { csr::rtio_analyzer::message_encoder_overflow_read() != 0 };
let total_byte_count = unsafe { csr::rtio_analyzer::dma_byte_count_read() };
let pointer = (total_byte_count % BUFFER_SIZE as u64) as usize;
let wraparound = total_byte_count >= BUFFER_SIZE as u64;
let header = Header {
total_byte_count: total_byte_count,
sent_bytes: if wraparound { BUFFER_SIZE as u32 } else { total_byte_count as u32 },
overflow_occurred: overflow_occurred,
log_channel: csr::CONFIG_RTIO_LOG_CHANNEL as u8,
dds_onehot_sel: csr::CONFIG_DDS_ONEHOT_SEL != 0
};
trace!("{:?}", header);
try!(header.write_to(&mut stream));
if wraparound {
try!(stream.write(&data[pointer..]));
try!(stream.write(&data[..pointer]));
} else {
try!(stream.write(&data[..pointer]));
}
Ok(())
}
// TODO: remove this, it's pointless in analyzer
fn check_magic(stream: &mut TcpStream) -> io::Result<()> {
const MAGIC: &'static [u8] = b"ARTIQ coredev\n";
let mut magic: [u8; 14] = [0; 14];
try!(stream.read_exact(&mut magic));
if magic != MAGIC {
Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic"))
} else {
Ok(())
}
}
pub fn thread(waiter: Waiter, _spawner: Spawner) {
// verify that the hack above works
assert!(::core::mem::align_of::<Buffer>() == 64);
let addr = SocketAddr::new(IP_ANY, 1382);
let listener = TcpListener::bind(waiter, addr).expect("cannot bind socket");
listener.set_keepalive(true);
loop {
arm();
let (mut stream, addr) = listener.accept().expect("cannot accept client");
match check_magic(&mut stream) {
Ok(()) => (),
Err(_) => continue
}
info!("connection from {}", addr);
disarm();
match worker(stream) {
Ok(()) => (),
Err(err) => error!("analyzer aborted: {}", err)
}
}
}

View File

@ -0,0 +1,22 @@
use std::io::{self, Write};
use proto::*;
#[derive(Debug)]
pub struct Header {
pub sent_bytes: u32,
pub total_byte_count: u64,
pub overflow_occurred: bool,
pub log_channel: u8,
pub dds_onehot_sel: bool
}
impl Header {
pub fn write_to(&self, writer: &mut Write) -> io::Result<()> {
try!(write_u32(writer, self.sent_bytes));
try!(write_u64(writer, self.total_byte_count));
try!(write_u8(writer, self.overflow_occurred as u8));
try!(write_u8(writer, self.log_channel));
try!(write_u8(writer, self.dds_onehot_sel as u8));
Ok(())
}
}

View File

@ -5,6 +5,7 @@ include!(concat!(env!("BUILDINC_DIRECTORY"), "/generated/csr.rs"));
extern { extern {
pub fn flush_cpu_dcache(); pub fn flush_cpu_dcache();
pub fn flush_l2_cache();
} }
pub fn ident(buf: &mut [u8]) -> &str { pub fn ident(buf: &mut [u8]) -> &str {

View File

@ -1,5 +1,5 @@
#![no_std] #![no_std]
#![feature(libc, const_fn, try_borrow, stmt_expr_attributes)] #![feature(libc, const_fn, try_borrow, stmt_expr_attributes, repr_simd)]
#[macro_use] #[macro_use]
extern crate std_artiq as std; extern crate std_artiq as std;
@ -28,10 +28,13 @@ mod proto;
mod kernel_proto; mod kernel_proto;
mod session_proto; mod session_proto;
mod moninj_proto; mod moninj_proto;
mod analyzer_proto;
mod kernel; mod kernel;
mod session; mod session;
mod moninj; mod moninj;
#[cfg(has_rtio_analyzer)]
mod analyzer;
extern { extern {
fn network_init(); fn network_init();
@ -56,6 +59,8 @@ pub unsafe extern fn rust_main() {
let mut scheduler = sched::Scheduler::new(); let mut scheduler = sched::Scheduler::new();
scheduler.spawner().spawn(8192, session::thread); scheduler.spawner().spawn(8192, session::thread);
scheduler.spawner().spawn(4096, moninj::thread); scheduler.spawner().spawn(4096, moninj::thread);
#[cfg(has_rtio_analyzer)]
scheduler.spawner().spawn(4096, analyzer::thread);
loop { loop {
scheduler.run(); scheduler.run();

View File

@ -460,7 +460,9 @@ impl<'a> Read for TcpStream<'a> {
impl<'a> Write for TcpStream<'a> { impl<'a> Write for TcpStream<'a> {
fn write(&mut self, buf: &[u8]) -> Result<usize> { fn write(&mut self, buf: &[u8]) -> Result<usize> {
try!(self.waiter.tcp_writeable(&self.lower)); try!(self.waiter.tcp_writeable(&self.lower));
Ok(try!(self.lower.write(buf))) Ok(try!(self.lower.write_in_place(buf,
|| self.waiter.relinquish()
.map_err(|_| lwip::Error::Interrupted))))
} }
fn flush(&mut self) -> Result<()> { fn flush(&mut self) -> Result<()> {

View File

@ -407,7 +407,7 @@ pub fn thread(waiter: Waiter, spawner: Spawner) {
Ok(()) => (), Ok(()) => (),
Err(_) => continue Err(_) => continue
} }
info!("new connection from {:?}", addr); info!("new connection from {}", addr);
let stream = stream.into_lower(); let stream = stream.into_lower();
let congress = congress.clone(); let congress = congress.clone();