From 5a16660aa29e51a63c92dcd395ab8b032c2632e7 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 25 Feb 2017 12:07:00 +0800 Subject: [PATCH] runtime: new moninj protocol, TCP-based, with DRTIO support --- artiq/firmware/runtime/lib.rs | 6 +- artiq/firmware/runtime/moninj.rs | 335 +++++++++++++++++-------- artiq/firmware/runtime/moninj_proto.rs | 82 +++--- artiq/firmware/runtime/rtio_mgt.rs | 4 +- 4 files changed, 267 insertions(+), 160 deletions(-) diff --git a/artiq/firmware/runtime/lib.rs b/artiq/firmware/runtime/lib.rs index b8d477615..0ccd8e678 100644 --- a/artiq/firmware/runtime/lib.rs +++ b/artiq/firmware/runtime/lib.rs @@ -42,7 +42,7 @@ mod cache; mod proto; mod kernel_proto; mod session_proto; -#[cfg(has_rtio_moninj)] +#[cfg(any(has_rtio_moninj, has_drtio))] mod moninj_proto; #[cfg(has_rtio_analyzer)] mod analyzer_proto; @@ -50,7 +50,7 @@ mod rpc_proto; mod kernel; mod session; -#[cfg(has_rtio_moninj)] +#[cfg(any(has_rtio_moninj, has_drtio))] mod moninj; #[cfg(has_rtio_analyzer)] mod analyzer; @@ -121,7 +121,7 @@ fn startup() { let io = scheduler.io(); rtio_mgt::startup(&io); io.spawn(16384, session::thread); - #[cfg(has_rtio_moninj)] + #[cfg(any(has_rtio_moninj, has_drtio))] io.spawn(4096, moninj::thread); #[cfg(has_rtio_analyzer)] io.spawn(4096, analyzer::thread); diff --git a/artiq/firmware/runtime/moninj.rs b/artiq/firmware/runtime/moninj.rs index 812fb115d..43aa744e0 100644 --- a/artiq/firmware/runtime/moninj.rs +++ b/artiq/firmware/runtime/moninj.rs @@ -1,126 +1,241 @@ -use std::io; -use board::csr; -use sched::{Io, UdpSocket}; +use std::io::{self, Read}; +use std::btree_map::BTreeMap; + +use sched::Io; +use sched::{TcpListener, TcpStream}; +use board::{clock, csr}; +#[cfg(has_drtio)] +use drtioaux; +#[cfg(has_drtio)] +use rtio_mgt; + use moninj_proto::*; -const MONINJ_TTL_OVERRIDE_ENABLE: u8 = 0; -const MONINJ_TTL_OVERRIDE_O: u8 = 1; -const MONINJ_TTL_OVERRIDE_OE: u8 = 2; -fn worker(socket: &mut UdpSocket) -> io::Result<()> { - let mut buf = vec![0; 512]; +fn check_magic(stream: &mut TcpStream) -> io::Result<()> { + const MAGIC: &'static [u8] = b"ARTIQ moninj\n"; + + let mut magic: [u8; 13] = [0; 13]; + stream.read_exact(&mut magic)?; + if magic != MAGIC { + Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic")) + } else { + Ok(()) + } +} + +#[cfg(has_rtio_moninj)] +fn read_probe_local(channel: u16, probe: u8) -> u32 { + unsafe { + csr::rtio_moninj::mon_chan_sel_write(channel as _); + csr::rtio_moninj::mon_probe_sel_write(probe); + csr::rtio_moninj::mon_value_update_write(1); + csr::rtio_moninj::mon_value_read() as u32 + } +} + +#[cfg(has_drtio)] +fn read_probe_drtio(io: &Io, channel: u16, probe: u8) -> u32 { + if rtio_mgt::drtio::link_is_running() { + let request = drtioaux::Packet::MonitorRequest { channel: channel, probe: probe }; + drtioaux::hw::send(&request).unwrap(); + + let timeout = clock::get_ms() + 20; + while clock::get_ms() < timeout { + if !rtio_mgt::drtio::link_is_running() { + return 0 + } + match drtioaux::hw::recv() { + Ok(None) => (), + Ok(Some(drtioaux::Packet::MonitorReply { value })) => return value, + Ok(Some(_)) => warn!("received unexpected aux packet"), + Err(e) => warn!("aux packet error ({})", e) + } + io.relinquish().unwrap(); + } + warn!("aux packet timeout"); + 0 + } else { + 0 + } +} + +fn read_probe(_io: &Io, channel: u32, probe: u8) -> u32 { + #[cfg(has_rtio_moninj)] + { + if channel & 0xff0000 == 0 { + return read_probe_local(channel as u16, probe) + } + } + #[cfg(has_drtio)] + { + if channel & 0xff0000 != 0 { + return read_probe_drtio(_io, channel as u16, probe) + } + } + error!("read_probe: unrecognized channel number {}", channel); + 0 +} + +#[cfg(has_rtio_moninj)] +fn inject_local(channel: u16, overrd: u8, value: u8) { + unsafe { + csr::rtio_moninj::inj_chan_sel_write(channel as _); + csr::rtio_moninj::inj_override_sel_write(overrd); + csr::rtio_moninj::inj_value_write(value); + } +} + +#[cfg(has_drtio)] +fn inject_drtio(channel: u16, overrd: u8, value: u8) { + if rtio_mgt::drtio::link_is_running() { + let request = drtioaux::Packet::InjectionRequest { + channel: channel, + overrd: overrd, + value: value + }; + drtioaux::hw::send(&request).unwrap(); + } +} + +fn inject(channel: u32, overrd: u8, value: u8) { + #[cfg(has_rtio_moninj)] + { + if channel & 0xff0000 == 0 { + inject_local(channel as u16, overrd, value); + return + } + } + #[cfg(has_drtio)] + { + if channel & 0xff0000 != 0 { + inject_drtio(channel as u16, overrd, value); + return + } + } + error!("inject: unrecognized channel number {}", channel); +} + +#[cfg(has_rtio_moninj)] +fn read_injection_status_local(channel: u16, overrd: u8) -> u8 { + unsafe { + csr::rtio_moninj::inj_chan_sel_write(channel as _); + csr::rtio_moninj::inj_override_sel_write(overrd); + csr::rtio_moninj::inj_value_read() + } +} + +#[cfg(has_drtio)] +fn read_injection_status_drtio(io: &Io, channel: u16, overrd: u8) -> u8 { + if rtio_mgt::drtio::link_is_running() { + let request = drtioaux::Packet::InjectionStatusRequest { + channel: channel, + overrd: overrd + }; + drtioaux::hw::send(&request).unwrap(); + + let timeout = clock::get_ms() + 20; + while clock::get_ms() < timeout { + if !rtio_mgt::drtio::link_is_running() { + return 0 + } + match drtioaux::hw::recv() { + Ok(None) => (), + Ok(Some(drtioaux::Packet::InjectionStatusReply { value })) => return value, + Ok(Some(_)) => warn!("received unexpected aux packet"), + Err(e) => warn!("aux packet error ({})", e) + } + io.relinquish().unwrap(); + } + warn!("aux packet timeout"); + 0 + } else { + 0 + } +} + +fn read_injection_status(_io: &Io, channel: u32, probe: u8) -> u8 { + #[cfg(has_rtio_moninj)] + { + if channel & 0xff0000 == 0 { + return read_injection_status_local(channel as u16, probe) + } + } + #[cfg(has_drtio)] + { + if channel & 0xff0000 != 0 { + return read_injection_status_drtio(_io, channel as u16, probe) + } + } + error!("read_injection_status: unrecognized channel number {}", channel); + 0 +} + +fn connection_worker(io: &Io, mut stream: &mut TcpStream) -> io::Result<()> { + let mut watch_list = BTreeMap::new(); + let mut next_check = 0; + + check_magic(&mut stream)?; + info!("new connection from {}", stream.remote_endpoint()); + loop { - let (size, addr) = socket.recv_from(&mut buf)?; - let request = Request::read_from(&mut io::Cursor::new(&buf[..size]))?; - trace!("{} -> {:?}", addr, request); - - match request { - Request::Monitor => { - #[cfg(has_dds)] - let mut dds_ftws = [0u32; (csr::CONFIG_RTIO_DDS_COUNT as usize * - csr::CONFIG_DDS_CHANNELS_PER_BUS as usize)]; - let mut reply = Reply::default(); - - for i in 0..csr::CONFIG_RTIO_REGULAR_TTL_COUNT as u8 { - unsafe { - csr::rtio_moninj::mon_chan_sel_write(i); - csr::rtio_moninj::mon_probe_sel_write(0); - csr::rtio_moninj::mon_value_update_write(1); - if csr::rtio_moninj::mon_value_read() != 0 { - reply.ttl_levels |= 1 << i; - } - csr::rtio_moninj::mon_probe_sel_write(1); - csr::rtio_moninj::mon_value_update_write(1); - if csr::rtio_moninj::mon_value_read() != 0 { - reply.ttl_oes |= 1 << i; - } - csr::rtio_moninj::inj_chan_sel_write(i); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); - if csr::rtio_moninj::inj_value_read() != 0 { - reply.ttl_overrides |= 1 << i; - } + if stream.can_recv() { + let request = HostMessage::read_from(stream)?; + match request { + HostMessage::Monitor { enable, channel, probe } => { + if enable { + let _ = watch_list.entry((channel, probe)).or_insert(None); + } else { + let _ = watch_list.remove(&(channel, probe)); } - } - - #[cfg(has_dds)] - { - reply.dds_rtio_first_channel = csr::CONFIG_RTIO_FIRST_DDS_CHANNEL as u16; - reply.dds_channels_per_bus = csr::CONFIG_DDS_CHANNELS_PER_BUS as u16; - - for j in 0..csr::CONFIG_RTIO_DDS_COUNT { - unsafe { - csr::rtio_moninj::mon_chan_sel_write( - (csr::CONFIG_RTIO_FIRST_DDS_CHANNEL + j) as u8); - for i in 0..csr::CONFIG_DDS_CHANNELS_PER_BUS { - csr::rtio_moninj::mon_probe_sel_write(i as u8); - csr::rtio_moninj::mon_value_update_write(1); - dds_ftws[(csr::CONFIG_DDS_CHANNELS_PER_BUS * j + i) as usize] = - csr::rtio_moninj::mon_value_read() as u32; - } - } - } - reply.dds_ftws = &dds_ftws; - } - - trace!("{} <- {:?}", addr, reply); - buf.clear(); - reply.write_to(&mut buf)?; - socket.send_to(&buf, addr)?; - }, - - Request::TtlSet { channel, mode: TtlMode::Experiment } => { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); - csr::rtio_moninj::inj_value_write(0); - } - }, - - Request::TtlSet { channel, mode: TtlMode::High } => { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_O); - csr::rtio_moninj::inj_value_write(1); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE); - csr::rtio_moninj::inj_value_write(1); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); - csr::rtio_moninj::inj_value_write(1); - } - }, - - Request::TtlSet { channel, mode: TtlMode::Low } => { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_O); - csr::rtio_moninj::inj_value_write(0); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE); - csr::rtio_moninj::inj_value_write(1); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); - csr::rtio_moninj::inj_value_write(1); - } - }, - - Request::TtlSet { channel, mode: TtlMode::Input } => { - unsafe { - csr::rtio_moninj::inj_chan_sel_write(channel); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_OE); - csr::rtio_moninj::inj_value_write(0); - csr::rtio_moninj::inj_override_sel_write(MONINJ_TTL_OVERRIDE_ENABLE); - csr::rtio_moninj::inj_value_write(1); + }, + HostMessage::Inject { channel, overrd, value } => inject(channel, overrd, value), + HostMessage::GetInjectionStatus { channel, overrd } => { + let value = read_injection_status(io, channel, overrd); + let reply = DeviceMessage::InjectionStatus { + channel: channel, + overrd: overrd, + value: value + }; + reply.write_to(stream)?; } } + } else if !stream.may_recv() { + return Ok(()) } + + if clock::get_ms() > next_check { + for (&(channel, probe), previous) in watch_list.iter_mut() { + let current = read_probe(io, channel, probe); + if previous.is_none() || (previous.unwrap() != current) { + let message = DeviceMessage::MonitorStatus { + channel: channel, + probe: probe, + value: current + }; + message.write_to(stream)?; + *previous = Some(current); + } + } + next_check = clock::get_ms() + 200; + } + + io.relinquish().unwrap(); } } pub fn thread(io: Io) { - let mut socket = UdpSocket::new(&io, 1, 512); - socket.bind(3250); + let listener = TcpListener::new(&io, 65535); + listener.listen(1383).expect("moninj: cannot listen"); loop { - match worker(&mut socket) { - Ok(()) => unreachable!(), - Err(err) => error!("moninj aborted: {}", err) - } + let stream = listener.accept().expect("moninj: cannot accept").into_handle(); + io.spawn(16384, move |io| { + let mut stream = TcpStream::from_handle(&io, stream); + match connection_worker(&io, &mut stream) { + Ok(()) => {}, + Err(err) => error!("moninj aborted: {}", err) + } + }); } } diff --git a/artiq/firmware/runtime/moninj_proto.rs b/artiq/firmware/runtime/moninj_proto.rs index ef287b696..5b67eca48 100644 --- a/artiq/firmware/runtime/moninj_proto.rs +++ b/artiq/firmware/runtime/moninj_proto.rs @@ -2,63 +2,55 @@ use std::io::{self, Read, Write}; use proto::*; #[derive(Debug)] -pub enum TtlMode { - Experiment, - High, - Low, - Input -} - -impl TtlMode { - pub fn read_from(reader: &mut Read) -> io::Result { - Ok(match read_u8(reader)? { - 0 => TtlMode::Experiment, - 1 => TtlMode::High, - 2 => TtlMode::Low, - 3 => TtlMode::Input, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown TTL mode")) - }) - } +pub enum HostMessage { + Monitor { enable: bool, channel: u32, probe: u8 }, + Inject { channel: u32, overrd: u8, value: u8 }, + GetInjectionStatus { channel: u32, overrd: u8 } } #[derive(Debug)] -pub enum Request { - Monitor, - TtlSet { channel: u8, mode: TtlMode } +pub enum DeviceMessage { + MonitorStatus { channel: u32, probe: u8, value: u32 }, + InjectionStatus { channel: u32, overrd: u8, value: u8 } } -impl Request { - pub fn read_from(reader: &mut Read) -> io::Result { +impl HostMessage { + pub fn read_from(reader: &mut Read) -> io::Result { Ok(match read_u8(reader)? { - 1 => Request::Monitor, - 2 => Request::TtlSet { - channel: read_u8(reader)?, - mode: TtlMode::read_from(reader)? + 0 => HostMessage::Monitor { + enable: if read_u8(reader)? == 0 { false } else { true }, + channel: read_u32(reader)?, + probe: read_u8(reader)? }, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type")) + 1 => HostMessage::Inject { + channel: read_u32(reader)?, + overrd: read_u8(reader)?, + value: read_u8(reader)? + }, + 2 => HostMessage::GetInjectionStatus { + channel: read_u32(reader)?, + overrd: read_u8(reader)? + }, + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown packet type")) }) } } -#[derive(Debug, Default)] -pub struct Reply<'a> { - pub ttl_levels: u64, - pub ttl_oes: u64, - pub ttl_overrides: u64, - pub dds_rtio_first_channel: u16, - pub dds_channels_per_bus: u16, - pub dds_ftws: &'a [u32] -} - -impl<'a> Reply<'a> { +impl DeviceMessage { pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { - write_u64(writer, self.ttl_levels)?; - write_u64(writer, self.ttl_oes)?; - write_u64(writer, self.ttl_overrides)?; - write_u16(writer, self.dds_rtio_first_channel)?; - write_u16(writer, self.dds_channels_per_bus)?; - for dds_ftw in self.dds_ftws { - write_u32(writer, *dds_ftw)?; + match *self { + DeviceMessage::MonitorStatus { channel, probe, value } => { + write_u8(writer, 0)?; + write_u32(writer, channel)?; + write_u8(writer, probe)?; + write_u32(writer, value)?; + }, + DeviceMessage::InjectionStatus { channel, overrd, value } => { + write_u8(writer, 1)?; + write_u32(writer, channel)?; + write_u8(writer, overrd)?; + write_u8(writer, value)?; + } } Ok(()) } diff --git a/artiq/firmware/runtime/rtio_mgt.rs b/artiq/firmware/runtime/rtio_mgt.rs index 22ba993c8..85740ebd6 100644 --- a/artiq/firmware/runtime/rtio_mgt.rs +++ b/artiq/firmware/runtime/rtio_mgt.rs @@ -37,7 +37,7 @@ pub mod crg { } #[cfg(has_drtio)] -mod drtio { +pub mod drtio { use super::*; use drtioaux; @@ -54,7 +54,7 @@ mod drtio { } } - fn link_is_running() -> bool { + pub fn link_is_running() -> bool { unsafe { LINK_RUNNING }