From d9e918be49a4ac242745c1c97b23eb68e46daa44 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 11 Apr 2016 18:09:41 +0800 Subject: [PATCH] dashboard/moninj: use thread instead of asyncio UDP (#39) --- artiq/dashboard/moninj.py | 54 ++++++++++++++++--------------- artiq/frontend/artiq_dashboard.py | 16 ++++----- 2 files changed, 36 insertions(+), 34 deletions(-) diff --git a/artiq/dashboard/moninj.py b/artiq/dashboard/moninj.py index 5fd5ad902..00a1a85ee 100644 --- a/artiq/dashboard/moninj.py +++ b/artiq/dashboard/moninj.py @@ -1,4 +1,5 @@ import asyncio +import threading import logging import socket import struct @@ -273,34 +274,43 @@ class MonInj(TaskObject): self.subscriber = Subscriber("devices", self.init_devices) self.dm = None - self.transport = None + self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Never ceasing to disappoint, asyncio has an issue about UDP + # not being supported on Windows (ProactorEventLoop) open since 2014. + self.loop = asyncio.get_event_loop() + self.thread = threading.Thread(target=self.receiver_thread, + daemon=True) + self.thread.start() async def start(self, server, port): - loop = asyncio.get_event_loop() - await loop.create_datagram_endpoint(lambda: self, - family=socket.AF_INET) + await self.subscriber.connect(server, port) try: - await self.subscriber.connect(server, port) - try: - TaskObject.start(self) - except: - await self.subscriber.close() - raise + TaskObject.start(self) except: - self.transport.close() + await self.subscriber.close() raise async def stop(self): await TaskObject.stop(self) await self.subscriber.close() - if self.transport is not None: - self.transport.close() - self.transport = None + try: + # This is required to make recvfrom terminate in the thread. + # On Linux, this raises "OSError: Transport endpoint is not + # connected", but still has the intended effect. + self.socket.shutdown(socket.SHUT_RDWR) + except OSError: + pass + self.socket.close() + self.thread.join() - def connection_made(self, transport): - self.transport = transport + def receiver_thread(self): + while True: + data, addr = self.socket.recvfrom(2048) + if addr is None: + break + self.loop.call_soon_threadsafe(self.datagram_received, data) - def datagram_received(self, data, addr): + def datagram_received(self, data): if self.dm is None: logger.debug("received datagram, but device manager " "is not present yet") @@ -330,12 +340,6 @@ class MonInj(TaskObject): except: logger.warning("failed to process datagram", exc_info=True) - def error_received(self, exc): - logger.warning("datagram endpoint error") - - def connection_lost(self, exc): - self.transport = None - def send_to_device(self, data): if self.dm is None: logger.debug("cannot sent to device yet, no device manager") @@ -344,10 +348,8 @@ class MonInj(TaskObject): logger.debug("core device address: %s", ca) if ca is None: logger.warning("could not find core device address") - elif self.transport is None: - logger.warning("datagram endpoint not available") else: - self.transport.sendto(data, (ca, 3250)) + self.socket.sendto(data, (ca, 3250)) async def _do(self): while True: diff --git a/artiq/frontend/artiq_dashboard.py b/artiq/frontend/artiq_dashboard.py index 78769cdd6..49c0838c9 100755 --- a/artiq/frontend/artiq_dashboard.py +++ b/artiq/frontend/artiq_dashboard.py @@ -142,10 +142,9 @@ def main(): atexit_register_coroutine(d_applets.stop) smgr.register(d_applets) - if os.name != "nt": - d_ttl_dds = moninj.MonInj() - loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) - atexit_register_coroutine(d_ttl_dds.stop) + d_ttl_dds = moninj.MonInj() + loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify)) + atexit_register_coroutine(d_ttl_dds.stop) d_schedule = schedule.ScheduleDock( status_bar, rpc_clients["schedule"], sub_clients["schedule"]) @@ -155,10 +154,11 @@ def main(): smgr.register(logmgr) # lay out docks - right_docks = [d_explorer, d_shortcuts] - if os.name != "nt": - right_docks += [d_ttl_dds.ttl_dock, d_ttl_dds.dds_dock] - right_docks += [d_datasets, d_applets] + right_docks = [ + d_explorer, d_shortcuts, + d_ttl_dds.ttl_dock, d_ttl_dds.dds_dock, + d_datasets, d_applets + ] main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, right_docks[0]) for d1, d2 in zip(right_docks, right_docks[1:]): main_window.tabifyDockWidget(d1, d2)