dashboard/moninj: use thread instead of asyncio UDP (#39)

This commit is contained in:
Sebastien Bourdeauducq 2016-04-11 18:09:41 +08:00
parent 1690cb11b3
commit d9e918be49
2 changed files with 36 additions and 34 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import threading
import logging
import socket
import struct
@ -273,34 +274,43 @@ class MonInj(TaskObject):
self.subscriber = Subscriber("devices", self.init_devices)
self.dm = None
self.transport = None
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Never ceasing to disappoint, asyncio has an issue about UDP
# not being supported on Windows (ProactorEventLoop) open since 2014.
self.loop = asyncio.get_event_loop()
self.thread = threading.Thread(target=self.receiver_thread,
daemon=True)
self.thread.start()
async def start(self, server, port):
loop = asyncio.get_event_loop()
await loop.create_datagram_endpoint(lambda: self,
family=socket.AF_INET)
await self.subscriber.connect(server, port)
try:
await self.subscriber.connect(server, port)
try:
TaskObject.start(self)
except:
await self.subscriber.close()
raise
TaskObject.start(self)
except:
self.transport.close()
await self.subscriber.close()
raise
async def stop(self):
await TaskObject.stop(self)
await self.subscriber.close()
if self.transport is not None:
self.transport.close()
self.transport = None
try:
# This is required to make recvfrom terminate in the thread.
# On Linux, this raises "OSError: Transport endpoint is not
# connected", but still has the intended effect.
self.socket.shutdown(socket.SHUT_RDWR)
except OSError:
pass
self.socket.close()
self.thread.join()
def connection_made(self, transport):
self.transport = transport
def receiver_thread(self):
while True:
data, addr = self.socket.recvfrom(2048)
if addr is None:
break
self.loop.call_soon_threadsafe(self.datagram_received, data)
def datagram_received(self, data, addr):
def datagram_received(self, data):
if self.dm is None:
logger.debug("received datagram, but device manager "
"is not present yet")
@ -330,12 +340,6 @@ class MonInj(TaskObject):
except:
logger.warning("failed to process datagram", exc_info=True)
def error_received(self, exc):
logger.warning("datagram endpoint error")
def connection_lost(self, exc):
self.transport = None
def send_to_device(self, data):
if self.dm is None:
logger.debug("cannot sent to device yet, no device manager")
@ -344,10 +348,8 @@ class MonInj(TaskObject):
logger.debug("core device address: %s", ca)
if ca is None:
logger.warning("could not find core device address")
elif self.transport is None:
logger.warning("datagram endpoint not available")
else:
self.transport.sendto(data, (ca, 3250))
self.socket.sendto(data, (ca, 3250))
async def _do(self):
while True:

View File

@ -142,10 +142,9 @@ def main():
atexit_register_coroutine(d_applets.stop)
smgr.register(d_applets)
if os.name != "nt":
d_ttl_dds = moninj.MonInj()
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit_register_coroutine(d_ttl_dds.stop)
d_ttl_dds = moninj.MonInj()
loop.run_until_complete(d_ttl_dds.start(args.server, args.port_notify))
atexit_register_coroutine(d_ttl_dds.stop)
d_schedule = schedule.ScheduleDock(
status_bar, rpc_clients["schedule"], sub_clients["schedule"])
@ -155,10 +154,11 @@ def main():
smgr.register(logmgr)
# lay out docks
right_docks = [d_explorer, d_shortcuts]
if os.name != "nt":
right_docks += [d_ttl_dds.ttl_dock, d_ttl_dds.dds_dock]
right_docks += [d_datasets, d_applets]
right_docks = [
d_explorer, d_shortcuts,
d_ttl_dds.ttl_dock, d_ttl_dds.dds_dock,
d_datasets, d_applets
]
main_window.addDockWidget(QtCore.Qt.RightDockWidgetArea, right_docks[0])
for d1, d2 in zip(right_docks, right_docks[1:]):
main_window.tabifyDockWidget(d1, d2)