From 10267f39c963b60d0b085b2c6e05498fabfea946 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 27 May 2016 23:43:20 -0500 Subject: [PATCH] log: use broadcast instead of sync_struct, filter on new messages only (#411) --- artiq/browser/log.py | 21 ++--- artiq/frontend/artiq_browser.py | 12 ++- artiq/frontend/artiq_dashboard.py | 15 +++- artiq/frontend/artiq_master.py | 20 +++-- artiq/gui/log.py | 116 ++++++++------------------- artiq/master/log.py | 31 +++---- doc/manual/default_network_ports.rst | 4 +- 7 files changed, 90 insertions(+), 129 deletions(-) diff --git a/artiq/browser/log.py b/artiq/browser/log.py index f660acf58..cfe54b87a 100644 --- a/artiq/browser/log.py +++ b/artiq/browser/log.py @@ -3,19 +3,20 @@ import logging from artiq.protocols.logging import SourceFilter -class LogBufferHandler(logging.Handler): - def __init__(self, log, *args, **kwargs): +class LogWidgetHandler(logging.Handler): + def __init__(self, *args, **kwargs): logging.Handler.__init__(self, *args, **kwargs) - self.log = log + self.log_widget = None self.setFormatter(logging.Formatter("%(name)s:%(message)s")) def emit(self, record): - if self.log.model is not None: - self.log.model.append((record.levelno, record.source, - record.created, self.format(record))) + if self.log_widget is not None: + message = self.format(record) + self.log_widget.append_message((record.levelno, record.source, + record.created, message)) -def init_log(args, log): +def init_log(args): root_logger = logging.getLogger() root_logger.setLevel(logging.NOTSET) # we use our custom filter only flt = SourceFilter(logging.WARNING + args.quiet*10 - args.verbose*10, @@ -26,9 +27,11 @@ def init_log(args, log): "%(levelname)s:%(source)s:%(name)s:%(message)s")) handlers.append(console_handler) - buffer_handler = LogBufferHandler(log) - handlers.append(buffer_handler) + widget_handler = LogWidgetHandler() + handlers.append(widget_handler) for handler in handlers: handler.addFilter(flt) root_logger.addHandler(handler) + + return widget_handler diff --git a/artiq/frontend/artiq_browser.py b/artiq/frontend/artiq_browser.py index b696e2ad6..835cd84b6 100755 --- a/artiq/frontend/artiq_browser.py +++ b/artiq/frontend/artiq_browser.py @@ -37,7 +37,7 @@ def get_argparser(): class Browser(QtWidgets.QMainWindow): - def __init__(self, datasets_sub, log_sub, browse_root, select): + def __init__(self, datasets_sub, browse_root, select): QtWidgets.QMainWindow.__init__(self) icon = QtGui.QIcon(os.path.join(artiq_dir, "gui", "logo.svg")) @@ -66,7 +66,7 @@ class Browser(QtWidgets.QMainWindow): self.datasets = datasets.DatasetsDock(datasets_sub) - self.log = log.LogDock(None, "log", log_sub) + self.log = log.LogDock(None, "log") self.log.setFeatures(self.log.DockWidgetMovable | self.log.DockWidgetFloatable) @@ -127,17 +127,15 @@ def main(): asyncio.set_event_loop(loop) atexit.register(loop.close) - log_sub = models.LocalModelManager(log.Model) - browser_log.init_log(args, log_sub) - log_sub.init([]) + widget_log_handler = browser_log.init_log(args) datasets_sub = models.LocalModelManager(datasets.Model) datasets_sub.init({}) smgr = state.StateManager(args.db_file) - main_window = Browser(datasets_sub, log_sub, - args.browse_root, args.select) + main_window = Browser(datasets_sub, args.browse_root, args.select) + widget_log_handler.log_widget = main_window.log smgr.register(main_window) if os.name == "nt": diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index 0dfba0035..401b6cc28 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -11,6 +11,7 @@ from quamash import QEventLoop from artiq import __artiq_dir__ as artiq_dir from artiq.tools import * from artiq.protocols.pc_rpc import AsyncioClient +from artiq.protocols.broadcast import Receiver from artiq.gui.models import ModelSubscriber from artiq.gui import state, applets, log from artiq.dashboard import (experiments, shortcuts, explorer, @@ -33,6 +34,9 @@ def get_argparser(): parser.add_argument( "--port-control", default=3251, type=int, help="TCP port to connect to for control") + parser.add_argument( + "--port-broadcast", default=1067, type=int, + help="TCP port to connect to for broadcasts") parser.add_argument( "--db-file", default=default_db_file, help="database file for local GUI settings " @@ -106,14 +110,18 @@ def main(): for notifier_name, modelf in (("explist", explorer.Model), ("explist_status", explorer.StatusUpdater), ("datasets", datasets.Model), - ("schedule", schedule.Model), - ("log", log.Model)): + ("schedule", schedule.Model)): subscriber = ModelSubscriber(notifier_name, modelf) loop.run_until_complete(subscriber.connect( args.server, args.port_notify)) atexit_register_coroutine(subscriber.close) sub_clients[notifier_name] = subscriber + log_receiver = Receiver("log", []) + loop.run_until_complete(log_receiver.connect( + args.server, args.port_broadcast)) + atexit_register_coroutine(log_receiver.close) + # initialize main window main_window = MainWindow(args.server) smgr.register(main_window) @@ -156,8 +164,9 @@ def main(): status_bar, rpc_clients["schedule"], sub_clients["schedule"]) smgr.register(d_schedule) - logmgr = log.LogDockManager(main_window, sub_clients["log"]) + logmgr = log.LogDockManager(main_window) smgr.register(logmgr) + log_receiver.notify_cbs.append(logmgr.append_message) # lay out docks right_docks = [ diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index ba5ab8fa6..5d3d4e11e 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -11,6 +11,7 @@ from artiq.tools import (simple_network_args, atexit_register_coroutine, from artiq.protocols.pc_rpc import Server as RPCServer from artiq.protocols.sync_struct import Publisher from artiq.protocols.logging import Server as LoggingServer +from artiq.protocols.broadcast import Broadcaster from artiq.master.log import log_args, init_log from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.scheduler import Scheduler @@ -27,7 +28,8 @@ def get_argparser(): simple_network_args(parser, [ ("notify", "notifications", 3250), ("control", "control", 3251), - ("logging", "remote logging", 1066) + ("logging", "remote logging", 1066), + ("broadcast", "broadcasts", 1067) ]) group = parser.add_argument_group("databases") @@ -51,13 +53,22 @@ def get_argparser(): def main(): args = get_argparser().parse_args() - log_buffer = init_log(args) + log_forwarder = init_log(args) if os.name == "nt": loop = asyncio.ProactorEventLoop() asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() atexit.register(loop.close) + bind = bind_address_from_args(args) + + server_broadcast = Broadcaster() + loop.run_until_complete(server_broadcast.start( + bind, args.port_broadcast)) + atexit_register_coroutine(server_broadcast.stop) + + log_forwarder.callback = (lambda msg: + server_broadcast.broadcast("log", msg)) device_db = DeviceDB(args.device_db) dataset_db = DatasetDB(args.dataset_db) @@ -88,8 +99,6 @@ def main(): }) experiment_db.scan_repository_async() - bind = bind_address_from_args(args) - server_control = RPCServer({ "master_device_db": device_db, "master_dataset_db": dataset_db, @@ -105,8 +114,7 @@ def main(): "devices": device_db.data, "datasets": dataset_db.data, "explist": experiment_db.explist, - "explist_status": experiment_db.status, - "log": log_buffer.data + "explist_status": experiment_db.status }) loop.run_until_complete(server_notify.start( bind, args.port_notify)) diff --git a/artiq/gui/log.py b/artiq/gui/log.py index 5772f703b..98ce4bb32 100644 --- a/artiq/gui/log.py +++ b/artiq/gui/log.py @@ -10,15 +10,15 @@ from artiq.gui.tools import (LayoutWidget, log_level_to_name, QDockWidgetCloseDetect) -class ModelItem: +class _ModelItem: def __init__(self, parent, row): self.parent = parent self.row = row self.children_by_row = [] -class Model(QtCore.QAbstractItemModel): - def __init__(self, init): +class _Model(QtCore.QAbstractItemModel): + def __init__(self): QtCore.QAbstractTableModel.__init__(self) self.headers = ["Source", "Message"] @@ -26,8 +26,6 @@ class Model(QtCore.QAbstractItemModel): self.entries = [] self.pending_entries = [] - for entry in init: - self.append(entry) self.depth = 1000 timer = QtCore.QTimer(self) timer.timeout.connect(self.timer_tick) @@ -57,9 +55,6 @@ class Model(QtCore.QAbstractItemModel): def columnCount(self, parent): return len(self.headers) - def __delitem__(self, k): - pass - def append(self, v): severity, source, timestamp, message = v self.pending_entries.append((severity, source, timestamp, @@ -75,10 +70,10 @@ class Model(QtCore.QAbstractItemModel): self.beginInsertRows(QtCore.QModelIndex(), nrows, nrows+len(records)-1) self.entries.extend(records) for rec in records: - item = ModelItem(self, len(self.children_by_row)) + item = _ModelItem(self, len(self.children_by_row)) self.children_by_row.append(item) for i in range(len(rec[3])-1): - item.children_by_row.append(ModelItem(item, i)) + item.children_by_row.append(_ModelItem(item, i)) self.endInsertRows() if len(self.entries) > self.depth: @@ -153,43 +148,8 @@ class Model(QtCore.QAbstractItemModel): time.strftime("%m/%d %H:%M:%S", time.localtime(v[2]))) -class _LogFilterProxyModel(QtCore.QSortFilterProxyModel): - def __init__(self, min_level, freetext): - QtCore.QSortFilterProxyModel.__init__(self) - self.min_level = min_level - self.freetext = freetext - - def filterAcceptsRow(self, sourceRow, sourceParent): - model = self.sourceModel() - if sourceParent.isValid(): - parent_item = sourceParent.internalPointer() - msgnum = parent_item.row - else: - msgnum = sourceRow - - accepted_level = model.entries[msgnum][0] >= self.min_level - - if self.freetext: - data_source = model.entries[msgnum][1] - data_message = model.entries[msgnum][3] - accepted_freetext = (self.freetext in data_source - or any(self.freetext in m for m in data_message)) - else: - accepted_freetext = True - - return accepted_level and accepted_freetext - - def set_min_level(self, min_level): - self.min_level = min_level - self.invalidateFilter() - - def set_freetext(self, freetext): - self.freetext = freetext - self.invalidateFilter() - - class LogDock(QDockWidgetCloseDetect): - def __init__(self, manager, name, log_sub): + def __init__(self, manager, name): QDockWidgetCloseDetect.__init__(self, "Log") self.setObjectName(name) @@ -201,12 +161,8 @@ class LogDock(QDockWidgetCloseDetect): self.filter_level.addItems(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]) self.filter_level.setToolTip("Display entries at or above this level") grid.addWidget(self.filter_level, 0, 1) - self.filter_level.currentIndexChanged.connect( - self.filter_level_changed) self.filter_freetext = QtWidgets.QLineEdit() self.filter_freetext.setPlaceholderText("freetext filter...") - self.filter_freetext.editingFinished.connect( - self.filter_freetext_changed) grid.addWidget(self.filter_freetext, 0, 2) scrollbottom = QtWidgets.QToolButton() @@ -244,21 +200,31 @@ class LogDock(QDockWidgetCloseDetect): # lambda: self.log.header().resizeSections(QtWidgets.QHeaderView.ResizeToContents)) # self.log.addAction(sizeheader_action) - log_sub.add_setmodel_callback(self.set_model) - cw = QtGui.QFontMetrics(self.font()).averageCharWidth() self.log.header().resizeSection(0, 26*cw) - def filter_level_changed(self): - if not hasattr(self, "table_model_filter"): - return - self.table_model_filter.set_min_level( - getattr(logging, self.filter_level.currentText())) + self.model = _Model() + self.log.setModel(self.model) + self.model.rowsAboutToBeInserted.connect(self.rows_inserted_before) + self.model.rowsInserted.connect(self.rows_inserted_after) + self.model.rowsRemoved.connect(self.rows_removed) - def filter_freetext_changed(self): - if not hasattr(self, "table_model_filter"): - return - self.table_model_filter.set_freetext(self.filter_freetext.text()) + def append_message(self, msg): + min_level = getattr(logging, self.filter_level.currentText()) + freetext = self.filter_freetext.text() + + accepted_level = msg[0] >= min_level + + if freetext: + data_source = msg[1] + data_message = msg[3] + accepted_freetext = (freetext in data_source + or any(freetext in m for m in data_message)) + else: + accepted_freetext = True + + if accepted_level and accepted_freetext: + self.model.append(msg) def scroll_to_bottom(self): self.log.scrollToBottom() @@ -286,19 +252,6 @@ class LogDock(QDockWidgetCloseDetect): scrollbar = self.log.verticalScrollBar() scrollbar.setValue(self.scroll_value) - def set_model(self, model): - self.table_model = model - self.table_model_filter = _LogFilterProxyModel( - getattr(logging, self.filter_level.currentText()), - self.filter_freetext.text()) - self.table_model_filter.setSourceModel(self.table_model) - self.log.setModel(self.table_model_filter) - self.table_model_filter.rowsAboutToBeInserted.connect(self.rows_inserted_before) - self.table_model_filter.rowsInserted.connect(self.rows_inserted_after) - self.table_model_filter.rowsRemoved.connect(self.rows_removed) - - asyncio.get_event_loop().call_soon(self.log.scrollToBottom) - def save_state(self): return { "min_level_idx": self.filter_level.currentIndex(), @@ -320,10 +273,6 @@ class LogDock(QDockWidgetCloseDetect): pass else: self.filter_freetext.setText(freetext) - # Note that editingFinished is not emitted when calling setText, - # (unlike currentIndexChanged) so we need to call the callback - # manually here, unlike for the combobox. - self.filter_freetext_changed() try: header = state["header"] @@ -334,11 +283,14 @@ class LogDock(QDockWidgetCloseDetect): class LogDockManager: - def __init__(self, main_window, log_sub): + def __init__(self, main_window): self.main_window = main_window - self.log_sub = log_sub self.docks = dict() + def append_message(self, msg): + for dock in self.docks.values(): + dock.append_message(msg) + def create_new_dock(self, add_to_area=True): n = 0 name = "log0" @@ -346,7 +298,7 @@ class LogDockManager: n += 1 name = "log" + str(n) - dock = LogDock(self, name, self.log_sub) + dock = LogDock(self, name) self.docks[name] = dock if add_to_area: self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock) @@ -376,7 +328,7 @@ class LogDockManager: if self.docks: raise NotImplementedError for name, dock_state in state.items(): - dock = LogDock(self, name, self.log_sub) + dock = LogDock(self, name) self.docks[name] = dock dock.restore_state(dock_state) self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock) diff --git a/artiq/master/log.py b/artiq/master/log.py index e3f0bae9f..f956dc7e1 100644 --- a/artiq/master/log.py +++ b/artiq/master/log.py @@ -5,27 +5,17 @@ from artiq.protocols.sync_struct import Notifier from artiq.protocols.logging import SourceFilter -class LogBuffer: - def __init__(self, depth): - self.depth = depth - self.data = Notifier([]) - - def log(self, level, source, time, message): - if len(self.data.read) >= self.depth: - del self.data[0] - self.data.append((level, source, time, message)) - - -class LogBufferHandler(logging.Handler): - def __init__(self, log_buffer, *args, **kwargs): +class LogForwarder(logging.Handler): + def __init__(self, *args, **kwargs): logging.Handler.__init__(self, *args, **kwargs) - self.log_buffer = log_buffer + self.callback = None self.setFormatter(logging.Formatter("%(name)s:%(message)s")) def emit(self, record): - message = self.format(record) - self.log_buffer.log(record.levelno, record.source, record.created, - message) + if self.callback is not None: + message = self.format(record) + self.callback((record.levelno, record.source, record.created, + message)) def log_args(parser): @@ -65,12 +55,11 @@ def init_log(args): "%(asctime)s %(levelname)s:%(source)s:%(name)s:%(message)s")) handlers.append(file_handler) - log_buffer = LogBuffer(1000) - buffer_handler = LogBufferHandler(log_buffer) - handlers.append(buffer_handler) + log_forwarder = LogForwarder() + handlers.append(log_forwarder) for handler in handlers: handler.addFilter(flt) root_logger.addHandler(handler) - return log_buffer + return log_forwarder diff --git a/doc/manual/default_network_ports.rst b/doc/manual/default_network_ports.rst index badbfd609..1b5072934 100644 --- a/doc/manual/default_network_ports.rst +++ b/doc/manual/default_network_ports.rst @@ -10,7 +10,9 @@ Default network ports +--------------------------+--------------+ | Core device (mon/inj) | 3250 (UDP) | +--------------------------+--------------+ -| Master (logging) | 1066 | +| Master (logging input) | 1066 | ++--------------------------+--------------+ +| Master (broadcasts) | 1067 | +--------------------------+--------------+ | InfluxDB bridge | 3248 | +--------------------------+--------------+