From cbda753f440f3b4e0e7d33eb1641ed63f2bcf1d3 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 16 Oct 2015 00:53:35 +0800 Subject: [PATCH] master: TCP server for remote logging --- artiq/frontend/artiq_master.py | 21 ++++++--- artiq/master/log.py | 42 +++-------------- artiq/protocols/logging.py | 67 ++++++++++++++++++++++++++++ doc/manual/default_network_ports.rst | 2 + 4 files changed, 91 insertions(+), 41 deletions(-) create mode 100644 artiq/protocols/logging.py diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index dc2fb0ac5..186d14e68 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -5,9 +5,10 @@ import argparse import atexit import os -from artiq.protocols.pc_rpc import Server +from artiq.protocols.pc_rpc import Server as RPCServer from artiq.protocols.sync_struct import Publisher -from artiq.master.log import log_args, init_log +from artiq.protocols.logging import Server as LoggingServer +from artiq.master.log import log_args, init_log, log_worker from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler from artiq.master.worker_db import get_last_rid @@ -27,6 +28,9 @@ def get_argparser(): group.add_argument( "--port-control", default=3251, type=int, help="TCP port to listen to for control (default: %(default)d)") + group.add_argument( + "--port-logging", default=1066, type=int, + help="TCP port to listen to for remote logging (default: %(default)d)") group = parser.add_argument_group("databases") group.add_argument("--device-db", default="device_db.pyon", @@ -49,7 +53,7 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - log_buffer, log_forwarder = init_log(args) + log_buffer = init_log(args) if os.name == "nt": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) @@ -67,7 +71,7 @@ def main(): else: repo_backend = FilesystemBackend(args.repository) repository = Repository(repo_backend, device_db.get_device_db, - log_forwarder.log_worker) + log_worker) atexit.register(repository.close) repository.scan_async() @@ -76,14 +80,14 @@ def main(): "get_device": device_db.get, "get_dataset": dataset_db.get, "update_dataset": dataset_db.update, - "log": log_forwarder.log_worker + "log": log_worker } scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend) worker_handlers["scheduler_submit"] = scheduler.submit scheduler.start() atexit.register(lambda: loop.run_until_complete(scheduler.stop())) - server_control = Server({ + server_control = RPCServer({ "master_device_db": device_db, "master_dataset_db": dataset_db, "master_schedule": scheduler, @@ -104,6 +108,11 @@ def main(): args.bind, args.port_notify)) atexit.register(lambda: loop.run_until_complete(server_notify.stop())) + server_logging = LoggingServer() + loop.run_until_complete(server_logging.start( + args.bind, args.port_logging)) + atexit.register(lambda: loop.run_until_complete(server_logging.stop())) + loop.run_forever() if __name__ == "__main__": diff --git a/artiq/master/log.py b/artiq/master/log.py index 3e6bec2b0..ca8e1fd71 100644 --- a/artiq/master/log.py +++ b/artiq/master/log.py @@ -2,6 +2,7 @@ import logging import logging.handlers from artiq.protocols.sync_struct import Notifier +from artiq.protocols.logging import parse_log_message, log_with_name class LogBuffer: @@ -25,38 +26,11 @@ class LogBufferHandler(logging.Handler): self.log_buffer.log(record.levelno, record.source, record.created, message) -name_to_level = { - "CRITICAL": logging.CRITICAL, - "ERROR": logging.ERROR, - "WARN": logging.WARNING, - "WARNING": logging.WARNING, - "INFO": logging.INFO, - "DEBUG": logging.DEBUG, -} - - -def parse_log_message(msg): - for name, level in name_to_level.items(): - if msg.startswith(name + ":"): - remainder = msg[len(name) + 1:] - try: - idx = remainder.index(":") - except: - continue - return level, remainder[:idx], remainder[idx+1:] - return logging.INFO, "print", msg - - -fwd_logger = logging.getLogger("fwd") - - -class LogForwarder: - def log_worker(self, rid, message): - level, name, message = parse_log_message(message) - fwd_logger.name = name - fwd_logger.log(level, message, - extra={"source": "worker({})".format(rid)}) - log_worker.worker_pass_rid = True +def log_worker(rid, message): + level, name, message = parse_log_message(message) + log_with_name(name, level, message, + extra={"source": "worker({})".format(rid)}) +log_worker.worker_pass_rid = True class SourceFilter: @@ -120,6 +94,4 @@ def init_log(args): handler.addFilter(flt) root_logger.addHandler(handler) - log_forwarder = LogForwarder() - - return log_buffer, log_forwarder + return log_buffer diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py new file mode 100644 index 000000000..42cca1572 --- /dev/null +++ b/artiq/protocols/logging.py @@ -0,0 +1,67 @@ +import asyncio +import logging + +from artiq.protocols.asyncio_server import AsyncioServer + + +_fwd_logger = logging.getLogger("fwd") + + +def log_with_name(name, *args, **kwargs): + _fwd_logger.name = name + _fwd_logger.log(*args, **kwargs) + + +_name_to_level = { + "CRITICAL": logging.CRITICAL, + "ERROR": logging.ERROR, + "WARN": logging.WARNING, + "WARNING": logging.WARNING, + "INFO": logging.INFO, + "DEBUG": logging.DEBUG, +} + + +def parse_log_message(msg): + for name, level in _name_to_level.items(): + if msg.startswith(name + ":"): + remainder = msg[len(name) + 1:] + try: + idx = remainder.index(":") + except: + continue + return level, remainder[:idx], remainder[idx+1:] + return logging.INFO, "print", msg + + +_init_string = b"ARTIQ logging\n" + + +class Server(AsyncioServer): + async def _handle_connection_cr(self, reader, writer): + try: + line = await reader.readline() + if line != _init_string: + return + + while True: + line = await reader.readline() + if not line: + break + try: + line = line.decode() + except: + return + line = line[:-1] + linesplit = line.split(":", 4) + if len(linesplit) != 4: + return + source, levelname, name, message = linesplit + try: + level = _name_to_level[levelname] + except KeyError: + return + log_with_name(name, level, message, + extra={"source": source}) + finally: + writer.close() diff --git a/doc/manual/default_network_ports.rst b/doc/manual/default_network_ports.rst index 35e576d2b..ab50291af 100644 --- a/doc/manual/default_network_ports.rst +++ b/doc/manual/default_network_ports.rst @@ -8,6 +8,8 @@ Default network ports +--------------------------+--------------+ | Core device (mon/inj) | 3250 (UDP) | +--------------------------+--------------+ +| Master (logging) | 1066 | ++--------------------------+--------------+ | InfluxDB bridge | 3248 | +--------------------------+--------------+ | Controller manager | 3249 |