forked from M-Labs/artiq
1
0
Fork 0

ctlmgr: reinstate lost changes

This commit is contained in:
Robert Jördens 2016-01-29 17:20:07 -07:00
parent 4a29f0702f
commit 107e2fedf4
1 changed files with 11 additions and 22 deletions

View File

@ -6,7 +6,7 @@ import socket
from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.protocols.logging import parse_log_message, log_with_name
from artiq.protocols.logging import LogParser
from artiq.tools import Condition, TaskObject
@ -41,14 +41,14 @@ class Controller:
try:
targets, _ = remote.get_rpc_id()
remote.select_rpc_target(targets[0])
r = await getattr(remote, method)(*args, **kwargs)
r = await getattr(remote, method)()
finally:
remote.close_rpc()
return r
async def _ping(self):
try:
ok = await asyncio.wait_for(self.call("ping"),
ok = await asyncio.wait_for(self._call_controller("ping"),
self.ping_timeout)
if ok:
self.retry_timer_cur = self.retry_timer
@ -71,21 +71,8 @@ class Controller:
else:
break
async def forward_logs(self, stream):
source = "controller({})".format(self.name)
while True:
try:
entry = (await stream.readline())
if not entry:
break
entry = entry[:-1]
level, name, message = parse_log_message(entry.decode())
log_with_name(name, level, message, extra={"source": source})
except:
logger.debug("exception in log forwarding", exc_info=True)
break
logger.debug("stopped log forwarding of stream %s of %s",
stream, self.name)
def _get_log_source(self):
return "controller({})".format(self.name)
async def launcher(self):
try:
@ -96,9 +83,11 @@ class Controller:
self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command),
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
asyncio.ensure_future(self.forward_logs(
asyncio.ensure_future(
LogParser(self._get_log_source).stream_task(
self.process.stdout))
asyncio.ensure_future(self.forward_logs(
asyncio.ensure_future(
LogParser(self._get_log_source).stream_task(
self.process.stderr))
await self._wait_and_ping()
except FileNotFoundError: