forked from M-Labs/artiq
applets: handle dataset mutations
This commit is contained in:
parent
f25b5442e7
commit
de99e7f830
|
@ -4,7 +4,7 @@ import asyncio
|
||||||
|
|
||||||
from quamash import QEventLoop, QtWidgets, QtGui, QtCore
|
from quamash import QEventLoop, QtWidgets, QtGui, QtCore
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Subscriber
|
from artiq.protocols.sync_struct import Subscriber, process_mod
|
||||||
from artiq.protocols import pyon
|
from artiq.protocols import pyon
|
||||||
from artiq.protocols.pipe_ipc import AsyncioChildComm
|
from artiq.protocols.pipe_ipc import AsyncioChildComm
|
||||||
|
|
||||||
|
@ -37,6 +37,7 @@ class AppletIPCClient(AsyncioChildComm):
|
||||||
self.close_cb()
|
self.close_cb()
|
||||||
|
|
||||||
async def listen(self):
|
async def listen(self):
|
||||||
|
data = None
|
||||||
while True:
|
while True:
|
||||||
obj = await self.read_pyon()
|
obj = await self.read_pyon()
|
||||||
try:
|
try:
|
||||||
|
@ -44,6 +45,13 @@ class AppletIPCClient(AsyncioChildComm):
|
||||||
if action == "terminate":
|
if action == "terminate":
|
||||||
self.close_cb()
|
self.close_cb()
|
||||||
return
|
return
|
||||||
|
elif action == "mod":
|
||||||
|
mod = obj["mod"]
|
||||||
|
if mod["action"] == "init":
|
||||||
|
data = self.init_cb(mod["struct"])
|
||||||
|
else:
|
||||||
|
process_mod(data, mod)
|
||||||
|
self.mod_cb(mod)
|
||||||
else:
|
else:
|
||||||
raise ValueError("unknown action in applet request")
|
raise ValueError("unknown action in applet request")
|
||||||
except:
|
except:
|
||||||
|
@ -51,9 +59,11 @@ class AppletIPCClient(AsyncioChildComm):
|
||||||
exc_info=True)
|
exc_info=True)
|
||||||
self.close_cb()
|
self.close_cb()
|
||||||
|
|
||||||
def subscribe(self, datasets):
|
def subscribe(self, datasets, init_cb, mod_cb):
|
||||||
self.write_pyon({"action": "subscribe",
|
self.write_pyon({"action": "subscribe",
|
||||||
"datasets": datasets})
|
"datasets": datasets})
|
||||||
|
self.init_cb = init_cb
|
||||||
|
self.mod_cb = mod_cb
|
||||||
asyncio.ensure_future(self.listen())
|
asyncio.ensure_future(self.listen())
|
||||||
|
|
||||||
|
|
||||||
|
@ -188,7 +198,7 @@ class SimpleApplet:
|
||||||
self.loop.run_until_complete(self.subscriber.connect(
|
self.loop.run_until_complete(self.subscriber.connect(
|
||||||
self.args.server_notify, self.args.port_notify))
|
self.args.server_notify, self.args.port_notify))
|
||||||
elif self.args.mode == "embedded":
|
elif self.args.mode == "embedded":
|
||||||
self.ipc.subscribe(self.datasets)
|
self.ipc.subscribe(self.datasets, self.sub_init, self.sub_mod)
|
||||||
else:
|
else:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
|
@ -111,7 +111,7 @@ def main():
|
||||||
|
|
||||||
d_datasets = datasets.DatasetsDock(win, dock_area, sub_clients["datasets"])
|
d_datasets = datasets.DatasetsDock(win, dock_area, sub_clients["datasets"])
|
||||||
|
|
||||||
d_applets = applets.AppletsDock(dock_area)
|
d_applets = applets.AppletsDock(dock_area, sub_clients["datasets"])
|
||||||
atexit_register_coroutine(d_applets.stop)
|
atexit_register_coroutine(d_applets.stop)
|
||||||
smgr.register(d_applets)
|
smgr.register(d_applets)
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,11 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class AppletIPCServer(AsyncioParentComm):
|
class AppletIPCServer(AsyncioParentComm):
|
||||||
|
def __init__(self, datasets_sub):
|
||||||
|
AsyncioParentComm.__init__(self)
|
||||||
|
self.datasets_sub = datasets_sub
|
||||||
|
self.datasets = set()
|
||||||
|
|
||||||
def write_pyon(self, obj):
|
def write_pyon(self, obj):
|
||||||
self.write(pyon.encode(obj).encode() + b"\n")
|
self.write(pyon.encode(obj).encode() + b"\n")
|
||||||
|
|
||||||
|
@ -22,7 +27,25 @@ class AppletIPCServer(AsyncioParentComm):
|
||||||
line = await self.readline()
|
line = await self.readline()
|
||||||
return pyon.decode(line.decode())
|
return pyon.decode(line.decode())
|
||||||
|
|
||||||
|
def _synthesize_init(self, data):
|
||||||
|
struct = {k: v for k, v in data.items() if k in self.datasets}
|
||||||
|
return {"action": "init",
|
||||||
|
"struct": struct}
|
||||||
|
|
||||||
|
def _on_mod(self, mod):
|
||||||
|
if mod["action"] == "init":
|
||||||
|
mod = self._synthesize_init(mod["struct"])
|
||||||
|
else:
|
||||||
|
if mod["path"]:
|
||||||
|
if mod["path"][0] not in self.datasets:
|
||||||
|
return
|
||||||
|
elif mod["action"] in {"setitem", "delitem"}:
|
||||||
|
if mod["key"] not in self.datasets:
|
||||||
|
return
|
||||||
|
self.write_pyon({"action": "mod", "mod": mod})
|
||||||
|
|
||||||
async def serve(self, embed_cb):
|
async def serve(self, embed_cb):
|
||||||
|
self.datasets_sub.notify_cbs.append(self._on_mod)
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
obj = await self.read_pyon()
|
obj = await self.read_pyon()
|
||||||
|
@ -32,7 +55,11 @@ class AppletIPCServer(AsyncioParentComm):
|
||||||
embed_cb(obj["win_id"])
|
embed_cb(obj["win_id"])
|
||||||
self.write_pyon({"action": "embed_done"})
|
self.write_pyon({"action": "embed_done"})
|
||||||
elif action == "subscribe":
|
elif action == "subscribe":
|
||||||
print("applet subscribed: ", obj["datasets"])
|
self.datasets = obj["datasets"]
|
||||||
|
if self.datasets_sub.model is not None:
|
||||||
|
mod = self._synthesize_init(
|
||||||
|
self.datasets_sub.model.backing_store)
|
||||||
|
self.write_pyon({"action": "mod", "mod": mod})
|
||||||
else:
|
else:
|
||||||
raise ValueError("unknown action in applet request")
|
raise ValueError("unknown action in applet request")
|
||||||
except:
|
except:
|
||||||
|
@ -44,6 +71,8 @@ class AppletIPCServer(AsyncioParentComm):
|
||||||
except:
|
except:
|
||||||
logger.error("error processing data from applet, "
|
logger.error("error processing data from applet, "
|
||||||
"server stopped", exc_info=True)
|
"server stopped", exc_info=True)
|
||||||
|
finally:
|
||||||
|
self.datasets_sub.notify_cbs.remove(self._on_mod)
|
||||||
|
|
||||||
def start(self, embed_cb):
|
def start(self, embed_cb):
|
||||||
self.server_task = asyncio.ensure_future(self.serve(embed_cb))
|
self.server_task = asyncio.ensure_future(self.serve(embed_cb))
|
||||||
|
@ -54,11 +83,12 @@ class AppletIPCServer(AsyncioParentComm):
|
||||||
|
|
||||||
|
|
||||||
class AppletDock(dockarea.Dock):
|
class AppletDock(dockarea.Dock):
|
||||||
def __init__(self, uid, name, command):
|
def __init__(self, datasets_sub, uid, name, command):
|
||||||
dockarea.Dock.__init__(self, "applet" + str(uid),
|
dockarea.Dock.__init__(self, "applet" + str(uid),
|
||||||
label="Applet: " + name,
|
label="Applet: " + name,
|
||||||
closable=True)
|
closable=True)
|
||||||
self.setMinimumSize(QtCore.QSize(500, 400))
|
self.setMinimumSize(QtCore.QSize(500, 400))
|
||||||
|
self.datasets_sub = datasets_sub
|
||||||
self.applet_name = name
|
self.applet_name = name
|
||||||
self.command = command
|
self.command = command
|
||||||
|
|
||||||
|
@ -67,7 +97,7 @@ class AppletDock(dockarea.Dock):
|
||||||
self.label.setText("Applet: " + name)
|
self.label.setText("Applet: " + name)
|
||||||
|
|
||||||
async def start(self):
|
async def start(self):
|
||||||
self.ipc = AppletIPCServer()
|
self.ipc = AppletIPCServer(self.datasets_sub)
|
||||||
command = self.command.format(python=sys.executable,
|
command = self.command.format(python=sys.executable,
|
||||||
ipc_address=self.ipc.get_address())
|
ipc_address=self.ipc.get_address())
|
||||||
logger.debug("starting command %s for %s", command, self.applet_name)
|
logger.debug("starting command %s for %s", command, self.applet_name)
|
||||||
|
@ -122,8 +152,9 @@ _templates = [
|
||||||
|
|
||||||
|
|
||||||
class AppletsDock(dockarea.Dock):
|
class AppletsDock(dockarea.Dock):
|
||||||
def __init__(self, dock_area):
|
def __init__(self, dock_area, datasets_sub):
|
||||||
self.dock_area = dock_area
|
self.dock_area = dock_area
|
||||||
|
self.datasets_sub = datasets_sub
|
||||||
self.dock_to_checkbox = dict()
|
self.dock_to_checkbox = dict()
|
||||||
self.applet_uids = set()
|
self.applet_uids = set()
|
||||||
self.workaround_pyqtgraph_bug = False
|
self.workaround_pyqtgraph_bug = False
|
||||||
|
@ -170,7 +201,7 @@ class AppletsDock(dockarea.Dock):
|
||||||
self.table.cellChanged.connect(self.cell_changed)
|
self.table.cellChanged.connect(self.cell_changed)
|
||||||
|
|
||||||
def create(self, uid, name, command):
|
def create(self, uid, name, command):
|
||||||
dock = AppletDock(uid, name, command)
|
dock = AppletDock(self.datasets_sub, uid, name, command)
|
||||||
# If a dock is floated and then dock state is restored, pyqtgraph
|
# If a dock is floated and then dock state is restored, pyqtgraph
|
||||||
# leaves a "phantom" window open.
|
# leaves a "phantom" window open.
|
||||||
if self.workaround_pyqtgraph_bug:
|
if self.workaround_pyqtgraph_bug:
|
||||||
|
|
Loading…
Reference in New Issue