From a583a923d881b8128bd9a2cc66c7ad5ebe6076fe Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 14:59:36 +0100 Subject: [PATCH 1/7] worker: use pipe_ipc (no log) --- artiq/master/worker.py | 35 +++++++++++++++++------------------ artiq/master/worker_impl.py | 35 ++++++++++------------------------- 2 files changed, 27 insertions(+), 43 deletions(-) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 16b782a54..194e24ed7 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -7,7 +7,7 @@ import traceback import time from functools import partial -from artiq.protocols import pyon +from artiq.protocols import pipe_ipc, pyon from artiq.tools import asyncio_wait_or_cancel @@ -47,7 +47,7 @@ class Worker: self.rid = None self.filename = None - self.process = None + self.ipc = None self.watchdogs = dict() # wid -> expiration (using time.monotonic) self.io_lock = asyncio.Lock() @@ -77,10 +77,10 @@ class Worker: try: if self.closed.is_set(): raise WorkerError("Attempting to create process after close") - self.process = await asyncio.create_subprocess_exec( + self.ipc = pipe_ipc.AsyncioParentComm() + await self.ipc.create_subprocess( sys.executable, "-m", "artiq.master.worker_impl", - str(log_level), - stdout=subprocess.PIPE, stdin=subprocess.PIPE) + self.ipc.get_address(), str(log_level)) finally: self.io_lock.release() @@ -93,15 +93,15 @@ class Worker: self.closed.set() await self.io_lock.acquire() try: - if self.process is None: + if self.ipc is None: # Note the %s - self.rid can be None logger.debug("worker was not created (RID %s)", self.rid) return - if self.process.returncode is not None: + if self.ipc.process.returncode is not None: logger.debug("worker already terminated (RID %s)", self.rid) - if self.process.returncode != 0: + if self.ipc.process.returncode != 0: logger.warning("worker finished with status code %d" - " (RID %s)", self.process.returncode, + " (RID %s)", self.ipc.process.returncode, self.rid) return obj = {"action": "terminate"} @@ -111,21 +111,21 @@ class Worker: logger.debug("failed to send terminate command to worker" " (RID %s), killing", self.rid, exc_info=True) try: - self.process.kill() + self.ipc.process.kill() except ProcessLookupError: pass - await self.process.wait() + await self.ipc.process.wait() return try: - await asyncio.wait_for(self.process.wait(), term_timeout) + await asyncio.wait_for(self.ipc.process.wait(), term_timeout) except asyncio.TimeoutError: logger.debug("worker did not exit by itself (RID %s), killing", self.rid) try: - self.process.kill() + self.ipc.process.kill() except ProcessLookupError: pass - await self.process.wait() + await self.ipc.process.wait() else: logger.debug("worker exited by itself (RID %s)", self.rid) finally: @@ -134,9 +134,8 @@ class Worker: async def _send(self, obj, cancellable=True): assert self.io_lock.locked() line = pyon.encode(obj) - self.process.stdin.write(line.encode()) - self.process.stdin.write("\n".encode()) - ifs = [self.process.stdin.drain()] + self.ipc.write((line + "\n").encode()) + ifs = [self.ipc.drain()] if cancellable: ifs.append(self.closed.wait()) fs = await asyncio_wait_or_cancel( @@ -153,7 +152,7 @@ class Worker: async def _recv(self, timeout): assert self.io_lock.locked() fs = await asyncio_wait_or_cancel( - [self.process.stdout.readline(), self.closed.wait()], + [self.ipc.readline(), self.closed.wait()], timeout=timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): raise WorkerTimeout("Timeout receiving data from worker") diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index d8bf5cb82..3a9664ae6 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -6,7 +6,7 @@ import traceback from collections import OrderedDict import artiq -from artiq.protocols import pyon +from artiq.protocols import pipe_ipc, pyon from artiq.tools import file_import from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.language.environment import is_experiment @@ -15,16 +15,16 @@ from artiq.coredevice.core import CompileError, host_only, _render_diagnostic from artiq import __version__ as artiq_version +ipc = None + def get_object(): - line = sys.__stdin__.readline() + line = ipc.readline().decode() return pyon.decode(line) def put_object(obj): ds = pyon.encode(obj) - sys.__stdout__.write(ds) - sys.__stdout__.write("\n") - sys.__stdout__.flush() + ipc.write((ds + "\n").encode()) class ParentActionError(Exception): @@ -53,23 +53,6 @@ def make_parent_action(action, exception=None): return parent_action -class LogForwarder: - def __init__(self): - self.buffer = "" - - to_parent = staticmethod(make_parent_action("log")) - - def write(self, data): - self.buffer += data - while "\n" in self.buffer: - i = self.buffer.index("\n") - self.to_parent(self.buffer[:i]) - self.buffer = self.buffer[i+1:] - - def flush(self): - pass - - class ParentDeviceDB: get_device_db = make_parent_action("get_device_db") get = make_parent_action("get_device", KeyError) @@ -202,9 +185,10 @@ def setup_diagnostics(experiment_file, repository_path): def main(): - sys.stdout = LogForwarder() - sys.stderr = LogForwarder() - logging.basicConfig(level=int(sys.argv[1])) + global ipc + + logging.basicConfig(level=int(sys.argv[2])) + ipc = pipe_ipc.ChildComm(sys.argv[1]) start_time = None rid = None @@ -277,6 +261,7 @@ def main(): put_object({"action": "exception"}) finally: device_mgr.close_devices() + ipc.close() if __name__ == "__main__": From 5aa4de8e89637367c74cc25fac03d67d6a9a6734 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 20:31:42 +0100 Subject: [PATCH 2/7] refactor logging and implement in worker --- artiq/frontend/artiq_ctlmgr.py | 31 +++++++++---------------------- artiq/frontend/artiq_master.py | 8 +++----- artiq/master/experiments.py | 29 ++++++++++------------------- artiq/master/log.py | 10 +--------- artiq/master/worker.py | 22 +++++++++++++++++----- artiq/protocols/logging.py | 23 +++++++++++++++++++++++ artiq/test/worker.py | 2 +- 7 files changed, 64 insertions(+), 61 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index f73bbde87..d6e7a299c 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -12,8 +12,7 @@ import platform from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import AsyncioClient, Server -from artiq.protocols.logging import (LogForwarder, - parse_log_message, log_with_name, +from artiq.protocols.logging import (LogForwarder, LogParser, SourceFilter) from artiq.tools import * @@ -79,22 +78,8 @@ class Controller: else: break - async def forward_logs(self, stream): - source = "controller({})".format(self.name) - while True: - try: - entry = (await stream.readline()) - if not entry: - break - entry = entry[:-1] - level, name, message = parse_log_message(entry.decode()) - log_with_name(name, level, message, extra={"source": source}) - except: - logger.debug("exception in log forwarding", exc_info=True) - break - logger.debug("stopped log forwarding of stream %s of %s", - stream, self.name) - + def _get_log_source(self): + return "controller({})".format(self.name) async def launcher(self): try: @@ -105,10 +90,12 @@ class Controller: self.process = await asyncio.create_subprocess_exec( *shlex.split(self.command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - asyncio.ensure_future(self.forward_logs( - self.process.stdout)) - asyncio.ensure_future(self.forward_logs( - self.process.stderr)) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.process.stdout)) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.process.stderr)) await self._wait_and_ping() except FileNotFoundError: logger.warning("Controller %s failed to start", self.name) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 54b4a5670..4fdc7e9d5 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -9,7 +9,7 @@ from artiq.tools import * from artiq.protocols.pc_rpc import Server as RPCServer from artiq.protocols.sync_struct import Publisher from artiq.protocols.logging import Server as LoggingServer -from artiq.master.log import log_args, init_log, log_worker +from artiq.master.log import log_args, init_log from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler from artiq.master.worker_db import get_last_rid @@ -63,8 +63,7 @@ def main(): repo_backend = GitBackend(args.repository) else: repo_backend = FilesystemBackend(args.repository) - experiment_db = ExperimentDB(repo_backend, device_db.get_device_db, - log_worker) + experiment_db = ExperimentDB(repo_backend, device_db.get_device_db) atexit.register(experiment_db.close) experiment_db.scan_repository_async() @@ -72,8 +71,7 @@ def main(): "get_device_db": device_db.get_device_db, "get_device": device_db.get, "get_dataset": dataset_db.get, - "update_dataset": dataset_db.update, - "log": log_worker + "update_dataset": dataset_db.update } scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db) worker_handlers.update({ diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py index 0edce0976..f9f113210 100644 --- a/artiq/master/experiments.py +++ b/artiq/master/experiments.py @@ -3,7 +3,6 @@ import os import tempfile import shutil import logging -from functools import partial from artiq.protocols.sync_struct import Notifier from artiq.master.worker import (Worker, WorkerInternalException, @@ -15,13 +14,10 @@ logger = logging.getLogger(__name__) async def _get_repository_entries(entry_dict, - root, filename, get_device_db, log): - worker = Worker({ - "get_device_db": get_device_db, - "log": partial(log, "scan", os.path.basename(filename)) - }) + root, filename, get_device_db): + worker = Worker({"get_device_db": get_device_db}) try: - description = await worker.examine(os.path.join(root, filename)) + description = await worker.examine("scan", os.path.join(root, filename)) except: log_worker_exception() raise @@ -49,7 +45,7 @@ async def _get_repository_entries(entry_dict, entry_dict[name] = entry -async def _scan_experiments(root, get_device_db, log, subdir=""): +async def _scan_experiments(root, get_device_db, subdir=""): entry_dict = dict() for de in os.scandir(os.path.join(root, subdir)): if de.name.startswith("."): @@ -58,13 +54,13 @@ async def _scan_experiments(root, get_device_db, log, subdir=""): filename = os.path.join(subdir, de.name) try: await _get_repository_entries( - entry_dict, root, filename, get_device_db, log) + entry_dict, root, filename, get_device_db) except Exception as exc: logger.warning("Skipping file '%s'", filename, exc_info=not isinstance(exc, WorkerInternalException)) if de.is_dir(): subentries = await _scan_experiments( - root, get_device_db, log, + root, get_device_db, os.path.join(subdir, de.name)) entries = {de.name + "/" + k: v for k, v in subentries.items()} entry_dict.update(entries) @@ -81,10 +77,9 @@ def _sync_explist(target, source): class ExperimentDB: - def __init__(self, repo_backend, get_device_db_fn, log_fn): + def __init__(self, repo_backend, get_device_db_fn): self.repo_backend = repo_backend self.get_device_db_fn = get_device_db_fn - self.log_fn = log_fn self.cur_rev = self.repo_backend.get_head_rev() self.repo_backend.request_rev(self.cur_rev) @@ -106,8 +101,7 @@ class ExperimentDB: wd, _ = self.repo_backend.request_rev(new_cur_rev) self.repo_backend.release_rev(self.cur_rev) self.cur_rev = new_cur_rev - new_explist = await _scan_experiments(wd, self.get_device_db_fn, - self.log_fn) + new_explist = await _scan_experiments(wd, self.get_device_db_fn) _sync_explist(self.explist, new_explist) finally: @@ -122,12 +116,9 @@ class ExperimentDB: revision = self.cur_rev wd, _ = self.repo_backend.request_rev(revision) filename = os.path.join(wd, filename) - worker = Worker({ - "get_device_db": self.get_device_db_fn, - "log": partial(self.log_fn, "examine", os.path.basename(filename)) - }) + worker = Worker({"get_device_db": self.get_device_db_fn}) try: - description = await worker.examine(filename) + description = await worker.examine("examine", filename) finally: await worker.close() if use_repository: diff --git a/artiq/master/log.py b/artiq/master/log.py index 78def9d21..d542eac08 100644 --- a/artiq/master/log.py +++ b/artiq/master/log.py @@ -2,7 +2,7 @@ import logging import logging.handlers from artiq.protocols.sync_struct import Notifier -from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter +from artiq.protocols.logging import SourceFilter class LogBuffer: @@ -28,14 +28,6 @@ class LogBufferHandler(logging.Handler): self.log_buffer.log(record.levelno, record.source, record.created, part) - -def log_worker(rid, filename, message): - level, name, message = parse_log_message(message) - log_with_name(name, level, message, - extra={"source": "worker({},{})".format(rid, filename)}) -log_worker.worker_pass_runinfo = True - - def log_args(parser): group = parser.add_argument_group("logging") group.add_argument("-v", "--verbose", default=0, action="count", diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 194e24ed7..c0c5c95a1 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -8,6 +8,7 @@ import time from functools import partial from artiq.protocols import pipe_ipc, pyon +from artiq.protocols.logging import LogParser from artiq.tools import asyncio_wait_or_cancel @@ -72,6 +73,9 @@ class Worker: else: return None + def _get_log_source(self): + return "worker({},{})".format(self.rid, self.filename) + async def _create_process(self, log_level): await self.io_lock.acquire() try: @@ -80,7 +84,14 @@ class Worker: self.ipc = pipe_ipc.AsyncioParentComm() await self.ipc.create_subprocess( sys.executable, "-m", "artiq.master.worker_impl", - self.ipc.get_address(), str(log_level)) + self.ipc.get_address(), str(log_level), + stdout=subprocess.PIPE, stderr=subprocess.PIPE) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.ipc.process.stdout)) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.ipc.process.stderr)) finally: self.io_lock.release() @@ -94,7 +105,7 @@ class Worker: await self.io_lock.acquire() try: if self.ipc is None: - # Note the %s - self.rid can be None + # Note the %s - self.rid can be None or a user string logger.debug("worker was not created (RID %s)", self.rid) return if self.ipc.process.returncode is not None: @@ -192,8 +203,6 @@ class Worker: func = self.register_experiment else: func = self.handlers[action] - if getattr(func, "worker_pass_runinfo", False): - func = partial(func, self.rid, self.filename) try: data = func(*obj["args"], **obj["kwargs"]) reply = {"status": "ok", "data": data} @@ -265,7 +274,10 @@ class Worker: await self._worker_action({"action": "write_results"}, timeout) - async def examine(self, file, timeout=20.0): + async def examine(self, rid, file, timeout=20.0): + self.rid = rid + self.filename = os.path.basename(file) + await self._create_process(logging.WARNING) r = dict() def register(class_name, name, arginfo): diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index e8fa71e66..2fceed81a 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -36,6 +36,29 @@ def parse_log_message(msg): return logging.INFO, "print", msg +class LogParser: + def __init__(self, source_cb): + self.source_cb = source_cb + + def line_input(self, msg): + level, name, message = parse_log_message(msg) + log_with_name(name, level, message, + extra={"source": self.source_cb()}) + + async def stream_task(self, stream): + while True: + try: + entry = (await stream.readline()) + if not entry: + break + self.line_input(entry[:-1].decode()) + except: + logger.debug("exception in log forwarding", exc_info=True) + break + logger.debug("stopped log forwarding of stream %s of %s", + stream, self.source_cb()) + + _init_string = b"ARTIQ logging\n" diff --git a/artiq/test/worker.py b/artiq/test/worker.py index 94c7d8897..efc2fa6e2 100644 --- a/artiq/test/worker.py +++ b/artiq/test/worker.py @@ -71,7 +71,7 @@ def _run_experiment(class_name): "arguments": dict() } loop = asyncio.get_event_loop() - worker = Worker(handlers={"log": lambda message: None}) + worker = Worker({}) loop.run_until_complete(_call_worker(worker, expid)) From 19c5e89b4dcd5961b15250e96962192ecfbe4af6 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 21:06:44 +0100 Subject: [PATCH 3/7] protocols/logging: support parsing multiline log messages --- artiq/protocols/logging.py | 51 +++++++++++++++++++++++++++++--------- 1 file changed, 39 insertions(+), 12 deletions(-) diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index 2fceed81a..1a8d89d7c 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -1,5 +1,6 @@ import asyncio import logging +import re from artiq.protocols.asyncio_server import AsyncioServer from artiq.tools import TaskObject @@ -25,25 +26,51 @@ _name_to_level = { def parse_log_message(msg): - for name, level in _name_to_level.items(): - if msg.startswith(name + ":"): - remainder = msg[len(name) + 1:] - try: - idx = remainder.index(":") - except: - continue - return level, remainder[:idx], remainder[idx+1:] - return logging.INFO, "print", msg + lr = "|".join(_name_to_level.keys()) + m = re.fullmatch('('+lr+')(<\d+>)?:([^:]*):(.*)', msg) + if m is None: + return 0, logging.INFO, "print", msg + level = _name_to_level[m.group(1)] + if m.group(2): + multiline = int(m.group(2)[1:-1]) - 1 + else: + multiline = 0 + name = m.group(3) + message = m.group(4) + return multiline, level, name, message class LogParser: def __init__(self, source_cb): self.source_cb = source_cb + self.multiline_count = 0 + self.multiline_level = None + self.multiline_name = None + self.multiline_message = None def line_input(self, msg): - level, name, message = parse_log_message(msg) - log_with_name(name, level, message, - extra={"source": self.source_cb()}) + if self.multiline_count: + self.multiline_message += "\n" + msg + self.multiline_count -= 1 + if not self.multiline_count: + log_with_name( + self.multiline_name, + self.multiline_level, + self.multiline_message, + extra={"source": self.source_cb()}) + self.multiline_level = None + self.multiline_name = None + self.multiline_message = None + else: + multiline, level, name, message = parse_log_message(msg) + if multiline: + self.multiline_count = multiline + self.multiline_level = level + self.multiline_name = name + self.multiline_message = message + else: + log_with_name(name, level, message, + extra={"source": self.source_cb()}) async def stream_task(self, stream): while True: From ded1e31567561b367cd2a314da621c351af8e584 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 21:30:09 +0100 Subject: [PATCH 4/7] protocols/logging: add MultilineFormatter --- artiq/protocols/logging.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index 1a8d89d7c..75a92094a 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -86,6 +86,28 @@ class LogParser: stream, self.source_cb()) +class MultilineFormatter(logging.Formatter): + def __init__(self): + logging.Formatter.__init__( + self, "%(levelname)s:%(name)s:%(message)s") + + def format(self, record): + r = logging.Formatter.format(self, record) + linebreaks = r.count("\n") + if linebreaks: + i = r.index(":") + r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:] + return r + + +def multiline_log_config(level): + root_logger = logging.getLogger() + root_logger.setLevel(level) + handler = logging.StreamHandler() + handler.setFormatter(MultilineFormatter()) + root_logger.addHandler(handler) + + _init_string = b"ARTIQ logging\n" From 1fed38a8dc4e815642fb051478fb5491bb34bd35 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 21:30:28 +0100 Subject: [PATCH 5/7] worker: use MultilineFormatter --- artiq/master/worker_impl.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 3a9664ae6..53381cd25 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -7,6 +7,7 @@ from collections import OrderedDict import artiq from artiq.protocols import pipe_ipc, pyon +from artiq.protocols.logging import multiline_log_config from artiq.tools import file_import from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.language.environment import is_experiment @@ -187,7 +188,7 @@ def setup_diagnostics(experiment_file, repository_path): def main(): global ipc - logging.basicConfig(level=int(sys.argv[2])) + multiline_log_config(level=int(sys.argv[2])) ipc = pipe_ipc.ChildComm(sys.argv[1]) start_time = None From 3cf67afeb1916385c6556a06f1fb84193836ba99 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 21:59:37 +0100 Subject: [PATCH 6/7] generalize multiline logging to remote logs and controllers --- artiq/master/worker_impl.py | 3 +- artiq/protocols/logging.py | 68 +++++++++++-------------------------- artiq/tools.py | 24 ++++++++++++- 3 files changed, 44 insertions(+), 51 deletions(-) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 53381cd25..30d2a8168 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -7,8 +7,7 @@ from collections import OrderedDict import artiq from artiq.protocols import pipe_ipc, pyon -from artiq.protocols.logging import multiline_log_config -from artiq.tools import file_import +from artiq.tools import multiline_log_config, file_import from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.language.environment import is_experiment from artiq.language.core import set_watchdog_factory, TerminationRequested diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index 75a92094a..b6ddb9bb6 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -3,7 +3,7 @@ import logging import re from artiq.protocols.asyncio_server import AsyncioServer -from artiq.tools import TaskObject +from artiq.tools import TaskObject, MultilineFormatter logger = logging.getLogger(__name__) @@ -86,36 +86,16 @@ class LogParser: stream, self.source_cb()) -class MultilineFormatter(logging.Formatter): - def __init__(self): - logging.Formatter.__init__( - self, "%(levelname)s:%(name)s:%(message)s") - - def format(self, record): - r = logging.Formatter.format(self, record) - linebreaks = r.count("\n") - if linebreaks: - i = r.index(":") - r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:] - return r - - -def multiline_log_config(level): - root_logger = logging.getLogger() - root_logger.setLevel(level) - handler = logging.StreamHandler() - handler.setFormatter(MultilineFormatter()) - root_logger.addHandler(handler) - - _init_string = b"ARTIQ logging\n" class Server(AsyncioServer): """Remote logging TCP server. - Takes one log entry per line, in the format: - source:levelno:name:message + Log entries are in the format: + source:levelno:name:message + continuation... + ...continuation """ async def _handle_connection_cr(self, reader, writer): try: @@ -123,6 +103,9 @@ class Server(AsyncioServer): if line != _init_string: return + source = None + parser = LogParser(lambda: source) + while True: line = await reader.readline() if not line: @@ -132,20 +115,16 @@ class Server(AsyncioServer): except: return line = line[:-1] - linesplit = line.split(":", 3) - if len(linesplit) != 4: - logger.warning("received improperly formatted message, " - "dropping connection") - return - source, level, name, message = linesplit - try: - level = int(level) - except: - logger.warning("received improperly formatted level, " - "dropping connection") - return - log_with_name(name, level, message, - extra={"source": source}) + if parser.multiline_count: + parser.line_input(line) + else: + linesplit = line.split(":", maxsplit=1) + if len(linesplit) != 2: + logger.warning("received improperly formatted message, " + "dropping connection") + return + source, remainder = linesplit + parser.line_input(remainder) finally: writer.close() @@ -172,19 +151,12 @@ class LogForwarder(logging.Handler, TaskObject): logging.Handler.__init__(self, **kwargs) self.host = host self.port = port - self.setFormatter(logging.Formatter( - "%(name)s:%(message)s")) + self.setFormatter(MultilineFormatter()) self._queue = asyncio.Queue(queue_size) self.reconnect_timer = reconnect_timer def emit(self, record): - message = self.format(record) - for part in message.split("\n"): - part = "{}:{}:{}".format(record.source, record.levelno, part) - try: - self._queue.put_nowait(part) - except asyncio.QueueFull: - break + self._queue.put_nowait(record.source + ":" + self.format(record)) async def _do(self): while True: diff --git a/artiq/tools.py b/artiq/tools.py index addd5d55c..d8abc603a 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -133,8 +133,30 @@ def simple_network_args(parser, default_port): group.add_argument("--port-" + name, default=default, type=int, help=h) +class MultilineFormatter(logging.Formatter): + def __init__(self): + logging.Formatter.__init__( + self, "%(levelname)s:%(name)s:%(message)s") + + def format(self, record): + r = logging.Formatter.format(self, record) + linebreaks = r.count("\n") + if linebreaks: + i = r.index(":") + r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:] + return r + + +def multiline_log_config(level): + root_logger = logging.getLogger() + root_logger.setLevel(level) + handler = logging.StreamHandler() + handler.setFormatter(MultilineFormatter()) + root_logger.addHandler(handler) + + def init_logger(args): - logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10) + multiline_log_config(level=logging.WARNING + args.quiet*10 - args.verbose*10) def bind_address_from_args(args): From be5162d60fbb1d4896a302d7b002584ef3c143b2 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 26 Jan 2016 22:07:54 +0100 Subject: [PATCH 7/7] worker: restore short exception info in first line of log --- artiq/master/worker_impl.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 30d2a8168..164bf7cc4 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -252,7 +252,11 @@ def main(): # When we get CompileError, a more suitable diagnostic has already # been printed. if not isinstance(exc, CompileError): - lines = ["Terminating with exception\n"] + short_exc_info = type(exc).__name__ + exc_str = str(exc) + if exc_str: + short_exc_info += ": " + exc_str + lines = ["Terminating with exception ("+short_exc_info+")\n"] lines += traceback.format_exception_only(type(exc), exc) if hasattr(exc, "parent_traceback"): lines += exc.parent_traceback