runtime: new moninj protocol, TCP-based, with DRTIO support

This commit is contained in:
Sebastien Bourdeauducq 2017-02-25 12:07:00 +08:00
parent 1486a945d9
commit 5a16660aa2
4 changed files with 267 additions and 160 deletions

View File

@ -42,7 +42,7 @@ mod cache;
mod proto; mod proto;
mod kernel_proto; mod kernel_proto;
mod session_proto; mod session_proto;
#[cfg(has_rtio_moninj)] #[cfg(any(has_rtio_moninj, has_drtio))]
mod moninj_proto; mod moninj_proto;
#[cfg(has_rtio_analyzer)] #[cfg(has_rtio_analyzer)]
mod analyzer_proto; mod analyzer_proto;
@ -50,7 +50,7 @@ mod rpc_proto;
mod kernel; mod kernel;
mod session; mod session;
#[cfg(has_rtio_moninj)] #[cfg(any(has_rtio_moninj, has_drtio))]
mod moninj; mod moninj;
#[cfg(has_rtio_analyzer)] #[cfg(has_rtio_analyzer)]
mod analyzer; mod analyzer;
@ -121,7 +121,7 @@ fn startup() {
let io = scheduler.io(); let io = scheduler.io();
rtio_mgt::startup(&io); rtio_mgt::startup(&io);
io.spawn(16384, session::thread); io.spawn(16384, session::thread);
#[cfg(has_rtio_moninj)] #[cfg(any(has_rtio_moninj, has_drtio))]
io.spawn(4096, moninj::thread); io.spawn(4096, moninj::thread);
#[cfg(has_rtio_analyzer)] #[cfg(has_rtio_analyzer)]
io.spawn(4096, analyzer::thread); io.spawn(4096, analyzer::thread);

View File

@ -1,126 +1,241 @@
use std::io; use std::io::{self, Read};
use board::csr; use std::btree_map::BTreeMap;
use sched::{Io, UdpSocket};
use sched::Io;
use sched::{TcpListener, TcpStream};
use board::{clock, csr};
#[cfg(has_drtio)]
use drtioaux;
#[cfg(has_drtio)]
use rtio_mgt;
use moninj_proto::*; use moninj_proto::*;
const MONINJ_TTL_OVERRIDE_ENABLE: u8 = 0;
const MONINJ_TTL_OVERRIDE_O: u8 = 1;
const MONINJ_TTL_OVERRIDE_OE: u8 = 2;
fn worker(socket: &mut UdpSocket) -> io::Result<()> { fn check_magic(stream: &mut TcpStream) -> io::Result<()> {
let mut buf = vec![0; 512]; const MAGIC: &'static [u8] = b"ARTIQ moninj\n";
loop {
let (size, addr) = socket.recv_from(&mut buf)?;
let request = Request::read_from(&mut io::Cursor::new(&buf[..size]))?;
trace!("{} -> {:?}", addr, request);
match request { let mut magic: [u8; 13] = [0; 13];
Request::Monitor => { stream.read_exact(&mut magic)?;
#[cfg(has_dds)] if magic != MAGIC {
let mut dds_ftws = [0u32; (csr::CONFIG_RTIO_DDS_COUNT as usize * Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic"))
csr::CONFIG_DDS_CHANNELS_PER_BUS as usize)]; } else {
let mut reply = Reply::default(); Ok(())
}
}
for i in 0..csr::CONFIG_RTIO_REGULAR_TTL_COUNT as u8 { #[cfg(has_rtio_moninj)]
fn read_probe_local(channel: u16, probe: u8) -> u32 {
unsafe { unsafe {
csr::rtio_moninj::mon_chan_sel_write(i); csr::rtio_moninj::mon_chan_sel_write(channel as _);
csr::rtio_moninj::mon_probe_sel_write(0); csr::rtio_moninj::mon_probe_sel_write(probe);
csr::rtio_moninj::mon_value_update_write(1); csr::rtio_moninj::mon_value_update_write(1);
if csr::rtio_moninj::mon_value_read() != 0 { csr::rtio_moninj::mon_value_read() as u32
reply.ttl_levels |= 1 << i;
}
csr::rtio_moninj::mon_probe_sel_write(1);
csr::rtio_moninj::mon_value_update_write(1);
if csr::rtio_moninj::mon_value_read() != 0 {
reply.ttl_oes |= 1 << i;
}
csr::rtio_moninj::inj_chan_sel_write(i);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE);
if csr::rtio_moninj::inj_value_read() != 0 {
reply.ttl_overrides |= 1 << i;
}
} }
} }
#[cfg(has_dds)] #[cfg(has_drtio)]
fn read_probe_drtio(io: &Io, channel: u16, probe: u8) -> u32 {
if rtio_mgt::drtio::link_is_running() {
let request = drtioaux::Packet::MonitorRequest { channel: channel, probe: probe };
drtioaux::hw::send(&request).unwrap();
let timeout = clock::get_ms() + 20;
while clock::get_ms() < timeout {
if !rtio_mgt::drtio::link_is_running() {
return 0
}
match drtioaux::hw::recv() {
Ok(None) => (),
Ok(Some(drtioaux::Packet::MonitorReply { value })) => return value,
Ok(Some(_)) => warn!("received unexpected aux packet"),
Err(e) => warn!("aux packet error ({})", e)
}
io.relinquish().unwrap();
}
warn!("aux packet timeout");
0
} else {
0
}
}
fn read_probe(_io: &Io, channel: u32, probe: u8) -> u32 {
#[cfg(has_rtio_moninj)]
{ {
reply.dds_rtio_first_channel = csr::CONFIG_RTIO_FIRST_DDS_CHANNEL as u16; if channel & 0xff0000 == 0 {
reply.dds_channels_per_bus = csr::CONFIG_DDS_CHANNELS_PER_BUS as u16; return read_probe_local(channel as u16, probe)
}
}
#[cfg(has_drtio)]
{
if channel & 0xff0000 != 0 {
return read_probe_drtio(_io, channel as u16, probe)
}
}
error!("read_probe: unrecognized channel number {}", channel);
0
}
for j in 0..csr::CONFIG_RTIO_DDS_COUNT { #[cfg(has_rtio_moninj)]
fn inject_local(channel: u16, overrd: u8, value: u8) {
unsafe { unsafe {
csr::rtio_moninj::mon_chan_sel_write( csr::rtio_moninj::inj_chan_sel_write(channel as _);
(csr::CONFIG_RTIO_FIRST_DDS_CHANNEL + j) as u8); csr::rtio_moninj::inj_override_sel_write(overrd);
for i in 0..csr::CONFIG_DDS_CHANNELS_PER_BUS { csr::rtio_moninj::inj_value_write(value);
csr::rtio_moninj::mon_probe_sel_write(i as u8);
csr::rtio_moninj::mon_value_update_write(1);
dds_ftws[(csr::CONFIG_DDS_CHANNELS_PER_BUS * j + i) as usize] =
csr::rtio_moninj::mon_value_read() as u32;
} }
} }
}
reply.dds_ftws = &dds_ftws;
}
trace!("{} <- {:?}", addr, reply); #[cfg(has_drtio)]
buf.clear(); fn inject_drtio(channel: u16, overrd: u8, value: u8) {
reply.write_to(&mut buf)?; if rtio_mgt::drtio::link_is_running() {
socket.send_to(&buf, addr)?; let request = drtioaux::Packet::InjectionRequest {
}, channel: channel,
overrd: overrd,
value: value
};
drtioaux::hw::send(&request).unwrap();
}
}
Request::TtlSet { channel, mode: TtlMode::Experiment } => { fn inject(channel: u32, overrd: u8, value: u8) {
#[cfg(has_rtio_moninj)]
{
if channel & 0xff0000 == 0 {
inject_local(channel as u16, overrd, value);
return
}
}
#[cfg(has_drtio)]
{
if channel & 0xff0000 != 0 {
inject_drtio(channel as u16, overrd, value);
return
}
}
error!("inject: unrecognized channel number {}", channel);
}
#[cfg(has_rtio_moninj)]
fn read_injection_status_local(channel: u16, overrd: u8) -> u8 {
unsafe { unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel); csr::rtio_moninj::inj_chan_sel_write(channel as _);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); csr::rtio_moninj::inj_override_sel_write(overrd);
csr::rtio_moninj::inj_value_write(0); csr::rtio_moninj::inj_value_read()
}
}
#[cfg(has_drtio)]
fn read_injection_status_drtio(io: &Io, channel: u16, overrd: u8) -> u8 {
if rtio_mgt::drtio::link_is_running() {
let request = drtioaux::Packet::InjectionStatusRequest {
channel: channel,
overrd: overrd
};
drtioaux::hw::send(&request).unwrap();
let timeout = clock::get_ms() + 20;
while clock::get_ms() < timeout {
if !rtio_mgt::drtio::link_is_running() {
return 0
}
match drtioaux::hw::recv() {
Ok(None) => (),
Ok(Some(drtioaux::Packet::InjectionStatusReply { value })) => return value,
Ok(Some(_)) => warn!("received unexpected aux packet"),
Err(e) => warn!("aux packet error ({})", e)
}
io.relinquish().unwrap();
}
warn!("aux packet timeout");
0
} else {
0
}
}
fn read_injection_status(_io: &Io, channel: u32, probe: u8) -> u8 {
#[cfg(has_rtio_moninj)]
{
if channel & 0xff0000 == 0 {
return read_injection_status_local(channel as u16, probe)
}
}
#[cfg(has_drtio)]
{
if channel & 0xff0000 != 0 {
return read_injection_status_drtio(_io, channel as u16, probe)
}
}
error!("read_injection_status: unrecognized channel number {}", channel);
0
}
fn connection_worker(io: &Io, mut stream: &mut TcpStream) -> io::Result<()> {
let mut watch_list = BTreeMap::new();
let mut next_check = 0;
check_magic(&mut stream)?;
info!("new connection from {}", stream.remote_endpoint());
loop {
if stream.can_recv() {
let request = HostMessage::read_from(stream)?;
match request {
HostMessage::Monitor { enable, channel, probe } => {
if enable {
let _ = watch_list.entry((channel, probe)).or_insert(None);
} else {
let _ = watch_list.remove(&(channel, probe));
} }
}, },
HostMessage::Inject { channel, overrd, value } => inject(channel, overrd, value),
HostMessage::GetInjectionStatus { channel, overrd } => {
let value = read_injection_status(io, channel, overrd);
let reply = DeviceMessage::InjectionStatus {
channel: channel,
overrd: overrd,
value: value
};
reply.write_to(stream)?;
}
}
} else if !stream.may_recv() {
return Ok(())
}
Request::TtlSet { channel, mode: TtlMode::High } => { if clock::get_ms() > next_check {
unsafe { for (&(channel, probe), previous) in watch_list.iter_mut() {
csr::rtio_moninj::inj_chan_sel_write(channel); let current = read_probe(io, channel, probe);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_O); if previous.is_none() || (previous.unwrap() != current) {
csr::rtio_moninj::inj_value_write(1); let message = DeviceMessage::MonitorStatus {
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE); channel: channel,
csr::rtio_moninj::inj_value_write(1); probe: probe,
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); value: current
csr::rtio_moninj::inj_value_write(1); };
message.write_to(stream)?;
*previous = Some(current);
}
}
next_check = clock::get_ms() + 200;
} }
},
Request::TtlSet { channel, mode: TtlMode::Low } => { io.relinquish().unwrap();
unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_O);
csr::rtio_moninj::inj_value_write(0);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE);
csr::rtio_moninj::inj_value_write(1);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE);
csr::rtio_moninj::inj_value_write(1);
}
},
Request::TtlSet { channel, mode: TtlMode::Input } => {
unsafe {
csr::rtio_moninj::inj_chan_sel_write(channel);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE);
csr::rtio_moninj::inj_value_write(0);
csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE);
csr::rtio_moninj::inj_value_write(1);
}
}
}
} }
} }
pub fn thread(io: Io) { pub fn thread(io: Io) {
let mut socket = UdpSocket::new(&io, 1, 512); let listener = TcpListener::new(&io, 65535);
socket.bind(3250); listener.listen(1383).expect("moninj: cannot listen");
loop { loop {
match worker(&mut socket) { let stream = listener.accept().expect("moninj: cannot accept").into_handle();
Ok(()) => unreachable!(), io.spawn(16384, move |io| {
let mut stream = TcpStream::from_handle(&io, stream);
match connection_worker(&io, &mut stream) {
Ok(()) => {},
Err(err) => error!("moninj aborted: {}", err) Err(err) => error!("moninj aborted: {}", err)
} }
});
} }
} }

