From 948ed6fb0a8dc5e513c55fb87bbbf82a2f8dc4e5 Mon Sep 17 00:00:00 2001 From: whitequark Date: Wed, 29 Mar 2017 03:35:45 +0000 Subject: [PATCH] Extract core device management interface from session interface (#691). --- artiq/coredevice/comm_kernel.py | 25 ----- artiq/coredevice/comm_mgmt.py | 133 +++++++++++++++++++++++ artiq/firmware/libproto/lib.rs | 1 + artiq/firmware/libproto/mgmt_proto.rs | 68 ++++++++++++ artiq/firmware/libproto/session_proto.rs | 38 ------- artiq/firmware/runtime/lib.rs | 4 +- artiq/firmware/runtime/mgmt.rs | 72 ++++++++++++ artiq/firmware/runtime/session.rs | 29 ----- artiq/frontend/artiq_coreboot.py | 7 +- artiq/frontend/artiq_corelog.py | 14 ++- artiq/frontend/artiq_devtool.py | 2 +- doc/manual/default_network_ports.rst | 2 + 12 files changed, 294 insertions(+), 101 deletions(-) create mode 100644 artiq/coredevice/comm_mgmt.py create mode 100644 artiq/firmware/libproto/mgmt_proto.rs create mode 100644 artiq/firmware/runtime/mgmt.rs diff --git a/artiq/coredevice/comm_kernel.py b/artiq/coredevice/comm_kernel.py index 057647547..eba2a9dbd 100644 --- a/artiq/coredevice/comm_kernel.py +++ b/artiq/coredevice/comm_kernel.py @@ -281,25 +281,6 @@ class CommKernel: self._read_empty(_D2HMsgType.CLOCK_SWITCH_COMPLETED) - def get_log(self): - self._write_empty(_H2DMsgType.LOG_REQUEST) - - self._read_header() - self._read_expect(_D2HMsgType.LOG_REPLY) - return self._read_string() - - def clear_log(self): - self._write_empty(_H2DMsgType.LOG_CLEAR) - - self._read_empty(_D2HMsgType.LOG_REPLY) - - def set_log_level(self, level): - if level not in _LogLevel.__members__: - raise ValueError("invalid log level {}".format(level)) - - self._write_header(_H2DMsgType.LOG_FILTER) - self._write_int8(getattr(_LogLevel, level).value) - def flash_storage_read(self, key): self._write_header(_H2DMsgType.FLASH_READ_REQUEST) self._write_string(key) @@ -330,12 +311,6 @@ class CommKernel: self._read_empty(_D2HMsgType.FLASH_OK_REPLY) - def hotswap(self, image): - self._write_header(_H2DMsgType.HOTSWAP) - self._write_bytes(image) - - self._read_empty(_D2HMsgType.HOTSWAP_IMMINENT) - def load(self, kernel_library): self._write_header(_H2DMsgType.LOAD_KERNEL) self._write_bytes(kernel_library) diff --git a/artiq/coredevice/comm_mgmt.py b/artiq/coredevice/comm_mgmt.py new file mode 100644 index 000000000..36793957b --- /dev/null +++ b/artiq/coredevice/comm_mgmt.py @@ -0,0 +1,133 @@ +from enum import Enum +import logging +import socket +import struct + + +logger = logging.getLogger(__name__) + + +class Request(Enum): + GetLog = 1 + ClearLog = 2 + SetLogFilter = 3 + + Hotswap = 4 + + +class Reply(Enum): + Success = 1 + + LogContent = 2 + + HotswapImminent = 3 + + +class LogLevel(Enum): + OFF = 0 + ERROR = 1 + WARN = 2 + INFO = 3 + DEBUG = 4 + TRACE = 5 + + +def initialize_connection(host, port): + sock = socket.create_connection((host, port), 5.0) + sock.settimeout(None) + logger.debug("connected to host %s on port %d", host, port) + return sock + + +class CommMgmt: + def __init__(self, dmgr, host, port=1380): + self.host = host + self.port = port + + def open(self): + if hasattr(self, "socket"): + return + self.socket = initialize_connection(self.host, self.port) + self.socket.sendall(b"ARTIQ management\n") + + def close(self): + if not hasattr(self, "socket"): + return + self.socket.close() + del self.socket + logger.debug("disconnected") + + # Protocol elements + + def _write(self, data): + self.socket.sendall(data) + + def _write_header(self, ty): + self.open() + + logger.debug("sending message: type=%r", ty) + self._write(struct.pack("B", ty.value)) + + def _write_int8(self, value): + self._write(struct.pack("B", value)) + + def _write_int32(self, value): + self._write(struct.pack(">l", value)) + + def _write_bytes(self, value): + self._write_int32(len(value)) + self._write(value) + + def _read(self, length): + r = bytes() + while len(r) < length: + rn = self.socket.recv(min(8192, length - len(r))) + if not rn: + raise ConnectionResetError("Connection closed") + r += rn + return r + + def _read_header(self): + ty = Reply(*struct.unpack("B", self._read(1))) + logger.debug("receiving message: type=%r", ty) + + return ty + + def _read_expect(self, ty): + if self._read_header() != ty: + raise IOError("Incorrect reply from device: {} (expected {})". + format(self._read_type, ty)) + + def _read_int32(self): + (value, ) = struct.unpack(">l", self._read(4)) + return value + + def _read_bytes(self): + return self._read(self._read_int32()) + + def _read_string(self): + return self._read_bytes().decode("utf-8") + + # External API + + def get_log(self): + self._write_header(Request.GetLog) + self._read_expect(Reply.LogContent) + return self._read_string() + + def clear_log(self): + self._write_header(Request.ClearLog) + self._read_expect(Reply.Success) + + def set_log_level(self, level): + if level not in LogLevel.__members__: + raise ValueError("invalid log level {}".format(level)) + + self._write_header(Request.SetLogFilter) + self._write_int8(getattr(LogLevel, level).value) + self._read_expect(Reply.Success) + + def hotswap(self, firmware): + self._write_header(Request.Hotswap) + self._write_bytes(firmware) + self._read_expect(Reply.HotswapImminent) diff --git a/artiq/firmware/libproto/lib.rs b/artiq/firmware/libproto/lib.rs index 3ecf3c278..d3422b5ee 100644 --- a/artiq/firmware/libproto/lib.rs +++ b/artiq/firmware/libproto/lib.rs @@ -13,6 +13,7 @@ extern crate std_artiq as std; pub mod kernel_proto; // External protocols. +pub mod mgmt_proto; pub mod analyzer_proto; pub mod moninj_proto; pub mod session_proto; diff --git a/artiq/firmware/libproto/mgmt_proto.rs b/artiq/firmware/libproto/mgmt_proto.rs new file mode 100644 index 000000000..75e480840 --- /dev/null +++ b/artiq/firmware/libproto/mgmt_proto.rs @@ -0,0 +1,68 @@ +use std::vec::Vec; +use std::io::{self, Read, Write}; +use {ReadExt, WriteExt}; +#[cfg(feature = "log")] +use log::LogLevelFilter; + +#[derive(Debug)] +pub enum Request { + GetLog, + ClearLog, + #[cfg(feature = "log")] + SetLogFilter(LogLevelFilter), + + Hotswap(Vec), +} + +pub enum Reply<'a> { + Success, + + LogContent(&'a str), + + HotswapImminent, +} + +impl Request { + pub fn read_from(reader: &mut Read) -> io::Result { + Ok(match reader.read_u8()? { + 1 => Request::GetLog, + 2 => Request::ClearLog, + #[cfg(feature = "log")] + 3 => { + let level = match reader.read_u8()? { + 0 => LogLevelFilter::Off, + 1 => LogLevelFilter::Error, + 2 => LogLevelFilter::Warn, + 3 => LogLevelFilter::Info, + 4 => LogLevelFilter::Debug, + 5 => LogLevelFilter::Trace, + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, + "invalid log level")) + }; + Request::SetLogFilter(level) + } + 4 => Request::Hotswap(reader.read_bytes()?), + _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type")) + }) + } +} + +impl<'a> Reply<'a> { + pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { + match *self { + Reply::Success => { + writer.write_u8(1)?; + }, + + Reply::LogContent(ref log) => { + writer.write_u8(2)?; + writer.write_string(log)?; + }, + + Reply::HotswapImminent => { + writer.write_u8(3)?; + }, + } + Ok(()) + } +} diff --git a/artiq/firmware/libproto/session_proto.rs b/artiq/firmware/libproto/session_proto.rs index e15eed08d..c852f3918 100644 --- a/artiq/firmware/libproto/session_proto.rs +++ b/artiq/firmware/libproto/session_proto.rs @@ -1,8 +1,6 @@ use std::io::{self, Read, Write}; use std::vec::Vec; use std::string::String; -#[cfg(feature = "log")] -use log::LogLevelFilter; use {ReadExt, WriteExt}; fn read_sync(reader: &mut Read) -> io::Result<()> { @@ -20,11 +18,6 @@ fn write_sync(writer: &mut Write) -> io::Result<()> { #[derive(Debug)] pub enum Request { - Log, - LogClear, - #[cfg(feature = "log")] - LogFilter(LogLevelFilter), - SystemInfo, SwitchClock(u8), @@ -46,30 +39,12 @@ pub enum Request { FlashWrite { key: String, value: Vec }, FlashRemove { key: String }, FlashErase, - - Hotswap(Vec), } impl Request { pub fn read_from(reader: &mut Read) -> io::Result { read_sync(reader)?; Ok(match reader.read_u8()? { - 1 => Request::Log, - 2 => Request::LogClear, - #[cfg(feature = "log")] - 13 => { - let level = match reader.read_u8()? { - 0 => LogLevelFilter::Off, - 1 => LogLevelFilter::Error, - 2 => LogLevelFilter::Warn, - 3 => LogLevelFilter::Info, - 4 => LogLevelFilter::Debug, - 5 => LogLevelFilter::Trace, - _ => return Err(io::Error::new(io::ErrorKind::InvalidData, - "invalid log level")) - }; - Request::LogFilter(level) - } 3 => Request::SystemInfo, 4 => Request::SwitchClock(reader.read_u8()?), 5 => Request::LoadKernel(reader.read_bytes()?), @@ -99,7 +74,6 @@ impl Request { 12 => Request::FlashRemove { key: reader.read_string()? }, - 14 => Request::Hotswap(reader.read_bytes()?), _ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type")) }) } @@ -107,8 +81,6 @@ impl Request { #[derive(Debug)] pub enum Reply<'a> { - Log(&'a str), - SystemInfo { ident: &'a str, finished_cleanly: bool @@ -140,19 +112,12 @@ pub enum Reply<'a> { WatchdogExpired, ClockFailure, - - HotswapImminent, } impl<'a> Reply<'a> { pub fn write_to(&self, writer: &mut Write) -> io::Result<()> { write_sync(writer)?; match *self { - Reply::Log(ref log) => { - writer.write_u8(1)?; - writer.write_string(log)?; - }, - Reply::SystemInfo { ident, finished_cleanly } => { writer.write_u8(2)?; writer.write(b"AROR")?; @@ -221,9 +186,6 @@ impl<'a> Reply<'a> { Reply::ClockFailure => { writer.write_u8(15)?; }, - Reply::HotswapImminent => { - writer.write_u8(16)?; - } } Ok(()) } diff --git a/artiq/firmware/runtime/lib.rs b/artiq/firmware/runtime/lib.rs index f92b3c666..168057590 100644 --- a/artiq/firmware/runtime/lib.rs +++ b/artiq/firmware/runtime/lib.rs @@ -22,7 +22,7 @@ extern crate drtioaux; use std::boxed::Box; use smoltcp::wire::{EthernetAddress, IpAddress}; -use proto::{analyzer_proto, moninj_proto, rpc_proto, session_proto, kernel_proto}; +use proto::{mgmt_proto, analyzer_proto, moninj_proto, rpc_proto, session_proto, kernel_proto}; use amp::{mailbox, rpc_queue}; macro_rules! borrow_mut { @@ -43,6 +43,7 @@ mod sched; mod cache; mod rtio_dma; +mod mgmt; mod kernel; mod session; #[cfg(any(has_rtio_moninj, has_drtio))] @@ -124,6 +125,7 @@ fn startup() { let mut scheduler = sched::Scheduler::new(); let io = scheduler.io(); rtio_mgt::startup(&io); + io.spawn(4096, mgmt::thread); io.spawn(16384, session::thread); #[cfg(any(has_rtio_moninj, has_drtio))] io.spawn(4096, moninj::thread); diff --git a/artiq/firmware/runtime/mgmt.rs b/artiq/firmware/runtime/mgmt.rs new file mode 100644 index 000000000..f195a6846 --- /dev/null +++ b/artiq/firmware/runtime/mgmt.rs @@ -0,0 +1,72 @@ +use std::io::{self, Read}; +use logger_artiq::BufferLogger; +use sched::Io; +use sched::{TcpListener, TcpStream}; +use board; +use mgmt_proto::*; + +fn check_magic(stream: &mut TcpStream) -> io::Result<()> { + const MAGIC: &'static [u8] = b"ARTIQ management\n"; + + let mut magic: [u8; 17] = [0; 17]; + stream.read_exact(&mut magic)?; + if magic != MAGIC { + Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic")) + } else { + Ok(()) + } +} + +fn worker(mut stream: &mut TcpStream) -> io::Result<()> { + check_magic(&mut stream)?; + info!("new connection from {}", stream.remote_endpoint()); + + loop { + match Request::read_from(stream)? { + Request::GetLog => { + BufferLogger::with_instance(|logger| { + logger.extract(|log| { + Reply::LogContent(log).write_to(stream) + }) + })?; + }, + + Request::ClearLog => { + BufferLogger::with_instance(|logger| + logger.clear()); + Reply::Success.write_to(stream)?; + }, + + Request::SetLogFilter(level) => { + info!("changing log level to {}", level); + BufferLogger::with_instance(|logger| + logger.set_max_log_level(level)); + Reply::Success.write_to(stream)?; + }, + + Request::Hotswap(firmware) => { + Reply::HotswapImminent.write_to(stream)?; + stream.close()?; + warn!("hotswapping firmware"); + unsafe { board::boot::hotswap(&firmware) } + }, + }; + } +} + +pub fn thread(io: Io) { + let listener = TcpListener::new(&io, 4096); + listener.listen(1380).expect("mgmt: cannot listen"); + info!("management interface active"); + + loop { + let stream = listener.accept().expect("mgmt: cannot accept").into_handle(); + io.spawn(4096, move |io| { + let mut stream = TcpStream::from_handle(&io, stream); + match worker(&mut stream) { + Ok(()) => {}, + Err(err) => error!("aborted: {}", err) + } + }); + } +} diff --git a/artiq/firmware/runtime/session.rs b/artiq/firmware/runtime/session.rs index 2e33275f6..88a96c16e 100644 --- a/artiq/firmware/runtime/session.rs +++ b/artiq/firmware/runtime/session.rs @@ -116,7 +116,6 @@ fn host_read(stream: &mut TcpStream) -> io::Result { let request = host::Request::read_from(stream)?; match &request { &host::Request::LoadKernel(_) => debug!("comm<-host LoadLibrary(...)"), - &host::Request::Hotswap(_) => debug!("comm<-host Hotswap(...)"), _ => debug!("comm<-host {:?}", request) } Ok(request) @@ -217,26 +216,6 @@ fn process_host_message(io: &Io, Ok(()) } - // artiq_corelog - host::Request::Log => { - // Logging the packet with the log is inadvisable - debug!("comm->host Log(...)"); - BufferLogger::with_instance(|logger| { - logger.extract(|log| { - host::Reply::Log(log).write_to(stream) - }) - }) - } - host::Request::LogClear => { - BufferLogger::with_instance(|logger| logger.clear()); - host_write(stream, host::Reply::Log("")) - } - host::Request::LogFilter(filter) => { - info!("changing log level to {}", filter); - BufferLogger::with_instance(|logger| logger.set_max_log_level(filter)); - Ok(()) - } - // artiq_coreconfig host::Request::FlashRead { ref key } => { config::read(key, |result| { @@ -269,14 +248,6 @@ fn process_host_message(io: &Io, } } - // artiq_coreboot - host::Request::Hotswap(binary) => { - host_write(stream, host::Reply::HotswapImminent)?; - stream.close()?; - warn!("hotswapping firmware"); - unsafe { board::boot::hotswap(&binary) } - } - // artiq_run/artiq_master host::Request::SwitchClock(clk) => { if session.running() { diff --git a/artiq/frontend/artiq_coreboot.py b/artiq/frontend/artiq_coreboot.py index b02c78793..ca5b7888a 100755 --- a/artiq/frontend/artiq_coreboot.py +++ b/artiq/frontend/artiq_coreboot.py @@ -6,6 +6,7 @@ import struct from artiq.tools import verbosity_args, init_logger from artiq.master.databases import DeviceDB from artiq.master.worker_db import DeviceManager +from artiq.coredevice.comm_mgmt import CommMgmt def get_argparser(): @@ -26,9 +27,9 @@ def main(): init_logger(args) device_mgr = DeviceManager(DeviceDB(args.device_db)) try: - comm = device_mgr.get("comm") - comm.check_system_info() - comm.hotswap(args.image.read()) + core_addr = device_mgr.get_desc("comm")["arguments"]["host"] + mgmt = CommMgmt(device_mgr, core_addr) + mgmt.hotswap(args.image.read()) finally: device_mgr.close_devices() diff --git a/artiq/frontend/artiq_corelog.py b/artiq/frontend/artiq_corelog.py index 3fcb44f14..d3720284f 100755 --- a/artiq/frontend/artiq_corelog.py +++ b/artiq/frontend/artiq_corelog.py @@ -5,6 +5,7 @@ import argparse from artiq.tools import verbosity_args, init_logger from artiq.master.databases import DeviceDB from artiq.master.worker_db import DeviceManager +from artiq.coredevice.comm_mgmt import CommMgmt def get_argparser(): @@ -16,6 +17,9 @@ def get_argparser(): subparsers = parser.add_subparsers(dest="action") + p_clear = subparsers.add_parser("clear", + help="clear log buffer") + p_set_level = subparsers.add_parser("set_level", help="set minimum level for messages to be logged") p_set_level.add_argument("level", metavar="LEVEL", type=str, @@ -29,12 +33,14 @@ def main(): init_logger(args) device_mgr = DeviceManager(DeviceDB(args.device_db)) try: - comm = device_mgr.get("comm") - comm.check_system_info() + core_addr = device_mgr.get_desc("comm")["arguments"]["host"] + mgmt = CommMgmt(device_mgr, core_addr) if args.action == "set_level": - comm.set_log_level(args.level) + mgmt.set_log_level(args.level) + elif args.action == "clear": + mgmt.clear_log() else: - print(comm.get_log(), end="") + print(mgmt.get_log(), end="") finally: device_mgr.close_devices() diff --git a/artiq/frontend/artiq_devtool.py b/artiq/frontend/artiq_devtool.py index 644ae8913..02f189a1c 100755 --- a/artiq/frontend/artiq_devtool.py +++ b/artiq/frontend/artiq_devtool.py @@ -179,7 +179,7 @@ def main(): local_stream.close() remote_stream.close() - for port in (1381, 1382): + for port in (1380, 1381, 1382): thread = threading.Thread(target=forwarder, args=(port,), name="port-{}".format(port), daemon=True) thread.start() diff --git a/doc/manual/default_network_ports.rst b/doc/manual/default_network_ports.rst index 898069555..e3e669019 100644 --- a/doc/manual/default_network_ports.rst +++ b/doc/manual/default_network_ports.rst @@ -4,6 +4,8 @@ Default network ports +--------------------------+--------------+ | Component | Default port | +==========================+==============+ +| Core device (management) | 1380 | ++--------------------------+--------------+ | Core device (main) | 1381 | +--------------------------+--------------+ | Core device (analyzer) | 1382 |