forked from M-Labs/artiq
1
0
Fork 0

ctlmgr: forward controller logs

This commit is contained in:
Sebastien Bourdeauducq 2015-10-16 20:08:11 +08:00
parent 786dc14057
commit f332c1d3cc
3 changed files with 51 additions and 30 deletions

View File

@ -5,12 +5,15 @@ import atexit
import argparse import argparse
import os import os
import logging import logging
import subprocess
import shlex import shlex
import socket import socket
from artiq.protocols.sync_struct import Subscriber from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient, Server from artiq.protocols.pc_rpc import AsyncioClient, Server
from artiq.protocols.logging import LogForwarder from artiq.protocols.logging import (LogForwarder,
parse_log_message, log_with_name,
SourceFilter)
from artiq.tools import TaskObject, Condition from artiq.tools import TaskObject, Condition
@ -75,6 +78,23 @@ class Controller:
else: else:
break break
async def forward_logs(self, stream):
    """Continuously read log lines from *stream* (one of the controller
    subprocess's pipes) and re-emit them into the local logging system.

    Runs until the stream hits EOF or any error occurs while reading or
    parsing a line.
    """
    source = "controller({})".format(self.name)
    while True:
        try:
            entry = (await stream.readline())
            if not entry:
                # EOF: the controller process closed its end of the pipe.
                break
            # Drop the trailing newline before parsing.
            entry = entry[:-1]
            level, name, message = parse_log_message(entry.decode())
            # Tag the record with its origin so downstream filters and
            # formatters can distinguish controllers from the manager.
            log_with_name(name, level, message, extra={"source": source})
        except:
            # NOTE(review): bare except also swallows task cancellation on
            # Pythons where CancelledError subclasses Exception — presumably
            # deliberate, so cancelling stops forwarding quietly. Confirm
            # before narrowing to `except Exception:`.
            logger.debug("exception in log forwarding", exc_info=True)
            break
    logger.debug("stopped log forwarding of stream %s of %s",
                 stream, self.name)
async def launcher(self): async def launcher(self):
try: try:
while True: while True:
@ -82,7 +102,12 @@ class Controller:
self.name, self.command) self.name, self.command)
try: try:
self.process = await asyncio.create_subprocess_exec( self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command)) *shlex.split(self.command),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
asyncio.ensure_future(self.forward_logs(
self.process.stdout))
asyncio.ensure_future(self.forward_logs(
self.process.stderr))
await self._wait_and_ping() await self._wait_and_ping()
except FileNotFoundError: except FileNotFoundError:
logger.warning("Controller %s failed to start", self.name) logger.warning("Controller %s failed to start", self.name)
@ -236,9 +261,9 @@ def get_argparser():
group = parser.add_argument_group("verbosity") group = parser.add_argument_group("verbosity")
group.add_argument("-v", "--verbose", default=0, action="count", group.add_argument("-v", "--verbose", default=0, action="count",
help="increase logging level") help="increase logging level of the manager process")
group.add_argument("-q", "--quiet", default=0, action="count", group.add_argument("-q", "--quiet", default=0, action="count",
help="decrease logging level") help="decrease logging level of the manager process")
parser.add_argument( parser.add_argument(
"-s", "--server", default="::1", "-s", "--server", default="::1",
@ -261,19 +286,13 @@ def get_argparser():
return parser return parser
class SourceAdder:
    """Logging filter that tags records lacking a ``source`` attribute as
    originating from the controller manager itself.

    Never rejects a record — it only annotates.
    """

    def filter(self, record):
        # Only fill in a default; records forwarded from controllers
        # arrive with their own ``source`` already set.
        if not hasattr(record, "source"):
            record.source = "ctlmgr"
        return True
def main(): def main():
args = get_argparser().parse_args() args = get_argparser().parse_args()
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.setLevel(logging.WARNING + args.quiet*10 - args.verbose*10) root_logger.setLevel(logging.NOTSET)
source_adder = SourceAdder() source_adder = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10,
"ctlmgr")
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter( console_handler.setFormatter(logging.Formatter(
"%(levelname)s:%(source)s:%(name)s:%(message)s")) "%(levelname)s:%(source)s:%(name)s:%(message)s"))

View File

@ -2,7 +2,7 @@ import logging
import logging.handlers import logging.handlers
from artiq.protocols.sync_struct import Notifier from artiq.protocols.sync_struct import Notifier
from artiq.protocols.logging import parse_log_message, log_with_name from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter
class LogBuffer: class LogBuffer:
@ -34,21 +34,6 @@ def log_worker(rid, message):
log_worker.worker_pass_rid = True log_worker.worker_pass_rid = True
class SourceFilter:
    """Level filter that applies *master_level* only to records generated
    by the master itself.

    Records without a ``source`` attribute are assumed to be the master's
    own and are tagged accordingly.
    """

    def __init__(self, master_level):
        # Minimum ``levelno`` for records originating from the master.
        self.master_level = master_level

    def filter(self, record):
        if not hasattr(record, "source"):
            record.source = "master"
        if record.source != "master":
            # Forwarded records were already filtered at their origin and
            # may legitimately carry a level below the master level.
            return True
        return record.levelno >= self.master_level
def log_args(parser): def log_args(parser):
group = parser.add_argument_group("logging") group = parser.add_argument_group("logging")
group.add_argument("-v", "--verbose", default=0, action="count", group.add_argument("-v", "--verbose", default=0, action="count",
@ -69,7 +54,8 @@ def log_args(parser):
def init_log(args): def init_log(args):
root_logger = logging.getLogger() root_logger = logging.getLogger()
root_logger.setLevel(logging.NOTSET) # we use our custom filter only root_logger.setLevel(logging.NOTSET) # we use our custom filter only
flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10) flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10,
"master")
full_fmt = logging.Formatter( full_fmt = logging.Formatter(
"%(levelname)s:%(source)s:%(name)s:%(message)s") "%(levelname)s:%(source)s:%(name)s:%(message)s")

View File

@ -78,6 +78,22 @@ class Server(AsyncioServer):
writer.close() writer.close()
class SourceFilter:
    """Level filter that applies *local_level* only to locally-generated
    records.

    Records lacking a ``source`` attribute are tagged with *local_source*
    and treated as local. Records whose source differs were produced (and
    filtered) elsewhere, so they always pass.
    """

    def __init__(self, local_level, local_source):
        # Minimum ``levelno`` for records produced by this process.
        self.local_level = local_level
        # Value stamped onto untagged records.
        self.local_source = local_source

    def filter(self, record):
        if not hasattr(record, "source"):
            record.source = self.local_source
        if record.source != self.local_source:
            # Already filtered where it was produced; may legitimately be
            # below the local level.
            return True
        return record.levelno >= self.local_level
class LogForwarder(logging.Handler, TaskObject): class LogForwarder(logging.Handler, TaskObject):
def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000, def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000,
**kwargs): **kwargs):