Extract core device management interface from session interface (#691).

This commit is contained in:
whitequark 2017-03-29 03:35:45 +00:00
parent 0cda1a3d34
commit 948ed6fb0a
12 changed files with 294 additions and 101 deletions

View File

@ -281,25 +281,6 @@ class CommKernel:
self._read_empty(_D2HMsgType.CLOCK_SWITCH_COMPLETED)
def get_log(self):
self._write_empty(_H2DMsgType.LOG_REQUEST)
self._read_header()
self._read_expect(_D2HMsgType.LOG_REPLY)
return self._read_string()
def clear_log(self):
self._write_empty(_H2DMsgType.LOG_CLEAR)
self._read_empty(_D2HMsgType.LOG_REPLY)
def set_log_level(self, level):
if level not in _LogLevel.__members__:
raise ValueError("invalid log level {}".format(level))
self._write_header(_H2DMsgType.LOG_FILTER)
self._write_int8(getattr(_LogLevel, level).value)
def flash_storage_read(self, key):
self._write_header(_H2DMsgType.FLASH_READ_REQUEST)
self._write_string(key)
@ -330,12 +311,6 @@ class CommKernel:
self._read_empty(_D2HMsgType.FLASH_OK_REPLY)
def hotswap(self, image):
self._write_header(_H2DMsgType.HOTSWAP)
self._write_bytes(image)
self._read_empty(_D2HMsgType.HOTSWAP_IMMINENT)
def load(self, kernel_library):
self._write_header(_H2DMsgType.LOAD_KERNEL)
self._write_bytes(kernel_library)

View File

@ -0,0 +1,133 @@
from enum import Enum
import logging
import socket
import struct
logger = logging.getLogger(__name__)
class Request(Enum):
GetLog = 1
ClearLog = 2
SetLogFilter = 3
Hotswap = 4
class Reply(Enum):
Success = 1
LogContent = 2
HotswapImminent = 3
class LogLevel(Enum):
OFF = 0
ERROR = 1
WARN = 2
INFO = 3
DEBUG = 4
TRACE = 5
def initialize_connection(host, port):
sock = socket.create_connection((host, port), 5.0)
sock.settimeout(None)
logger.debug("connected to host %s on port %d", host, port)
return sock
class CommMgmt:
def __init__(self, dmgr, host, port=1380):
self.host = host
self.port = port
def open(self):
if hasattr(self, "socket"):
return
self.socket = initialize_connection(self.host, self.port)
self.socket.sendall(b"ARTIQ management\n")
def close(self):
if not hasattr(self, "socket"):
return
self.socket.close()
del self.socket
logger.debug("disconnected")
# Protocol elements
def _write(self, data):
self.socket.sendall(data)
def _write_header(self, ty):
self.open()
logger.debug("sending message: type=%r", ty)
self._write(struct.pack("B", ty.value))
def _write_int8(self, value):
self._write(struct.pack("B", value))
def _write_int32(self, value):
self._write(struct.pack(">l", value))
def _write_bytes(self, value):
self._write_int32(len(value))
self._write(value)
def _read(self, length):
r = bytes()
while len(r) < length:
rn = self.socket.recv(min(8192, length - len(r)))
if not rn:
raise ConnectionResetError("Connection closed")
r += rn
return r
def _read_header(self):
ty = Reply(*struct.unpack("B", self._read(1)))
logger.debug("receiving message: type=%r", ty)
return ty
def _read_expect(self, ty):
if self._read_header() != ty:
raise IOError("Incorrect reply from device: {} (expected {})".
format(self._read_type, ty))
def _read_int32(self):
(value, ) = struct.unpack(">l", self._read(4))
return value
def _read_bytes(self):
return self._read(self._read_int32())
def _read_string(self):
return self._read_bytes().decode("utf-8")
# External API
def get_log(self):
self._write_header(Request.GetLog)
self._read_expect(Reply.LogContent)
return self._read_string()
def clear_log(self):
self._write_header(Request.ClearLog)
self._read_expect(Reply.Success)
def set_log_level(self, level):
if level not in LogLevel.__members__:
raise ValueError("invalid log level {}".format(level))
self._write_header(Request.SetLogFilter)
self._write_int8(getattr(LogLevel, level).value)
self._read_expect(Reply.Success)
def hotswap(self, firmware):
self._write_header(Request.Hotswap)
self._write_bytes(firmware)
self._read_expect(Reply.HotswapImminent)

View File

@ -13,6 +13,7 @@ extern crate std_artiq as std;
pub mod kernel_proto;
// External protocols.
pub mod mgmt_proto;
pub mod analyzer_proto;
pub mod moninj_proto;
pub mod session_proto;

View File

@ -0,0 +1,68 @@
use std::vec::Vec;
use std::io::{self, Read, Write};
use {ReadExt, WriteExt};
#[cfg(feature = "log")]
use log::LogLevelFilter;
#[derive(Debug)]
pub enum Request {
GetLog,
ClearLog,
#[cfg(feature = "log")]
SetLogFilter(LogLevelFilter),
Hotswap(Vec<u8>),
}
pub enum Reply<'a> {
Success,
LogContent(&'a str),
HotswapImminent,
}
impl Request {
pub fn read_from(reader: &mut Read) -> io::Result<Request> {
Ok(match reader.read_u8()? {
1 => Request::GetLog,
2 => Request::ClearLog,
#[cfg(feature = "log")]
3 => {
let level = match reader.read_u8()? {
0 => LogLevelFilter::Off,
1 => LogLevelFilter::Error,
2 => LogLevelFilter::Warn,
3 => LogLevelFilter::Info,
4 => LogLevelFilter::Debug,
5 => LogLevelFilter::Trace,
_ => return Err(io::Error::new(io::ErrorKind::InvalidData,
"invalid log level"))
};
Request::SetLogFilter(level)
}
4 => Request::Hotswap(reader.read_bytes()?),
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type"))
})
}
}
impl<'a> Reply<'a> {
pub fn write_to(&self, writer: &mut Write) -> io::Result<()> {
match *self {
Reply::Success => {
writer.write_u8(1)?;
},
Reply::LogContent(ref log) => {
writer.write_u8(2)?;
writer.write_string(log)?;
},
Reply::HotswapImminent => {
writer.write_u8(3)?;
},
}
Ok(())
}
}

View File

@ -1,8 +1,6 @@
use std::io::{self, Read, Write};
use std::vec::Vec;
use std::string::String;
#[cfg(feature = "log")]
use log::LogLevelFilter;
use {ReadExt, WriteExt};
fn read_sync(reader: &mut Read) -> io::Result<()> {
@ -20,11 +18,6 @@ fn write_sync(writer: &mut Write) -> io::Result<()> {
#[derive(Debug)]
pub enum Request {
Log,
LogClear,
#[cfg(feature = "log")]
LogFilter(LogLevelFilter),
SystemInfo,
SwitchClock(u8),
@ -46,30 +39,12 @@ pub enum Request {
FlashWrite { key: String, value: Vec<u8> },
FlashRemove { key: String },
FlashErase,
Hotswap(Vec<u8>),
}
impl Request {
pub fn read_from(reader: &mut Read) -> io::Result<Request> {
read_sync(reader)?;
Ok(match reader.read_u8()? {
1 => Request::Log,
2 => Request::LogClear,
#[cfg(feature = "log")]
13 => {
let level = match reader.read_u8()? {
0 => LogLevelFilter::Off,
1 => LogLevelFilter::Error,
2 => LogLevelFilter::Warn,
3 => LogLevelFilter::Info,
4 => LogLevelFilter::Debug,
5 => LogLevelFilter::Trace,
_ => return Err(io::Error::new(io::ErrorKind::InvalidData,
"invalid log level"))
};
Request::LogFilter(level)
}
3 => Request::SystemInfo,
4 => Request::SwitchClock(reader.read_u8()?),
5 => Request::LoadKernel(reader.read_bytes()?),
@ -99,7 +74,6 @@ impl Request {
12 => Request::FlashRemove {
key: reader.read_string()?
},
14 => Request::Hotswap(reader.read_bytes()?),
_ => return Err(io::Error::new(io::ErrorKind::InvalidData, "unknown request type"))
})
}
@ -107,8 +81,6 @@ impl Request {
#[derive(Debug)]
pub enum Reply<'a> {
Log(&'a str),
SystemInfo {
ident: &'a str,
finished_cleanly: bool
@ -140,19 +112,12 @@ pub enum Reply<'a> {
WatchdogExpired,
ClockFailure,
HotswapImminent,
}
impl<'a> Reply<'a> {
pub fn write_to(&self, writer: &mut Write) -> io::Result<()> {
write_sync(writer)?;
match *self {
Reply::Log(ref log) => {
writer.write_u8(1)?;
writer.write_string(log)?;
},
Reply::SystemInfo { ident, finished_cleanly } => {
writer.write_u8(2)?;
writer.write(b"AROR")?;
@ -221,9 +186,6 @@ impl<'a> Reply<'a> {
Reply::ClockFailure => {
writer.write_u8(15)?;
},
Reply::HotswapImminent => {
writer.write_u8(16)?;
}
}
Ok(())
}

View File

@ -22,7 +22,7 @@ extern crate drtioaux;
use std::boxed::Box;
use smoltcp::wire::{EthernetAddress, IpAddress};
use proto::{analyzer_proto, moninj_proto, rpc_proto, session_proto, kernel_proto};
use proto::{mgmt_proto, analyzer_proto, moninj_proto, rpc_proto, session_proto, kernel_proto};
use amp::{mailbox, rpc_queue};
macro_rules! borrow_mut {
@ -43,6 +43,7 @@ mod sched;
mod cache;
mod rtio_dma;
mod mgmt;
mod kernel;
mod session;
#[cfg(any(has_rtio_moninj, has_drtio))]
@ -124,6 +125,7 @@ fn startup() {
let mut scheduler = sched::Scheduler::new();
let io = scheduler.io();
rtio_mgt::startup(&io);
io.spawn(4096, mgmt::thread);
io.spawn(16384, session::thread);
#[cfg(any(has_rtio_moninj, has_drtio))]
io.spawn(4096, moninj::thread);

View File

@ -0,0 +1,72 @@
use std::io::{self, Read};
use logger_artiq::BufferLogger;
use sched::Io;
use sched::{TcpListener, TcpStream};
use board;
use mgmt_proto::*;
fn check_magic(stream: &mut TcpStream) -> io::Result<()> {
const MAGIC: &'static [u8] = b"ARTIQ management\n";
let mut magic: [u8; 17] = [0; 17];
stream.read_exact(&mut magic)?;
if magic != MAGIC {
Err(io::Error::new(io::ErrorKind::InvalidData, "unrecognized magic"))
} else {
Ok(())
}
}
fn worker(mut stream: &mut TcpStream) -> io::Result<()> {
check_magic(&mut stream)?;
info!("new connection from {}", stream.remote_endpoint());
loop {
match Request::read_from(stream)? {
Request::GetLog => {
BufferLogger::with_instance(|logger| {
logger.extract(|log| {
Reply::LogContent(log).write_to(stream)
})
})?;
},
Request::ClearLog => {
BufferLogger::with_instance(|logger|
logger.clear());
Reply::Success.write_to(stream)?;
},
Request::SetLogFilter(level) => {
info!("changing log level to {}", level);
BufferLogger::with_instance(|logger|
logger.set_max_log_level(level));
Reply::Success.write_to(stream)?;
},
Request::Hotswap(firmware) => {
Reply::HotswapImminent.write_to(stream)?;
stream.close()?;
warn!("hotswapping firmware");
unsafe { board::boot::hotswap(&firmware) }
},
};
}
}
pub fn thread(io: Io) {
let listener = TcpListener::new(&io, 4096);
listener.listen(1380).expect("mgmt: cannot listen");
info!("management interface active");
loop {
let stream = listener.accept().expect("mgmt: cannot accept").into_handle();
io.spawn(4096, move |io| {
let mut stream = TcpStream::from_handle(&io, stream);
match worker(&mut stream) {
Ok(()) => {},
Err(err) => error!("aborted: {}", err)
}
});
}
}

View File

@ -116,7 +116,6 @@ fn host_read(stream: &mut TcpStream) -> io::Result<host::Request> {
let request = host::Request::read_from(stream)?;
match &request {
&host::Request::LoadKernel(_) => debug!("comm<-host LoadLibrary(...)"),
&host::Request::Hotswap(_) => debug!("comm<-host Hotswap(...)"),
_ => debug!("comm<-host {:?}", request)
}
Ok(request)
@ -217,26 +216,6 @@ fn process_host_message(io: &Io,
Ok(())
}
// artiq_corelog
host::Request::Log => {
// Logging the packet with the log is inadvisable
debug!("comm->host Log(...)");
BufferLogger::with_instance(|logger| {
logger.extract(|log| {
host::Reply::Log(log).write_to(stream)
})
})
}
host::Request::LogClear => {
BufferLogger::with_instance(|logger| logger.clear());
host_write(stream, host::Reply::Log(""))
}
host::Request::LogFilter(filter) => {
info!("changing log level to {}", filter);
BufferLogger::with_instance(|logger| logger.set_max_log_level(filter));
Ok(())
}
// artiq_coreconfig
host::Request::FlashRead { ref key } => {
config::read(key, |result| {
@ -269,14 +248,6 @@ fn process_host_message(io: &Io,
}
}
// artiq_coreboot
host::Request::Hotswap(binary) => {
host_write(stream, host::Reply::HotswapImminent)?;
stream.close()?;
warn!("hotswapping firmware");
unsafe { board::boot::hotswap(&binary) }
}
// artiq_run/artiq_master
host::Request::SwitchClock(clk) => {
if session.running() {

View File

@ -6,6 +6,7 @@ import struct
from artiq.tools import verbosity_args, init_logger
from artiq.master.databases import DeviceDB
from artiq.master.worker_db import DeviceManager
from artiq.coredevice.comm_mgmt import CommMgmt
def get_argparser():
@ -26,9 +27,9 @@ def main():
init_logger(args)
device_mgr = DeviceManager(DeviceDB(args.device_db))
try:
comm = device_mgr.get("comm")
comm.check_system_info()
comm.hotswap(args.image.read())
core_addr = device_mgr.get_desc("comm")["arguments"]["host"]
mgmt = CommMgmt(device_mgr, core_addr)
mgmt.hotswap(args.image.read())
finally:
device_mgr.close_devices()

View File

@ -5,6 +5,7 @@ import argparse
from artiq.tools import verbosity_args, init_logger
from artiq.master.databases import DeviceDB
from artiq.master.worker_db import DeviceManager
from artiq.coredevice.comm_mgmt import CommMgmt
def get_argparser():
@ -16,6 +17,9 @@ def get_argparser():
subparsers = parser.add_subparsers(dest="action")
p_clear = subparsers.add_parser("clear",
help="clear log buffer")
p_set_level = subparsers.add_parser("set_level",
help="set minimum level for messages to be logged")
p_set_level.add_argument("level", metavar="LEVEL", type=str,
@ -29,12 +33,14 @@ def main():
init_logger(args)
device_mgr = DeviceManager(DeviceDB(args.device_db))
try:
comm = device_mgr.get("comm")
comm.check_system_info()
core_addr = device_mgr.get_desc("comm")["arguments"]["host"]
mgmt = CommMgmt(device_mgr, core_addr)
if args.action == "set_level":
comm.set_log_level(args.level)
mgmt.set_log_level(args.level)
elif args.action == "clear":
mgmt.clear_log()
else:
print(comm.get_log(), end="")
print(mgmt.get_log(), end="")
finally:
device_mgr.close_devices()

View File

@ -179,7 +179,7 @@ def main():
local_stream.close()
remote_stream.close()
for port in (1381, 1382):
for port in (1380, 1381, 1382):
thread = threading.Thread(target=forwarder, args=(port,),
name="port-{}".format(port), daemon=True)
thread.start()

View File

@ -4,6 +4,8 @@ Default network ports
+--------------------------+--------------+
| Component | Default port |
+==========================+==============+
| Core device (management) | 1380 |
+--------------------------+--------------+
| Core device (main) | 1381 |
+--------------------------+--------------+
| Core device (analyzer) | 1382 |