forked from M-Labs/artiq
1
0
Fork 0

sync_struct/Subscriber: add before_receive_cb, export reader/writer

This commit is contained in:
Sebastien Bourdeauducq 2015-02-07 01:13:15 +08:00
parent 8979d9d5e7
commit 0d0a05a487
1 changed files with 13 additions and 11 deletions

View File

@ -64,17 +64,19 @@ class Subscriber:
self.notify_cb = notify_cb self.notify_cb = notify_cb
@asyncio.coroutine @asyncio.coroutine
def connect(self, host, port): def connect(self, host, port, before_receive_cb=None):
self._reader, self._writer = \ self.reader, self.writer = \
yield from asyncio.open_connection(host, port) yield from asyncio.open_connection(host, port)
try: try:
self._writer.write(_init_string) if before_receive_cb is not None:
self._writer.write((self.notifier_name + "\n").encode()) before_receive_cb()
self.writer.write(_init_string)
self.writer.write((self.notifier_name + "\n").encode())
self.receive_task = asyncio.Task(self._receive_cr()) self.receive_task = asyncio.Task(self._receive_cr())
except: except:
self._writer.close() self.writer.close()
del self._reader del self.reader
del self._writer del self.writer
raise raise
@asyncio.coroutine @asyncio.coroutine
@ -86,15 +88,15 @@ class Subscriber:
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
finally: finally:
self._writer.close() self.writer.close()
del self._reader del self.reader
del self._writer del self.writer
@asyncio.coroutine @asyncio.coroutine
def _receive_cr(self): def _receive_cr(self):
targets = [] targets = []
while True: while True:
line = yield from self._reader.readline() line = yield from self.reader.readline()
if not line: if not line:
return return
mod = pyon.decode(line.decode()) mod = pyon.decode(line.decode())