artiq/artiq/coredevice/comm_mgmt.py

197 lines
5.2 KiB
Python

from enum import Enum
import logging
import struct
from sipyco.keepalive import create_connection
logger = logging.getLogger(__name__)
class Request(Enum):
GetLog = 1
ClearLog = 2
PullLog = 7
SetLogFilter = 3
SetUartLogFilter = 6
ConfigRead = 12
ConfigWrite = 13
ConfigRemove = 14
ConfigErase = 15
Reboot = 5
DebugAllocator = 8
class Reply(Enum):
Success = 1
Error = 6
Unavailable = 4
LogContent = 2
ConfigData = 7
RebootImminent = 3
class LogLevel(Enum):
OFF = 0
ERROR = 1
WARN = 2
INFO = 3
DEBUG = 4
TRACE = 5
class CommMgmt:
def __init__(self, host, port=1380):
self.host = host
self.port = port
def open(self):
if hasattr(self, "socket"):
return
self.socket = create_connection(self.host, self.port)
self.socket.sendall(b"ARTIQ management\n")
endian = self._read(1)
if endian == b"e":
self.endian = "<"
elif endian == b"E":
self.endian = ">"
else:
raise IOError("Incorrect reply from device: expected e/E.")
def close(self):
if not hasattr(self, "socket"):
return
self.socket.close()
del self.socket
logger.debug("disconnected")
# Protocol elements
def _write(self, data):
self.socket.sendall(data)
def _write_header(self, ty):
self.open()
logger.debug("sending message: type=%r", ty)
self._write(struct.pack("B", ty.value))
def _write_int8(self, value):
self._write(struct.pack("B", value))
def _write_int32(self, value):
self._write(struct.pack(self.endian + "l", value))
def _write_bytes(self, value):
self._write_int32(len(value))
self._write(value)
def _write_string(self, value):
self._write_bytes(value.encode("utf-8"))
def _read(self, length):
r = bytes()
while len(r) < length:
rn = self.socket.recv(min(8192, length - len(r)))
if not rn:
raise ConnectionResetError("Connection closed")
r += rn
return r
def _read_header(self):
ty = Reply(*struct.unpack("B", self._read(1)))
logger.debug("receiving message: type=%r", ty)
return ty
def _read_expect(self, ty):
header = self._read_header()
if header != ty:
raise IOError("Incorrect reply from device: {} (expected {})".
format(header, ty))
def _read_int32(self):
(value, ) = struct.unpack(self.endian + "l", self._read(4))
return value
def _read_bytes(self):
return self._read(self._read_int32())
def _read_string(self):
return self._read_bytes().decode("utf-8")
# External API
def get_log(self):
self._write_header(Request.GetLog)
self._read_expect(Reply.LogContent)
return self._read_string()
def clear_log(self):
self._write_header(Request.ClearLog)
self._read_expect(Reply.Success)
def pull_log(self):
self._write_header(Request.PullLog)
self._read_expect(Reply.LogContent)
return self._read_string()
def set_log_level(self, level):
if level not in LogLevel.__members__:
raise ValueError("invalid log level {}".format(level))
self._write_header(Request.SetLogFilter)
self._write_int8(getattr(LogLevel, level).value)
self._read_expect(Reply.Success)
def set_uart_log_level(self, level):
if level not in LogLevel.__members__:
raise ValueError("invalid log level {}".format(level))
self._write_header(Request.SetUartLogFilter)
self._write_int8(getattr(LogLevel, level).value)
self._read_expect(Reply.Success)
def config_read(self, key):
self._write_header(Request.ConfigRead)
self._write_string(key)
ty = self._read_header()
if ty == Reply.Error:
raise IOError("Device failed to read config. The key may not exist.")
elif ty != Reply.ConfigData:
raise IOError("Incorrect reply from device: {} (expected {})".
format(ty, Reply.ConfigData))
return self._read_string()
def config_write(self, key, value):
self._write_header(Request.ConfigWrite)
self._write_string(key)
self._write_bytes(value)
ty = self._read_header()
if ty == Reply.Error:
raise IOError("Device failed to write config. More information may be available in the log.")
elif ty != Reply.Success:
raise IOError("Incorrect reply from device: {} (expected {})".
format(ty, Reply.Success))
def config_remove(self, key):
self._write_header(Request.ConfigRemove)
self._write_string(key)
self._read_expect(Reply.Success)
def config_erase(self):
self._write_header(Request.ConfigErase)
self._read_expect(Reply.Success)
def reboot(self):
self._write_header(Request.Reboot)
self._read_expect(Reply.RebootImminent)
def debug_allocator(self):
self._write_header(Request.DebugAllocator)