forked from M-Labs/artiq
protocols: better workaround for asyncio issue 263
This commit is contained in:
parent
71d2e3a69f
commit
1bc4061620
|
@ -2,7 +2,7 @@ import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from artiq.protocols.asyncio_server import AsyncioServer
|
from artiq.protocols.asyncio_server import AsyncioServer
|
||||||
from artiq.tools import TaskObject
|
from artiq.tools import TaskObject, workaround_asyncio263
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -119,19 +119,12 @@ class LogForwarder(logging.Handler, TaskObject):
|
||||||
try:
|
try:
|
||||||
reader, writer = await asyncio.open_connection(self.host,
|
reader, writer = await asyncio.open_connection(self.host,
|
||||||
self.port)
|
self.port)
|
||||||
detect_close = asyncio.ensure_future(reader.read(1))
|
|
||||||
writer.write(_init_string)
|
writer.write(_init_string)
|
||||||
while True:
|
while True:
|
||||||
message = await self._queue.get() + "\n"
|
message = await self._queue.get() + "\n"
|
||||||
writer.write(message.encode())
|
writer.write(message.encode())
|
||||||
|
await workaround_asyncio263()
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
# HACK: detect connection termination through the completion
|
|
||||||
# of a read operation. For some reason, write/drain operations
|
|
||||||
# on a closed socket do not raise exceptions, but print
|
|
||||||
# "asyncio:socket.send() raised exception."
|
|
||||||
if detect_close.done():
|
|
||||||
await asyncio.sleep(self.reconnect_timer)
|
|
||||||
break
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
return
|
return
|
||||||
except:
|
except:
|
||||||
|
|
|
@ -16,6 +16,7 @@ from functools import partial
|
||||||
|
|
||||||
from artiq.protocols import pyon
|
from artiq.protocols import pyon
|
||||||
from artiq.protocols.asyncio_server import AsyncioServer
|
from artiq.protocols.asyncio_server import AsyncioServer
|
||||||
|
from artiq.tools import workaround_asyncio263
|
||||||
|
|
||||||
|
|
||||||
_init_string = b"ARTIQ sync_struct\n"
|
_init_string = b"ARTIQ sync_struct\n"
|
||||||
|
@ -233,6 +234,7 @@ class Publisher(AsyncioServer):
|
||||||
line = await queue.get()
|
line = await queue.get()
|
||||||
writer.write(line)
|
writer.write(line)
|
||||||
# raise exception on connection error
|
# raise exception on connection error
|
||||||
|
await workaround_asyncio263()
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
finally:
|
finally:
|
||||||
self._recipients[notifier_name].remove(queue)
|
self._recipients[notifier_name].remove(queue)
|
||||||
|
|
|
@ -175,3 +175,9 @@ class Condition:
|
||||||
for fut in self._waiters:
|
for fut in self._waiters:
|
||||||
if not fut.done():
|
if not fut.done():
|
||||||
fut.set_result(False)
|
fut.set_result(False)
|
||||||
|
|
||||||
|
|
||||||
|
# See: https://github.com/python/asyncio/issues/263
|
||||||
|
@asyncio.coroutine
|
||||||
|
def workaround_asyncio263():
|
||||||
|
yield
|
||||||
|
|
Loading…
Reference in New Issue