diff --git a/artiq/applets/simple.py b/artiq/applets/simple.py index 8fc5ceafc..24a76e464 100644 --- a/artiq/applets/simple.py +++ b/artiq/applets/simple.py @@ -1,3 +1,4 @@ +import logging import argparse import asyncio @@ -8,7 +9,13 @@ from artiq.protocols import pyon from artiq.protocols.pipe_ipc import AsyncioChildComm +logger = logging.getLogger(__name__) + + class AppletIPCClient(AsyncioChildComm): + def set_close_cb(self, close_cb): + self.close_cb = close_cb + def write_pyon(self, obj): self.write(pyon.encode(obj).encode() + b"\n") @@ -17,12 +24,37 @@ class AppletIPCClient(AsyncioChildComm): return pyon.decode(line.decode()) async def embed(self, win_id): + # This function is only called when not subscribed to anything, + # so the only normal replies are embed_done and terminate. self.write_pyon({"action": "embed", "win_id": win_id}) reply = await self.read_pyon() - if reply["action"] != "embed_done": - raise ValueError("Got erroneous reply to embed request", - reply) + if reply["action"] == "terminate": + self.close_cb() + elif reply["action"] != "embed_done": + logger.error("unexpected action reply to embed request: %s", + action) + self.close_cb() + + async def listen(self): + while True: + obj = await self.read_pyon() + try: + action = obj["action"] + if action == "terminate": + self.close_cb() + return + else: + raise ValueError("unknown action in applet request") + except: + logger.error("error processing applet request", + exc_info=True) + self.close_cb() + + def subscribe(self, datasets): + self.write_pyon({"action": "subscribe", + "datasets": datasets}) + asyncio.ensure_future(self.listen()) class SimpleApplet: @@ -108,6 +140,7 @@ class SimpleApplet: # Doing embedding the other way around (using QWindow.setParent in the # applet) breaks resizing. if self.args.mode == "embedded": + self.ipc.set_close_cb(self.main_widget.close) win_id = int(self.main_widget.winId()) self.loop.run_until_complete(self.ipc.embed(win_id)) self.main_widget.show() @@ -155,8 +188,7 @@ class SimpleApplet: self.loop.run_until_complete(self.subscriber.connect( self.args.server_notify, self.args.port_notify)) elif self.args.mode == "embedded": - # TODO - pass + self.ipc.subscribe(self.datasets) else: raise NotImplementedError diff --git a/artiq/gui/applets.py b/artiq/gui/applets.py index 755491f17..909069796 100644 --- a/artiq/gui/applets.py +++ b/artiq/gui/applets.py @@ -7,18 +7,14 @@ from functools import partial from quamash import QtCore, QtGui, QtWidgets from pyqtgraph import dockarea -from artiq.protocols import pyon from artiq.protocols.pipe_ipc import AsyncioParentComm +from artiq.protocols import pyon logger = logging.getLogger(__name__) class AppletIPCServer(AsyncioParentComm): - def __init__(self, capture_cb): - AsyncioParentComm.__init__(self) - self.capture_cb = capture_cb - def write_pyon(self, obj): self.write(pyon.encode(obj).encode() + b"\n") @@ -26,25 +22,40 @@ class AppletIPCServer(AsyncioParentComm): line = await self.readline() return pyon.decode(line.decode()) - async def serve(self): - while True: - obj = await self.read_pyon() - try: - action = obj["action"] - if action == "embed": - self.capture_cb(obj["win_id"]) - self.write_pyon({"action": "embed_done"}) - else: - raise ValueError("unknown action in applet request") - except: - logger.warning("error processing applet request", - exc_info=True) - self.write_pyon({"action": "error"}) + async def serve(self, embed_cb): + try: + while True: + obj = await self.read_pyon() + try: + action = obj["action"] + if action == "embed": + embed_cb(obj["win_id"]) + self.write_pyon({"action": "embed_done"}) + elif action == "subscribe": + print("applet subscribed: ", obj["datasets"]) + else: + raise ValueError("unknown action in applet request") + except: + logger.warning("error processing applet request", + exc_info=True) + self.write_pyon({"action": "error"}) + except asyncio.CancelledError: + pass + except: + logger.error("error processing data from applet, " + "server stopped", exc_info=True) + + def start(self, embed_cb): + self.server_task = asyncio.ensure_future(self.serve(embed_cb)) + + async def stop(self): + self.server_task.cancel() + await asyncio.wait([self.server_task]) class AppletDock(dockarea.Dock): def __init__(self, name, command): - dockarea.Dock.__init__(self, "applet" + str(id(self)), # XXX + dockarea.Dock.__init__(self, "applet" + str(id(self)), # TODO label="Applet: " + name, closable=True) self.setMinimumSize(QtCore.QSize(500, 400)) @@ -56,7 +67,7 @@ class AppletDock(dockarea.Dock): self.label.setText("Applet: " + name) async def start(self): - self.ipc = AppletIPCServer(self.capture) + self.ipc = AppletIPCServer() command = self.command.format(python=sys.executable, ipc_address=self.ipc.get_address()) logger.debug("starting command %s for %s", command, self.applet_name) @@ -65,18 +76,18 @@ class AppletDock(dockarea.Dock): except: logger.warning("Applet %s failed to start", self.applet_name, exc_info=True) - asyncio.ensure_future(self.ipc.serve()) + self.ipc.start(self.embed) - def capture(self, win_id): + def embed(self, win_id): logger.debug("capturing window 0x%x for %s", win_id, self.applet_name) - captured_window = QtGui.QWindow.fromWinId(win_id) - captured_widget = QtWidgets.QWidget.createWindowContainer( - captured_window) - self.addWidget(captured_widget) + embed_window = QtGui.QWindow.fromWinId(win_id) + embed_widget = QtWidgets.QWidget.createWindowContainer(embed_window) + self.addWidget(embed_widget) async def terminate(self): - if hasattr(self, "process"): - # TODO: send IPC termination request + if hasattr(self, "ipc"): + await self.ipc.stop() + self.ipc.write_pyon({"action": "terminate"}) try: await asyncio.wait_for(self.ipc.process.wait(), 2.0) except: