forked from M-Labs/artiq
1
0
Fork 0

gui: display status of TTL channels

This commit is contained in:
Sebastien Bourdeauducq 2015-06-05 19:11:41 +08:00
parent 9f9079589e
commit 14cf244c0a
1 changed files with 90 additions and 12 deletions

View File

@ -2,8 +2,9 @@ import asyncio
import logging import logging
import socket import socket
import struct import struct
from operator import itemgetter
from quamash import QtGui from quamash import QtGui, QtCore
from pyqtgraph import dockarea from pyqtgraph import dockarea
from artiq.tools import TaskObject from artiq.tools import TaskObject
@ -12,21 +13,78 @@ from artiq.tools import TaskObject
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class _TTLWidget(QtGui.QFrame):
def __init__(self, force_out, name):
self.force_out = force_out
QtGui.QFrame.__init__(self)
self.setFrameShape(QtGui.QFrame.Panel)
self.setFrameShadow(QtGui.QFrame.Raised)
grid = QtGui.QGridLayout()
self.setLayout(grid)
label = QtGui.QLabel(name)
label.setAlignment(QtCore.Qt.AlignCenter)
grid.addWidget(label, 1, 1)
self._direction = QtGui.QLabel()
self._value = QtGui.QLabel()
self._direction.setAlignment(QtCore.Qt.AlignCenter)
self._value.setAlignment(QtCore.Qt.AlignCenter)
self.set_value(0, False, False)
grid.addWidget(self._direction, 2, 1)
grid.addWidget(self._value, 3, 1, 6, 1)
def set_value(self, value, oe, override):
value = "1" if value else "0"
if override:
value = "<b>" + value + "</b>"
color = " color=\"red\""
else:
color = ""
self._value.setText("<font size=\"9\"{}>{}</font>".format(
color, value))
oe = oe or self.force_out
direction = "OUT" if oe else "IN"
self._direction.setText("<font size=\"1\">" + direction + "</font>")
class _DeviceManager: class _DeviceManager:
def __init__(self, init): def __init__(self, init):
self.comm = None self.ddb = dict()
if "comm" in init: self.ttl_cb = lambda: None
self.comm = init["comm"] self.ttl_widgets = dict()
for k, v in init.items():
self[k] = v
def __setitem__(self, k, v): def __setitem__(self, k, v):
if k == "comm": self.ddb[k] = v
self.comm = v if k in self.ttl_widgets:
del self[k]
if not isinstance(v, dict):
return
try:
if v["type"] == "local" and v["module"] == "artiq.coredevice.ttl":
channel = v["arguments"]["channel"]
force_out = v["class"] == "TTLOut"
self.ttl_widgets[channel] = _TTLWidget(force_out, k)
self.ttl_cb()
except KeyError:
pass
def __delitem__(self, k):
del self.ddb[k]
if k in self.ttl_widgets:
del self.ttl_widgets[k]
self.ttl_cb()
def get_core_addr(self): def get_core_addr(self):
if self.comm is None:
return None
try: try:
return self.comm["arguments"]["host"] comm = self.ddb["comm"]
while isinstance(comm, str):
comm = self.ddb[comm]
return comm["arguments"]["host"]
except KeyError: except KeyError:
return None return None
@ -37,10 +95,16 @@ class MonInjTTLDock(dockarea.Dock, TaskObject):
self.dm = _DeviceManager(dict()) self.dm = _DeviceManager(dict())
self.transport = None self.transport = None
self.grid = QtGui.QGridLayout()
gridw = QtGui.QWidget()
gridw.setLayout(self.grid)
self.addWidget(gridw)
@asyncio.coroutine @asyncio.coroutine
def start(self): def start(self):
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
yield from loop.create_datagram_endpoint(lambda: self, family=socket.AF_INET) yield from loop.create_datagram_endpoint(lambda: self,
family=socket.AF_INET)
TaskObject.start(self) TaskObject.start(self)
@asyncio.coroutine @asyncio.coroutine
@ -54,8 +118,11 @@ class MonInjTTLDock(dockarea.Dock, TaskObject):
self.transport = transport self.transport = transport
def datagram_received(self, data, addr): def datagram_received(self, data, addr):
levels, oe = struct.unpack(">QQ", data) ttl_levels, ttl_oes = struct.unpack(">QQ", data)
print("Received:", hex(levels), hex(oe)) for channel, w in self.dm.ttl_widgets.items():
w.set_value(ttl_levels & (1 << channel),
ttl_oes & (1 << channel),
False)
def error_received(self, exc): def error_received(self, exc):
logger.warning("datagram endpoint error") logger.warning("datagram endpoint error")
@ -76,8 +143,19 @@ class MonInjTTLDock(dockarea.Dock, TaskObject):
# MONINJ_REQ_MONITOR # MONINJ_REQ_MONITOR
self.transport.sendto(b"\x01", (ca, 3250)) self.transport.sendto(b"\x01", (ca, 3250))
def layout_ttl_widgets(self):
w = self.grid.itemAt(0)
while w is not None:
self.grid.removeItem(w)
w = self.grid.itemAt(0)
for i, (_, w) in enumerate(sorted(self.dm.ttl_widgets.items(),
key=itemgetter(0))):
self.grid.addWidget(w, i // 4, i % 4)
def init_devices(self, d): def init_devices(self, d):
self.dm = _DeviceManager(d) self.dm = _DeviceManager(d)
self.dm.ttl_cb = self.layout_ttl_widgets
self.layout_ttl_widgets()
return self.dm return self.dm