View File

@ -2,63 +2,55 @@ use std::io::{self, Read, Write};
use proto::*; use proto::*;
#[derive(Debug)] #[derive(Debug)]
pub enum TtlMode { pub enum HostMessage {
Experiment, Monitor { enable: bool, channel: u32, probe: u8 },
High, Inject { channel: u32, overrd: u8, value: u8 },
Low, GetInjectionStatus { channel: u32, overrd: u8 }
Input
}
impl TtlMode {
pub fn read_from(reader: &mut Read) -> io::Result<TtlMode> {
Ok(match read_u8(reader)? {
0 => TtlMode::Experiment,
1 => TtlMode::High,
2 => TtlMode::Low,
3 => TtlMode::Input,
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown TTL mode"))
})
}
} }
#[derive(Debug)] #[derive(Debug)]
pub enum Request { pub enum DeviceMessage {
Monitor, MonitorStatus { channel: u32, probe: u8, value: u32 },
TtlSet { channel: u8, mode: TtlMode } InjectionStatus { channel: u32, overrd: u8, value: u8 }
} }
impl Request { impl HostMessage {
pub fn read_from(reader: &mut Read) -> io::Result<Request> { pub fn read_from(reader: &mut Read) -> io::Result<HostMessage> {
Ok(match read_u8(reader)? { Ok(match read_u8(reader)? {
1 => Request::Monitor, 0 => HostMessage::Monitor {
2 => Request::TtlSet { enable: if read_u8(reader)? == 0 { false } else { true },
channel: read_u8(reader)?, channel: read_u32(reader)?,
mode: TtlMode::read_from(reader)? probe: read_u8(reader)?
}, },
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type")) 1 => HostMessage::Inject {
channel: read_u32(reader)?,
overrd: read_u8(reader)?,
value: read_u8(reader)?
},
2 => HostMessage::GetInjectionStatus {
channel: read_u32(reader)?,
overrd: read_u8(reader)?
},
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown packet type"))
}) })
} }
} }
#[derive(Debug, Default)] impl DeviceMessage {
pub struct Reply<'a> {
pub ttl_levels: u64,
pub ttl_oes: u64,
pub ttl_overrides: u64,
pub dds_rtio_first_channel: u16,
pub dds_channels_per_bus: u16,
pub dds_ftws: &'a [u32]
}
impl<'a> Reply<'a> {
pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { pub fn write_to(&self, writer: &mut Write) -> io::Result<()> {
write_u64(writer, self.ttl_levels)?; match *self {
write_u64(writer, self.ttl_oes)?; DeviceMessage::MonitorStatus { channel, probe, value } => {
write_u64(writer, self.ttl_overrides)?; write_u8(writer, 0)?;
write_u16(writer, self.dds_rtio_first_channel)?; write_u32(writer, channel)?;
write_u16(writer, self.dds_channels_per_bus)?; write_u8(writer, probe)?;
for dds_ftw in self.dds_ftws { write_u32(writer, value)?;
write_u32(writer, *dds_ftw)?; },
DeviceMessage::InjectionStatus { channel, overrd, value } => {
write_u8(writer, 1)?;
write_u32(writer, channel)?;
write_u8(writer, overrd)?;
write_u8(writer, value)?;
}
} }
Ok(()) Ok(())
} }

View File

@ -37,7 +37,7 @@ pub mod crg {
} }
#[cfg(has_drtio)] #[cfg(has_drtio)]
mod drtio { pub mod drtio {
use super::*; use super::*;
use drtioaux; use drtioaux;
@ -54,7 +54,7 @@ mod drtio {
} }
} }
fn link_is_running() -> bool { pub fn link_is_running() -> bool {
unsafe { unsafe {
LINK_RUNNING LINK_RUNNING
} }