diff --git a/artiq/devices/ctlmgr.py b/artiq/devices/ctlmgr.py index 0a09c7cb8..93b00db04 100644 --- a/artiq/devices/ctlmgr.py +++ b/artiq/devices/ctlmgr.py @@ -6,7 +6,7 @@ import socket from artiq.protocols.sync_struct import Subscriber from artiq.protocols.pc_rpc import AsyncioClient -from artiq.protocols.logging import parse_log_message, log_with_name +from artiq.protocols.logging import LogParser from artiq.tools import Condition, TaskObject @@ -41,14 +41,14 @@ class Controller: try: targets, _ = remote.get_rpc_id() remote.select_rpc_target(targets[0]) - r = await getattr(remote, method)(*args, **kwargs) + r = await getattr(remote, method)() finally: remote.close_rpc() return r async def _ping(self): try: - ok = await asyncio.wait_for(self.call("ping"), + ok = await asyncio.wait_for(self._call_controller("ping"), self.ping_timeout) if ok: self.retry_timer_cur = self.retry_timer @@ -71,21 +71,8 @@ class Controller: else: break - async def forward_logs(self, stream): - source = "controller({})".format(self.name) - while True: - try: - entry = (await stream.readline()) - if not entry: - break - entry = entry[:-1] - level, name, message = parse_log_message(entry.decode()) - log_with_name(name, level, message, extra={"source": source}) - except: - logger.debug("exception in log forwarding", exc_info=True) - break - logger.debug("stopped log forwarding of stream %s of %s", - stream, self.name) + def _get_log_source(self): + return "controller({})".format(self.name) async def launcher(self): try: @@ -96,10 +83,12 @@ class Controller: self.process = await asyncio.create_subprocess_exec( *shlex.split(self.command), stdout=subprocess.PIPE, stderr=subprocess.PIPE) - asyncio.ensure_future(self.forward_logs( - self.process.stdout)) - asyncio.ensure_future(self.forward_logs( - self.process.stderr)) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.process.stdout)) + asyncio.ensure_future( + LogParser(self._get_log_source).stream_task( + self.process.stderr)) await self._wait_and_ping() except FileNotFoundError: logger.warning("Controller %s failed to start", self.name)