forked from M-Labs/artiq
ctlmgr: reinstate lost changes
This commit is contained in:
parent
4a29f0702f
commit
107e2fedf4
|
@ -6,7 +6,7 @@ import socket
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Subscriber
|
from artiq.protocols.sync_struct import Subscriber
|
||||||
from artiq.protocols.pc_rpc import AsyncioClient
|
from artiq.protocols.pc_rpc import AsyncioClient
|
||||||
from artiq.protocols.logging import parse_log_message, log_with_name
|
from artiq.protocols.logging import LogParser
|
||||||
from artiq.tools import Condition, TaskObject
|
from artiq.tools import Condition, TaskObject
|
||||||
|
|
||||||
|
|
||||||
|
@ -41,14 +41,14 @@ class Controller:
|
||||||
try:
|
try:
|
||||||
targets, _ = remote.get_rpc_id()
|
targets, _ = remote.get_rpc_id()
|
||||||
remote.select_rpc_target(targets[0])
|
remote.select_rpc_target(targets[0])
|
||||||
r = await getattr(remote, method)(*args, **kwargs)
|
r = await getattr(remote, method)()
|
||||||
finally:
|
finally:
|
||||||
remote.close_rpc()
|
remote.close_rpc()
|
||||||
return r
|
return r
|
||||||
|
|
||||||
async def _ping(self):
|
async def _ping(self):
|
||||||
try:
|
try:
|
||||||
ok = await asyncio.wait_for(self.call("ping"),
|
ok = await asyncio.wait_for(self._call_controller("ping"),
|
||||||
self.ping_timeout)
|
self.ping_timeout)
|
||||||
if ok:
|
if ok:
|
||||||
self.retry_timer_cur = self.retry_timer
|
self.retry_timer_cur = self.retry_timer
|
||||||
|
@ -71,21 +71,8 @@ class Controller:
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
async def forward_logs(self, stream):
|
def _get_log_source(self):
|
||||||
source = "controller({})".format(self.name)
|
return "controller({})".format(self.name)
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
entry = (await stream.readline())
|
|
||||||
if not entry:
|
|
||||||
break
|
|
||||||
entry = entry[:-1]
|
|
||||||
level, name, message = parse_log_message(entry.decode())
|
|
||||||
log_with_name(name, level, message, extra={"source": source})
|
|
||||||
except:
|
|
||||||
logger.debug("exception in log forwarding", exc_info=True)
|
|
||||||
break
|
|
||||||
logger.debug("stopped log forwarding of stream %s of %s",
|
|
||||||
stream, self.name)
|
|
||||||
|
|
||||||
async def launcher(self):
|
async def launcher(self):
|
||||||
try:
|
try:
|
||||||
|
@ -96,10 +83,12 @@ class Controller:
|
||||||
self.process = await asyncio.create_subprocess_exec(
|
self.process = await asyncio.create_subprocess_exec(
|
||||||
*shlex.split(self.command),
|
*shlex.split(self.command),
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
asyncio.ensure_future(self.forward_logs(
|
asyncio.ensure_future(
|
||||||
self.process.stdout))
|
LogParser(self._get_log_source).stream_task(
|
||||||
asyncio.ensure_future(self.forward_logs(
|
self.process.stdout))
|
||||||
self.process.stderr))
|
asyncio.ensure_future(
|
||||||
|
LogParser(self._get_log_source).stream_task(
|
||||||
|
self.process.stderr))
|
||||||
await self._wait_and_ping()
|
await self._wait_and_ping()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.warning("Controller %s failed to start", self.name)
|
logger.warning("Controller %s failed to start", self.name)
|
||||||
|
|
Loading…
Reference in New Issue