From f332c1d3cc8482c7ddbf56f5fb776a19e7917196 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 16 Oct 2015 20:08:11 +0800 Subject: [PATCH] ctlmgr: forward controller logs --- artiq/frontend/artiq_ctlmgr.py | 45 ++++++++++++++++++++++++---------- artiq/master/log.py | 20 +++------------ artiq/protocols/logging.py | 16 ++++++++++++ 3 files changed, 51 insertions(+), 30 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index 0a3149b78..b4e5dacc2 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -5,12 +5,15 @@ import atexit import argparse import os import logging +import subprocess import shlex import socket from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import AsyncioClient, Server -from artiq.protocols.logging import LogForwarder +from artiq.protocols.logging import (LogForwarder, + parse_log_message, log_with_name, + SourceFilter) from artiq.tools import TaskObject, Condition @@ -75,6 +78,23 @@ class Controller: else: break + async def forward_logs(self, stream): + source = "controller({})".format(self.name) + while True: + try: + entry = (await stream.readline()) + if not entry: + break + entry = entry[:-1] + level, name, message = parse_log_message(entry.decode()) + log_with_name(name, level, message, extra={"source": source}) + except: + logger.debug("exception in log forwarding", exc_info=True) + break + logger.debug("stopped log forwarding of stream %s of %s", + stream, self.name) + + async def launcher(self): try: while True: @@ -82,7 +102,12 @@ class Controller: self.name, self.command) try: self.process = await asyncio.create_subprocess_exec( - *shlex.split(self.command)) + *shlex.split(self.command), + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + asyncio.ensure_future(self.forward_logs( + self.process.stdout)) + asyncio.ensure_future(self.forward_logs( + self.process.stderr)) await self._wait_and_ping() except FileNotFoundError: logger.warning("Controller %s failed to start", self.name) @@ -236,9 +261,9 @@ def get_argparser(): group = parser.add_argument_group("verbosity") group.add_argument("-v", "--verbose", default=0, action="count", - help="increase logging level") + help="increase logging level of the manager process") group.add_argument("-q", "--quiet", default=0, action="count", - help="decrease logging level") + help="decrease logging level of the manager process") parser.add_argument( "-s", "--server", default="::1", @@ -261,19 +286,13 @@ def get_argparser(): return parser -class SourceAdder: - def filter(self, record): - if not hasattr(record, "source"): - record.source = "ctlmgr" - return True - - def main(): args = get_argparser().parse_args() root_logger = logging.getLogger() - root_logger.setLevel(logging.WARNING + args.quiet*10 - args.verbose*10) - source_adder = SourceAdder() + root_logger.setLevel(logging.NOTSET) + source_adder = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10, + "ctlmgr") console_handler = logging.StreamHandler() console_handler.setFormatter(logging.Formatter( "%(levelname)s:%(source)s:%(name)s:%(message)s")) diff --git a/artiq/master/log.py b/artiq/master/log.py index f588e839e..f8e6939ef 100644 --- a/artiq/master/log.py +++ b/artiq/master/log.py @@ -2,7 +2,7 @@ import logging import logging.handlers from artiq.protocols.sync_struct import Notifier -from artiq.protocols.logging import parse_log_message, log_with_name +from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter class LogBuffer: @@ -34,21 +34,6 @@ def log_worker(rid, message): log_worker.worker_pass_rid = True -class SourceFilter: - def __init__(self, master_level): - self.master_level = master_level - - def filter(self, record): - if not hasattr(record, "source"): - record.source = "master" - if record.source == "master": - return record.levelno >= self.master_level - else: - # log messages that are forwarded from a source have already - # been filtered, and may have a level below the master level. - return True - - def log_args(parser): group = parser.add_argument_group("logging") group.add_argument("-v", "--verbose", default=0, action="count", @@ -69,7 +54,8 @@ def log_args(parser): def init_log(args): root_logger = logging.getLogger() root_logger.setLevel(logging.NOTSET) # we use our custom filter only - flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10) + flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10, + "master") full_fmt = logging.Formatter( "%(levelname)s:%(source)s:%(name)s:%(message)s") diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index ae2d5a0e2..61b9c07fb 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -78,6 +78,22 @@ class Server(AsyncioServer): writer.close() +class SourceFilter: + def __init__(self, local_level, local_source): + self.local_level = local_level + self.local_source = local_source + + def filter(self, record): + if not hasattr(record, "source"): + record.source = self.local_source + if record.source == self.local_source: + return record.levelno >= self.local_level + else: + # log messages that are forwarded from a source have already + # been filtered, and may have a level below the local level. + return True + + class LogForwarder(logging.Handler, TaskObject): def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000, **kwargs):