forked from M-Labs/artiq
419 lines
14 KiB
Python
419 lines
14 KiB
Python
import asyncio
|
|
import logging
|
|
import time
|
|
import re
|
|
from functools import partial
|
|
|
|
from PyQt5 import QtCore, QtGui, QtWidgets
|
|
|
|
from artiq.protocols.logging import SourceFilter
|
|
from artiq.gui.tools import (LayoutWidget, log_level_to_name,
|
|
QDockWidgetCloseDetect)
|
|
|
|
|
|
class _ModelItem:
|
|
def __init__(self, parent, row):
|
|
self.parent = parent
|
|
self.row = row
|
|
self.children_by_row = []
|
|
|
|
|
|
class _Model(QtCore.QAbstractItemModel):
|
|
def __init__(self):
|
|
QtCore.QAbstractTableModel.__init__(self)
|
|
|
|
self.headers = ["Source", "Message"]
|
|
self.children_by_row = []
|
|
|
|
self.entries = []
|
|
self.pending_entries = []
|
|
self.depth = 1000
|
|
timer = QtCore.QTimer(self)
|
|
timer.timeout.connect(self.timer_tick)
|
|
timer.start(100)
|
|
|
|
self.fixed_font = QtGui.QFontDatabase.systemFont(QtGui.QFontDatabase.FixedFont)
|
|
|
|
self.white = QtGui.QBrush(QtGui.QColor(255, 255, 255))
|
|
self.black = QtGui.QBrush(QtGui.QColor(0, 0, 0))
|
|
self.debug_fg = QtGui.QBrush(QtGui.QColor(55, 55, 55))
|
|
self.warning_bg = QtGui.QBrush(QtGui.QColor(255, 255, 180))
|
|
self.error_bg = QtGui.QBrush(QtGui.QColor(255, 150, 150))
|
|
|
|
def headerData(self, col, orientation, role):
|
|
if (orientation == QtCore.Qt.Horizontal
|
|
and role == QtCore.Qt.DisplayRole):
|
|
return self.headers[col]
|
|
return None
|
|
|
|
def rowCount(self, parent):
|
|
if parent.isValid():
|
|
item = parent.internalPointer()
|
|
return len(item.children_by_row)
|
|
else:
|
|
return len(self.entries)
|
|
|
|
def columnCount(self, parent):
|
|
return len(self.headers)
|
|
|
|
def append(self, v):
|
|
severity, source, timestamp, message = v
|
|
self.pending_entries.append((severity, source, timestamp,
|
|
message.splitlines()))
|
|
|
|
def clear(self):
|
|
self.beginRemoveRows(QtCore.QModelIndex(), 0, len(self.entries)-1)
|
|
self.entries.clear()
|
|
self.children_by_row.clear()
|
|
self.endRemoveRows()
|
|
|
|
def timer_tick(self):
|
|
if not self.pending_entries:
|
|
return
|
|
nrows = len(self.entries)
|
|
records = self.pending_entries
|
|
self.pending_entries = []
|
|
|
|
self.beginInsertRows(QtCore.QModelIndex(), nrows, nrows+len(records)-1)
|
|
self.entries.extend(records)
|
|
for rec in records:
|
|
item = _ModelItem(self, len(self.children_by_row))
|
|
self.children_by_row.append(item)
|
|
for i in range(len(rec[3])-1):
|
|
item.children_by_row.append(_ModelItem(item, i))
|
|
self.endInsertRows()
|
|
|
|
if len(self.entries) > self.depth:
|
|
start = len(self.entries) - self.depth
|
|
self.beginRemoveRows(QtCore.QModelIndex(), 0, start-1)
|
|
self.entries = self.entries[start:]
|
|
self.children_by_row = self.children_by_row[start:]
|
|
for child in self.children_by_row:
|
|
child.row -= start
|
|
self.endRemoveRows()
|
|
|
|
def index(self, row, column, parent):
|
|
if parent.isValid():
|
|
parent_item = parent.internalPointer()
|
|
return self.createIndex(row, column,
|
|
parent_item.children_by_row[row])
|
|
else:
|
|
return self.createIndex(row, column, self.children_by_row[row])
|
|
|
|
def parent(self, index):
|
|
if index.isValid():
|
|
parent = index.internalPointer().parent
|
|
if parent is self:
|
|
return QtCore.QModelIndex()
|
|
else:
|
|
return self.createIndex(parent.row, 0, parent)
|
|
else:
|
|
return QtCore.QModelIndex()
|
|
|
|
def full_entry(self, index):
|
|
if not index.isValid():
|
|
return
|
|
item = index.internalPointer()
|
|
if item.parent is self:
|
|
msgnum = item.row
|
|
else:
|
|
msgnum = item.parent.row
|
|
return self.entries[msgnum][3]
|
|
|
|
def data(self, index, role):
|
|
if not index.isValid():
|
|
return
|
|
|
|
item = index.internalPointer()
|
|
if item.parent is self:
|
|
msgnum = item.row
|
|
else:
|
|
msgnum = item.parent.row
|
|
|
|
if role == QtCore.Qt.FontRole and index.column() == 1:
|
|
return self.fixed_font
|
|
elif role == QtCore.Qt.BackgroundRole:
|
|
level = self.entries[msgnum][0]
|
|
if level >= logging.ERROR:
|
|
return self.error_bg
|
|
elif level >= logging.WARNING:
|
|
return self.warning_bg
|
|
else:
|
|
return self.white
|
|
elif role == QtCore.Qt.ForegroundRole:
|
|
level = self.entries[msgnum][0]
|
|
if level <= logging.DEBUG:
|
|
return self.debug_fg
|
|
else:
|
|
return self.black
|
|
elif role == QtCore.Qt.DisplayRole:
|
|
v = self.entries[msgnum]
|
|
column = index.column()
|
|
if item.parent is self:
|
|
if column == 0:
|
|
return v[1]
|
|
else:
|
|
return v[3][0]
|
|
else:
|
|
if column == 0:
|
|
return ""
|
|
else:
|
|
return v[3][item.row+1]
|
|
elif role == QtCore.Qt.ToolTipRole:
|
|
v = self.entries[msgnum]
|
|
if item.parent is self:
|
|
lineno = 0
|
|
else:
|
|
lineno = item.row + 1
|
|
return (log_level_to_name(v[0]) + ", " +
|
|
time.strftime("%m/%d %H:%M:%S", time.localtime(v[2])) +
|
|
"\n" + v[3][lineno])
|
|
|
|
|
|
class LogDock(QDockWidgetCloseDetect):
|
|
def __init__(self, manager, name):
|
|
QDockWidgetCloseDetect.__init__(self, "Log")
|
|
self.setObjectName(name)
|
|
|
|
grid = LayoutWidget()
|
|
self.setWidget(grid)
|
|
|
|
grid.addWidget(QtWidgets.QLabel("Minimum level: "), 0, 0)
|
|
self.filter_level = QtWidgets.QComboBox()
|
|
self.filter_level.addItems(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"])
|
|
self.filter_level.setToolTip("Receive entries at or above this level")
|
|
grid.addWidget(self.filter_level, 0, 1)
|
|
self.filter_freetext = QtWidgets.QLineEdit()
|
|
self.filter_freetext.setPlaceholderText("freetext filter...")
|
|
self.filter_freetext.setToolTip("Receive entries containing this text")
|
|
grid.addWidget(self.filter_freetext, 0, 2)
|
|
|
|
scrollbottom = QtWidgets.QToolButton()
|
|
scrollbottom.setToolTip("Scroll to bottom")
|
|
scrollbottom.setIcon(QtWidgets.QApplication.style().standardIcon(
|
|
QtWidgets.QStyle.SP_ArrowDown))
|
|
grid.addWidget(scrollbottom, 0, 3)
|
|
scrollbottom.clicked.connect(self.scroll_to_bottom)
|
|
|
|
clear = QtWidgets.QToolButton()
|
|
clear.setIcon(QtWidgets.QApplication.style().standardIcon(
|
|
QtWidgets.QStyle.SP_DialogResetButton))
|
|
grid.addWidget(clear, 0, 4)
|
|
clear.clicked.connect(lambda: self.model.clear())
|
|
|
|
if manager:
|
|
newdock = QtWidgets.QToolButton()
|
|
newdock.setToolTip("Create new log dock")
|
|
newdock.setIcon(QtWidgets.QApplication.style().standardIcon(
|
|
QtWidgets.QStyle.SP_FileDialogNewFolder))
|
|
# note the lambda, the default parameter is overriden otherwise
|
|
newdock.clicked.connect(lambda: manager.create_new_dock())
|
|
grid.addWidget(newdock, 0, 5)
|
|
grid.layout.setColumnStretch(2, 1)
|
|
|
|
self.log = QtWidgets.QTreeView()
|
|
self.log.setHorizontalScrollMode(
|
|
QtWidgets.QAbstractItemView.ScrollPerPixel)
|
|
self.log.setVerticalScrollMode(
|
|
QtWidgets.QAbstractItemView.ScrollPerPixel)
|
|
self.log.setHorizontalScrollBarPolicy(QtCore.Qt.ScrollBarAsNeeded)
|
|
grid.addWidget(self.log, 1, 0, colspan=6 if manager else 5)
|
|
self.scroll_at_bottom = False
|
|
self.scroll_value = 0
|
|
|
|
self.log.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
|
|
copy_action = QtWidgets.QAction("Copy entry to clipboard", self.log)
|
|
copy_action.triggered.connect(self.copy_to_clipboard)
|
|
self.log.addAction(copy_action)
|
|
clear_action = QtWidgets.QAction("Clear", self.log)
|
|
clear_action.triggered.connect(lambda: self.model.clear())
|
|
self.log.addAction(clear_action)
|
|
|
|
# If Qt worked correctly, this would be nice to have. Alas, resizeSections
|
|
# is broken when the horizontal scrollbar is enabled.
|
|
# sizeheader_action = QtWidgets.QAction("Resize header", self.log)
|
|
# sizeheader_action.triggered.connect(
|
|
# lambda: self.log.header().resizeSections(QtWidgets.QHeaderView.ResizeToContents))
|
|
# self.log.addAction(sizeheader_action)
|
|
|
|
cw = QtGui.QFontMetrics(self.font()).averageCharWidth()
|
|
self.log.header().resizeSection(0, 26*cw)
|
|
|
|
self.model = _Model()
|
|
self.log.setModel(self.model)
|
|
self.model.rowsAboutToBeInserted.connect(self.rows_inserted_before)
|
|
self.model.rowsInserted.connect(self.rows_inserted_after)
|
|
self.model.rowsRemoved.connect(self.rows_removed)
|
|
|
|
def append_message(self, msg):
|
|
min_level = getattr(logging, self.filter_level.currentText())
|
|
freetext = self.filter_freetext.text()
|
|
|
|
accepted_level = msg[0] >= min_level
|
|
|
|
if freetext:
|
|
data_source = msg[1]
|
|
data_message = msg[3]
|
|
accepted_freetext = (freetext in data_source
|
|
or any(freetext in m for m in data_message))
|
|
else:
|
|
accepted_freetext = True
|
|
|
|
if accepted_level and accepted_freetext:
|
|
self.model.append(msg)
|
|
|
|
def scroll_to_bottom(self):
|
|
self.log.scrollToBottom()
|
|
|
|
def rows_inserted_before(self):
|
|
scrollbar = self.log.verticalScrollBar()
|
|
self.scroll_value = scrollbar.value()
|
|
self.scroll_at_bottom = self.scroll_value == scrollbar.maximum()
|
|
|
|
def rows_inserted_after(self):
|
|
if self.scroll_at_bottom:
|
|
self.log.scrollToBottom()
|
|
|
|
# HACK:
|
|
# Qt intermittently likes to scroll back to the top when rows are removed.
|
|
# Work around this by restoring the scrollbar to the previously memorized
|
|
# position, after the removal.
|
|
# Note that this works because _LogModel always does the insertion right
|
|
# before the removal.
|
|
# TODO: check if this is still required after moving to QTreeView
|
|
def rows_removed(self):
|
|
if self.scroll_at_bottom:
|
|
self.log.scrollToBottom()
|
|
else:
|
|
scrollbar = self.log.verticalScrollBar()
|
|
scrollbar.setValue(self.scroll_value)
|
|
|
|
def copy_to_clipboard(self):
|
|
idx = self.log.selectedIndexes()
|
|
if idx:
|
|
entry = "\n".join(self.model.full_entry(idx[0]))
|
|
QtWidgets.QApplication.clipboard().setText(entry)
|
|
|
|
def save_state(self):
|
|
return {
|
|
"min_level_idx": self.filter_level.currentIndex(),
|
|
"freetext_filter": self.filter_freetext.text(),
|
|
"header": bytes(self.log.header().saveState())
|
|
}
|
|
|
|
def restore_state(self, state):
|
|
try:
|
|
idx = state["min_level_idx"]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.filter_level.setCurrentIndex(idx)
|
|
|
|
try:
|
|
freetext = state["freetext_filter"]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.filter_freetext.setText(freetext)
|
|
|
|
try:
|
|
header = state["header"]
|
|
except KeyError:
|
|
pass
|
|
else:
|
|
self.log.header().restoreState(QtCore.QByteArray(header))
|
|
|
|
|
|
class LogDockManager:
|
|
def __init__(self, main_window):
|
|
self.main_window = main_window
|
|
self.docks = dict()
|
|
|
|
def append_message(self, msg):
|
|
for dock in self.docks.values():
|
|
dock.append_message(msg)
|
|
|
|
def create_new_dock(self, add_to_area=True):
|
|
n = 0
|
|
name = "log0"
|
|
while name in self.docks:
|
|
n += 1
|
|
name = "log" + str(n)
|
|
|
|
dock = LogDock(self, name)
|
|
self.docks[name] = dock
|
|
if add_to_area:
|
|
self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock)
|
|
dock.setFloating(True)
|
|
dock.sigClosed.connect(partial(self.on_dock_closed, name))
|
|
self.update_closable()
|
|
return dock
|
|
|
|
def on_dock_closed(self, name):
|
|
dock = self.docks[name]
|
|
dock.deleteLater()
|
|
del self.docks[name]
|
|
self.update_closable()
|
|
|
|
def update_closable(self):
|
|
flags = (QtWidgets.QDockWidget.DockWidgetMovable |
|
|
QtWidgets.QDockWidget.DockWidgetFloatable)
|
|
if len(self.docks) > 1:
|
|
flags |= QtWidgets.QDockWidget.DockWidgetClosable
|
|
for dock in self.docks.values():
|
|
dock.setFeatures(flags)
|
|
|
|
def save_state(self):
|
|
return {name: dock.save_state() for name, dock in self.docks.items()}
|
|
|
|
def restore_state(self, state):
|
|
if self.docks:
|
|
raise NotImplementedError
|
|
for name, dock_state in state.items():
|
|
dock = LogDock(self, name)
|
|
self.docks[name] = dock
|
|
dock.restore_state(dock_state)
|
|
self.main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, dock)
|
|
dock.sigClosed.connect(partial(self.on_dock_closed, name))
|
|
self.update_closable()
|
|
|
|
def first_log_dock(self):
|
|
if self.docks:
|
|
return None
|
|
dock = self.create_new_dock(False)
|
|
return dock
|
|
|
|
|
|
class LogWidgetHandler(logging.Handler):
|
|
def __init__(self, *args, **kwargs):
|
|
logging.Handler.__init__(self, *args, **kwargs)
|
|
self.callback = None
|
|
self.setFormatter(logging.Formatter("%(name)s:%(message)s"))
|
|
|
|
def emit(self, record):
|
|
if self.callback is not None:
|
|
message = self.format(record)
|
|
self.callback((record.levelno, record.source,
|
|
record.created, message))
|
|
|
|
|
|
def init_log(args, local_source):
|
|
root_logger = logging.getLogger()
|
|
root_logger.setLevel(logging.NOTSET) # we use our custom filter only
|
|
flt = SourceFilter(logging.INFO + args.quiet*10 - args.verbose*10,
|
|
local_source)
|
|
handlers = []
|
|
console_handler = logging.StreamHandler()
|
|
console_handler.setFormatter(logging.Formatter(
|
|
"%(levelname)s:%(source)s:%(name)s:%(message)s"))
|
|
handlers.append(console_handler)
|
|
|
|
widget_handler = LogWidgetHandler()
|
|
handlers.append(widget_handler)
|
|
|
|
for handler in handlers:
|
|
handler.addFilter(flt)
|
|
root_logger.addHandler(handler)
|
|
|
|
return widget_handler
|