From 0d0a05a487c35ad5a62a611f50ebdd41aafb8596 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 7 Feb 2015 01:13:15 +0800 Subject: [PATCH] sync_struct/Subscriber: add before_receive_cb, export reader/writer --- artiq/protocols/sync_struct.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/artiq/protocols/sync_struct.py b/artiq/protocols/sync_struct.py index b85397d69..95b390faa 100644 --- a/artiq/protocols/sync_struct.py +++ b/artiq/protocols/sync_struct.py @@ -64,17 +64,19 @@ class Subscriber: self.notify_cb = notify_cb @asyncio.coroutine - def connect(self, host, port): - self._reader, self._writer = \ + def connect(self, host, port, before_receive_cb=None): + self.reader, self.writer = \ yield from asyncio.open_connection(host, port) try: - self._writer.write(_init_string) - self._writer.write((self.notifier_name + "\n").encode()) + if before_receive_cb is not None: + before_receive_cb() + self.writer.write(_init_string) + self.writer.write((self.notifier_name + "\n").encode()) self.receive_task = asyncio.Task(self._receive_cr()) except: - self._writer.close() - del self._reader - del self._writer + self.writer.close() + del self.reader + del self.writer raise @asyncio.coroutine @@ -86,15 +88,15 @@ class Subscriber: except asyncio.CancelledError: pass finally: - self._writer.close() - del self._reader - del self._writer + self.writer.close() + del self.reader + del self.writer @asyncio.coroutine def _receive_cr(self): targets = [] while True: - line = yield from self._reader.readline() + line = yield from self.reader.readline() if not line: return mod = pyon.decode(line.decode())