forked from M-Labs/artiq
1
0
Fork 0

ctlmgr: reinstate lost changes

This commit is contained in:
Robert Jördens 2016-01-29 17:20:07 -07:00
parent 4a29f0702f
commit 107e2fedf4
1 changed files with 11 additions and 22 deletions

View File

@ -6,7 +6,7 @@ import socket
from artiq.protocols.sync_struct import Subscriber from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient from artiq.protocols.pc_rpc import AsyncioClient
from artiq.protocols.logging import parse_log_message, log_with_name from artiq.protocols.logging import LogParser
from artiq.tools import Condition, TaskObject from artiq.tools import Condition, TaskObject
@ -41,14 +41,14 @@ class Controller:
try: try:
targets, _ = remote.get_rpc_id() targets, _ = remote.get_rpc_id()
remote.select_rpc_target(targets[0]) remote.select_rpc_target(targets[0])
r = await getattr(remote, method)(*args, **kwargs) r = await getattr(remote, method)()
finally: finally:
remote.close_rpc() remote.close_rpc()
return r return r
async def _ping(self): async def _ping(self):
try: try:
ok = await asyncio.wait_for(self.call("ping"), ok = await asyncio.wait_for(self._call_controller("ping"),
self.ping_timeout) self.ping_timeout)
if ok: if ok:
self.retry_timer_cur = self.retry_timer self.retry_timer_cur = self.retry_timer
@ -71,21 +71,8 @@ class Controller:
else: else:
break break
async def forward_logs(self, stream): def _get_log_source(self):
source = "controller({})".format(self.name) return "controller({})".format(self.name)
while True:
try:
entry = (await stream.readline())
if not entry:
break
entry = entry[:-1]
level, name, message = parse_log_message(entry.decode())
log_with_name(name, level, message, extra={"source": source})
except:
logger.debug("exception in log forwarding", exc_info=True)
break
logger.debug("stopped log forwarding of stream %s of %s",
stream, self.name)
async def launcher(self): async def launcher(self):
try: try:
@ -96,10 +83,12 @@ class Controller:
self.process = await asyncio.create_subprocess_exec( self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command), *shlex.split(self.command),
stdout=subprocess.PIPE, stderr=subprocess.PIPE) stdout=subprocess.PIPE, stderr=subprocess.PIPE)
asyncio.ensure_future(self.forward_logs( asyncio.ensure_future(
self.process.stdout)) LogParser(self._get_log_source).stream_task(
asyncio.ensure_future(self.forward_logs( self.process.stdout))
self.process.stderr)) asyncio.ensure_future(
LogParser(self._get_log_source).stream_task(
self.process.stderr))
await self._wait_and_ping() await self._wait_and_ping()
except FileNotFoundError: except FileNotFoundError:
logger.warning("Controller %s failed to start", self.name) logger.warning("Controller %s failed to start", self.name)