mirror of https://github.com/m-labs/artiq.git
Merge branch 'worker_pipeipc'
This commit is contained in:
commit
b753306f12
|
@ -12,8 +12,7 @@ import platform
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Subscriber
|
from artiq.protocols.sync_struct import Subscriber
|
||||||
from artiq.protocols.pc_rpc import AsyncioClient, Server
|
from artiq.protocols.pc_rpc import AsyncioClient, Server
|
||||||
from artiq.protocols.logging import (LogForwarder,
|
from artiq.protocols.logging import (LogForwarder, LogParser,
|
||||||
parse_log_message, log_with_name,
|
|
||||||
SourceFilter)
|
SourceFilter)
|
||||||
from artiq.tools import *
|
from artiq.tools import *
|
||||||
|
|
||||||
|
@ -79,22 +78,8 @@ class Controller:
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
async def forward_logs(self, stream):
|
def _get_log_source(self):
|
||||||
source = "controller({})".format(self.name)
|
return "controller({})".format(self.name)
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
entry = (await stream.readline())
|
|
||||||
if not entry:
|
|
||||||
break
|
|
||||||
entry = entry[:-1]
|
|
||||||
level, name, message = parse_log_message(entry.decode())
|
|
||||||
log_with_name(name, level, message, extra={"source": source})
|
|
||||||
except:
|
|
||||||
logger.debug("exception in log forwarding", exc_info=True)
|
|
||||||
break
|
|
||||||
logger.debug("stopped log forwarding of stream %s of %s",
|
|
||||||
stream, self.name)
|
|
||||||
|
|
||||||
|
|
||||||
async def launcher(self):
|
async def launcher(self):
|
||||||
try:
|
try:
|
||||||
|
@ -105,9 +90,11 @@ class Controller:
|
||||||
self.process = await asyncio.create_subprocess_exec(
|
self.process = await asyncio.create_subprocess_exec(
|
||||||
*shlex.split(self.command),
|
*shlex.split(self.command),
|
||||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
asyncio.ensure_future(self.forward_logs(
|
asyncio.ensure_future(
|
||||||
|
LogParser(self._get_log_source).stream_task(
|
||||||
self.process.stdout))
|
self.process.stdout))
|
||||||
asyncio.ensure_future(self.forward_logs(
|
asyncio.ensure_future(
|
||||||
|
LogParser(self._get_log_source).stream_task(
|
||||||
self.process.stderr))
|
self.process.stderr))
|
||||||
await self._wait_and_ping()
|
await self._wait_and_ping()
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
|
|
|
@ -9,7 +9,7 @@ from artiq.tools import *
|
||||||
from artiq.protocols.pc_rpc import Server as RPCServer
|
from artiq.protocols.pc_rpc import Server as RPCServer
|
||||||
from artiq.protocols.sync_struct import Publisher
|
from artiq.protocols.sync_struct import Publisher
|
||||||
from artiq.protocols.logging import Server as LoggingServer
|
from artiq.protocols.logging import Server as LoggingServer
|
||||||
from artiq.master.log import log_args, init_log, log_worker
|
from artiq.master.log import log_args, init_log
|
||||||
from artiq.master.databases import DeviceDB, DatasetDB
|
from artiq.master.databases import DeviceDB, DatasetDB
|
||||||
from artiq.master.scheduler import Scheduler
|
from artiq.master.scheduler import Scheduler
|
||||||
from artiq.master.worker_db import get_last_rid
|
from artiq.master.worker_db import get_last_rid
|
||||||
|
@ -63,8 +63,7 @@ def main():
|
||||||
repo_backend = GitBackend(args.repository)
|
repo_backend = GitBackend(args.repository)
|
||||||
else:
|
else:
|
||||||
repo_backend = FilesystemBackend(args.repository)
|
repo_backend = FilesystemBackend(args.repository)
|
||||||
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db,
|
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db)
|
||||||
log_worker)
|
|
||||||
atexit.register(experiment_db.close)
|
atexit.register(experiment_db.close)
|
||||||
experiment_db.scan_repository_async()
|
experiment_db.scan_repository_async()
|
||||||
|
|
||||||
|
@ -72,8 +71,7 @@ def main():
|
||||||
"get_device_db": device_db.get_device_db,
|
"get_device_db": device_db.get_device_db,
|
||||||
"get_device": device_db.get,
|
"get_device": device_db.get,
|
||||||
"get_dataset": dataset_db.get,
|
"get_dataset": dataset_db.get,
|
||||||
"update_dataset": dataset_db.update,
|
"update_dataset": dataset_db.update
|
||||||
"log": log_worker
|
|
||||||
}
|
}
|
||||||
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
|
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
|
||||||
worker_handlers.update({
|
worker_handlers.update({
|
||||||
|
|
|
@ -3,7 +3,6 @@ import os
|
||||||
import tempfile
|
import tempfile
|
||||||
import shutil
|
import shutil
|
||||||
import logging
|
import logging
|
||||||
from functools import partial
|
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Notifier
|
from artiq.protocols.sync_struct import Notifier
|
||||||
from artiq.master.worker import (Worker, WorkerInternalException,
|
from artiq.master.worker import (Worker, WorkerInternalException,
|
||||||
|
@ -15,13 +14,10 @@ logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
async def _get_repository_entries(entry_dict,
|
async def _get_repository_entries(entry_dict,
|
||||||
root, filename, get_device_db, log):
|
root, filename, get_device_db):
|
||||||
worker = Worker({
|
worker = Worker({"get_device_db": get_device_db})
|
||||||
"get_device_db": get_device_db,
|
|
||||||
"log": partial(log, "scan", os.path.basename(filename))
|
|
||||||
})
|
|
||||||
try:
|
try:
|
||||||
description = await worker.examine(os.path.join(root, filename))
|
description = await worker.examine("scan", os.path.join(root, filename))
|
||||||
except:
|
except:
|
||||||
log_worker_exception()
|
log_worker_exception()
|
||||||
raise
|
raise
|
||||||
|
@ -49,7 +45,7 @@ async def _get_repository_entries(entry_dict,
|
||||||
entry_dict[name] = entry
|
entry_dict[name] = entry
|
||||||
|
|
||||||
|
|
||||||
async def _scan_experiments(root, get_device_db, log, subdir=""):
|
async def _scan_experiments(root, get_device_db, subdir=""):
|
||||||
entry_dict = dict()
|
entry_dict = dict()
|
||||||
for de in os.scandir(os.path.join(root, subdir)):
|
for de in os.scandir(os.path.join(root, subdir)):
|
||||||
if de.name.startswith("."):
|
if de.name.startswith("."):
|
||||||
|
@ -58,13 +54,13 @@ async def _scan_experiments(root, get_device_db, log, subdir=""):
|
||||||
filename = os.path.join(subdir, de.name)
|
filename = os.path.join(subdir, de.name)
|
||||||
try:
|
try:
|
||||||
await _get_repository_entries(
|
await _get_repository_entries(
|
||||||
entry_dict, root, filename, get_device_db, log)
|
entry_dict, root, filename, get_device_db)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.warning("Skipping file '%s'", filename,
|
logger.warning("Skipping file '%s'", filename,
|
||||||
exc_info=not isinstance(exc, WorkerInternalException))
|
exc_info=not isinstance(exc, WorkerInternalException))
|
||||||
if de.is_dir():
|
if de.is_dir():
|
||||||
subentries = await _scan_experiments(
|
subentries = await _scan_experiments(
|
||||||
root, get_device_db, log,
|
root, get_device_db,
|
||||||
os.path.join(subdir, de.name))
|
os.path.join(subdir, de.name))
|
||||||
entries = {de.name + "/" + k: v for k, v in subentries.items()}
|
entries = {de.name + "/" + k: v for k, v in subentries.items()}
|
||||||
entry_dict.update(entries)
|
entry_dict.update(entries)
|
||||||
|
@ -81,10 +77,9 @@ def _sync_explist(target, source):
|
||||||
|
|
||||||
|
|
||||||
class ExperimentDB:
|
class ExperimentDB:
|
||||||
def __init__(self, repo_backend, get_device_db_fn, log_fn):
|
def __init__(self, repo_backend, get_device_db_fn):
|
||||||
self.repo_backend = repo_backend
|
self.repo_backend = repo_backend
|
||||||
self.get_device_db_fn = get_device_db_fn
|
self.get_device_db_fn = get_device_db_fn
|
||||||
self.log_fn = log_fn
|
|
||||||
|
|
||||||
self.cur_rev = self.repo_backend.get_head_rev()
|
self.cur_rev = self.repo_backend.get_head_rev()
|
||||||
self.repo_backend.request_rev(self.cur_rev)
|
self.repo_backend.request_rev(self.cur_rev)
|
||||||
|
@ -106,8 +101,7 @@ class ExperimentDB:
|
||||||
wd, _ = self.repo_backend.request_rev(new_cur_rev)
|
wd, _ = self.repo_backend.request_rev(new_cur_rev)
|
||||||
self.repo_backend.release_rev(self.cur_rev)
|
self.repo_backend.release_rev(self.cur_rev)
|
||||||
self.cur_rev = new_cur_rev
|
self.cur_rev = new_cur_rev
|
||||||
new_explist = await _scan_experiments(wd, self.get_device_db_fn,
|
new_explist = await _scan_experiments(wd, self.get_device_db_fn)
|
||||||
self.log_fn)
|
|
||||||
|
|
||||||
_sync_explist(self.explist, new_explist)
|
_sync_explist(self.explist, new_explist)
|
||||||
finally:
|
finally:
|
||||||
|
@ -122,12 +116,9 @@ class ExperimentDB:
|
||||||
revision = self.cur_rev
|
revision = self.cur_rev
|
||||||
wd, _ = self.repo_backend.request_rev(revision)
|
wd, _ = self.repo_backend.request_rev(revision)
|
||||||
filename = os.path.join(wd, filename)
|
filename = os.path.join(wd, filename)
|
||||||
worker = Worker({
|
worker = Worker({"get_device_db": self.get_device_db_fn})
|
||||||
"get_device_db": self.get_device_db_fn,
|
|
||||||
"log": partial(self.log_fn, "examine", os.path.basename(filename))
|
|
||||||
})
|
|
||||||
try:
|
try:
|
||||||
description = await worker.examine(filename)
|
description = await worker.examine("examine", filename)
|
||||||
finally:
|
finally:
|
||||||
await worker.close()
|
await worker.close()
|
||||||
if use_repository:
|
if use_repository:
|
||||||
|
|
|
@ -2,7 +2,7 @@ import logging
|
||||||
import logging.handlers
|
import logging.handlers
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Notifier
|
from artiq.protocols.sync_struct import Notifier
|
||||||
from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter
|
from artiq.protocols.logging import SourceFilter
|
||||||
|
|
||||||
|
|
||||||
class LogBuffer:
|
class LogBuffer:
|
||||||
|
@ -28,14 +28,6 @@ class LogBufferHandler(logging.Handler):
|
||||||
self.log_buffer.log(record.levelno, record.source, record.created,
|
self.log_buffer.log(record.levelno, record.source, record.created,
|
||||||
part)
|
part)
|
||||||
|
|
||||||
|
|
||||||
def log_worker(rid, filename, message):
|
|
||||||
level, name, message = parse_log_message(message)
|
|
||||||
log_with_name(name, level, message,
|
|
||||||
extra={"source": "worker({},{})".format(rid, filename)})
|
|
||||||
log_worker.worker_pass_runinfo = True
|
|
||||||
|
|
||||||
|
|
||||||
def log_args(parser):
|
def log_args(parser):
|
||||||
group = parser.add_argument_group("logging")
|
group = parser.add_argument_group("logging")
|
||||||
group.add_argument("-v", "--verbose", default=0, action="count",
|
group.add_argument("-v", "--verbose", default=0, action="count",
|
||||||
|
|
|
@ -7,7 +7,8 @@ import traceback
|
||||||
import time
|
import time
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from artiq.protocols import pyon
|
from artiq.protocols import pipe_ipc, pyon
|
||||||
|
from artiq.protocols.logging import LogParser
|
||||||
from artiq.tools import asyncio_wait_or_cancel
|
from artiq.tools import asyncio_wait_or_cancel
|
||||||
|
|
||||||
|
|
||||||
|
@ -47,7 +48,7 @@ class Worker:
|
||||||
|
|
||||||
self.rid = None
|
self.rid = None
|
||||||
self.filename = None
|
self.filename = None
|
||||||
self.process = None
|
self.ipc = None
|
||||||
self.watchdogs = dict() # wid -> expiration (using time.monotonic)
|
self.watchdogs = dict() # wid -> expiration (using time.monotonic)
|
||||||
|
|
||||||
self.io_lock = asyncio.Lock()
|
self.io_lock = asyncio.Lock()
|
||||||
|
@ -72,15 +73,25 @@ class Worker:
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def _get_log_source(self):
|
||||||
|
return "worker({},{})".format(self.rid, self.filename)
|
||||||
|
|
||||||
async def _create_process(self, log_level):
|
async def _create_process(self, log_level):
|
||||||
await self.io_lock.acquire()
|
await self.io_lock.acquire()
|
||||||
try:
|
try:
|
||||||
if self.closed.is_set():
|
if self.closed.is_set():
|
||||||
raise WorkerError("Attempting to create process after close")
|
raise WorkerError("Attempting to create process after close")
|
||||||
self.process = await asyncio.create_subprocess_exec(
|
self.ipc = pipe_ipc.AsyncioParentComm()
|
||||||
|
await self.ipc.create_subprocess(
|
||||||
sys.executable, "-m", "artiq.master.worker_impl",
|
sys.executable, "-m", "artiq.master.worker_impl",
|
||||||
str(log_level),
|
self.ipc.get_address(), str(log_level),
|
||||||
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||||
|
asyncio.ensure_future(
|
||||||
|
LogParser(self._get_log_source).stream_task(
|
||||||
|
self.ipc.process.stdout))
|
||||||
|
asyncio.ensure_future(
|
||||||
|
LogParser(self._get_log_source).stream_task(
|
||||||
|
self.ipc.process.stderr))
|
||||||
finally:
|
finally:
|
||||||
self.io_lock.release()
|
self.io_lock.release()
|
||||||
|
|
||||||
|
@ -93,15 +104,15 @@ class Worker:
|
||||||
self.closed.set()
|
self.closed.set()
|
||||||
await self.io_lock.acquire()
|
await self.io_lock.acquire()
|
||||||
try:
|
try:
|
||||||
if self.process is None:
|
if self.ipc is None:
|
||||||
# Note the %s - self.rid can be None
|
# Note the %s - self.rid can be None or a user string
|
||||||
logger.debug("worker was not created (RID %s)", self.rid)
|
logger.debug("worker was not created (RID %s)", self.rid)
|
||||||
return
|
return
|
||||||
if self.process.returncode is not None:
|
if self.ipc.process.returncode is not None:
|
||||||
logger.debug("worker already terminated (RID %s)", self.rid)
|
logger.debug("worker already terminated (RID %s)", self.rid)
|
||||||
if self.process.returncode != 0:
|
if self.ipc.process.returncode != 0:
|
||||||
logger.warning("worker finished with status code %d"
|
logger.warning("worker finished with status code %d"
|
||||||
" (RID %s)", self.process.returncode,
|
" (RID %s)", self.ipc.process.returncode,
|
||||||
self.rid)
|
self.rid)
|
||||||
return
|
return
|
||||||
obj = {"action": "terminate"}
|
obj = {"action": "terminate"}
|
||||||
|
@ -111,21 +122,21 @@ class Worker:
|
||||||
logger.debug("failed to send terminate command to worker"
|
logger.debug("failed to send terminate command to worker"
|
||||||
" (RID %s), killing", self.rid, exc_info=True)
|
" (RID %s), killing", self.rid, exc_info=True)
|
||||||
try:
|
try:
|
||||||
self.process.kill()
|
self.ipc.process.kill()
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass
|
pass
|
||||||
await self.process.wait()
|
await self.ipc.process.wait()
|
||||||
return
|
return
|
||||||
try:
|
try:
|
||||||
await asyncio.wait_for(self.process.wait(), term_timeout)
|
await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
logger.debug("worker did not exit by itself (RID %s), killing",
|
logger.debug("worker did not exit by itself (RID %s), killing",
|
||||||
self.rid)
|
self.rid)
|
||||||
try:
|
try:
|
||||||
self.process.kill()
|
self.ipc.process.kill()
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass
|
pass
|
||||||
await self.process.wait()
|
await self.ipc.process.wait()
|
||||||
else:
|
else:
|
||||||
logger.debug("worker exited by itself (RID %s)", self.rid)
|
logger.debug("worker exited by itself (RID %s)", self.rid)
|
||||||
finally:
|
finally:
|
||||||
|
@ -134,9 +145,8 @@ class Worker:
|
||||||
async def _send(self, obj, cancellable=True):
|
async def _send(self, obj, cancellable=True):
|
||||||
assert self.io_lock.locked()
|
assert self.io_lock.locked()
|
||||||
line = pyon.encode(obj)
|
line = pyon.encode(obj)
|
||||||
self.process.stdin.write(line.encode())
|
self.ipc.write((line + "\n").encode())
|
||||||
self.process.stdin.write("\n".encode())
|
ifs = [self.ipc.drain()]
|
||||||
ifs = [self.process.stdin.drain()]
|
|
||||||
if cancellable:
|
if cancellable:
|
||||||
ifs.append(self.closed.wait())
|
ifs.append(self.closed.wait())
|
||||||
fs = await asyncio_wait_or_cancel(
|
fs = await asyncio_wait_or_cancel(
|
||||||
|
@ -153,7 +163,7 @@ class Worker:
|
||||||
async def _recv(self, timeout):
|
async def _recv(self, timeout):
|
||||||
assert self.io_lock.locked()
|
assert self.io_lock.locked()
|
||||||
fs = await asyncio_wait_or_cancel(
|
fs = await asyncio_wait_or_cancel(
|
||||||
[self.process.stdout.readline(), self.closed.wait()],
|
[self.ipc.readline(), self.closed.wait()],
|
||||||
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
|
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
|
||||||
if all(f.cancelled() for f in fs):
|
if all(f.cancelled() for f in fs):
|
||||||
raise WorkerTimeout("Timeout receiving data from worker")
|
raise WorkerTimeout("Timeout receiving data from worker")
|
||||||
|
@ -193,8 +203,6 @@ class Worker:
|
||||||
func = self.register_experiment
|
func = self.register_experiment
|
||||||
else:
|
else:
|
||||||
func = self.handlers[action]
|
func = self.handlers[action]
|
||||||
if getattr(func, "worker_pass_runinfo", False):
|
|
||||||
func = partial(func, self.rid, self.filename)
|
|
||||||
try:
|
try:
|
||||||
data = func(*obj["args"], **obj["kwargs"])
|
data = func(*obj["args"], **obj["kwargs"])
|
||||||
reply = {"status": "ok", "data": data}
|
reply = {"status": "ok", "data": data}
|
||||||
|
@ -266,7 +274,10 @@ class Worker:
|
||||||
await self._worker_action({"action": "write_results"},
|
await self._worker_action({"action": "write_results"},
|
||||||
timeout)
|
timeout)
|
||||||
|
|
||||||
async def examine(self, file, timeout=20.0):
|
async def examine(self, rid, file, timeout=20.0):
|
||||||
|
self.rid = rid
|
||||||
|
self.filename = os.path.basename(file)
|
||||||
|
|
||||||
await self._create_process(logging.WARNING)
|
await self._create_process(logging.WARNING)
|
||||||
r = dict()
|
r = dict()
|
||||||
def register(class_name, name, arginfo):
|
def register(class_name, name, arginfo):
|
||||||
|
|
|
@ -6,8 +6,8 @@ import traceback
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
|
|
||||||
import artiq
|
import artiq
|
||||||
from artiq.protocols import pyon
|
from artiq.protocols import pipe_ipc, pyon
|
||||||
from artiq.tools import file_import
|
from artiq.tools import multiline_log_config, file_import
|
||||||
from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
|
from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
|
||||||
from artiq.language.environment import is_experiment
|
from artiq.language.environment import is_experiment
|
||||||
from artiq.language.core import set_watchdog_factory, TerminationRequested
|
from artiq.language.core import set_watchdog_factory, TerminationRequested
|
||||||
|
@ -15,16 +15,16 @@ from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
|
||||||
from artiq import __version__ as artiq_version
|
from artiq import __version__ as artiq_version
|
||||||
|
|
||||||
|
|
||||||
|
ipc = None
|
||||||
|
|
||||||
def get_object():
|
def get_object():
|
||||||
line = sys.__stdin__.readline()
|
line = ipc.readline().decode()
|
||||||
return pyon.decode(line)
|
return pyon.decode(line)
|
||||||
|
|
||||||
|
|
||||||
def put_object(obj):
|
def put_object(obj):
|
||||||
ds = pyon.encode(obj)
|
ds = pyon.encode(obj)
|
||||||
sys.__stdout__.write(ds)
|
ipc.write((ds + "\n").encode())
|
||||||
sys.__stdout__.write("\n")
|
|
||||||
sys.__stdout__.flush()
|
|
||||||
|
|
||||||
|
|
||||||
class ParentActionError(Exception):
|
class ParentActionError(Exception):
|
||||||
|
@ -53,23 +53,6 @@ def make_parent_action(action, exception=None):
|
||||||
return parent_action
|
return parent_action
|
||||||
|
|
||||||
|
|
||||||
class LogForwarder:
|
|
||||||
def __init__(self):
|
|
||||||
self.buffer = ""
|
|
||||||
|
|
||||||
to_parent = staticmethod(make_parent_action("log"))
|
|
||||||
|
|
||||||
def write(self, data):
|
|
||||||
self.buffer += data
|
|
||||||
while "\n" in self.buffer:
|
|
||||||
i = self.buffer.index("\n")
|
|
||||||
self.to_parent(self.buffer[:i])
|
|
||||||
self.buffer = self.buffer[i+1:]
|
|
||||||
|
|
||||||
def flush(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class ParentDeviceDB:
|
class ParentDeviceDB:
|
||||||
get_device_db = make_parent_action("get_device_db")
|
get_device_db = make_parent_action("get_device_db")
|
||||||
get = make_parent_action("get_device", KeyError)
|
get = make_parent_action("get_device", KeyError)
|
||||||
|
@ -202,9 +185,10 @@ def setup_diagnostics(experiment_file, repository_path):
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
sys.stdout = LogForwarder()
|
global ipc
|
||||||
sys.stderr = LogForwarder()
|
|
||||||
logging.basicConfig(level=int(sys.argv[1]))
|
multiline_log_config(level=int(sys.argv[2]))
|
||||||
|
ipc = pipe_ipc.ChildComm(sys.argv[1])
|
||||||
|
|
||||||
start_time = None
|
start_time = None
|
||||||
rid = None
|
rid = None
|
||||||
|
@ -268,7 +252,11 @@ def main():
|
||||||
# When we get CompileError, a more suitable diagnostic has already
|
# When we get CompileError, a more suitable diagnostic has already
|
||||||
# been printed.
|
# been printed.
|
||||||
if not isinstance(exc, CompileError):
|
if not isinstance(exc, CompileError):
|
||||||
lines = ["Terminating with exception\n"]
|
short_exc_info = type(exc).__name__
|
||||||
|
exc_str = str(exc)
|
||||||
|
if exc_str:
|
||||||
|
short_exc_info += ": " + exc_str
|
||||||
|
lines = ["Terminating with exception ("+short_exc_info+")\n"]
|
||||||
lines += traceback.format_exception_only(type(exc), exc)
|
lines += traceback.format_exception_only(type(exc), exc)
|
||||||
if hasattr(exc, "parent_traceback"):
|
if hasattr(exc, "parent_traceback"):
|
||||||
lines += exc.parent_traceback
|
lines += exc.parent_traceback
|
||||||
|
@ -277,6 +265,7 @@ def main():
|
||||||
put_object({"action": "exception"})
|
put_object({"action": "exception"})
|
||||||
finally:
|
finally:
|
||||||
device_mgr.close_devices()
|
device_mgr.close_devices()
|
||||||
|
ipc.close()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
import re
|
||||||
|
|
||||||
from artiq.protocols.asyncio_server import AsyncioServer
|
from artiq.protocols.asyncio_server import AsyncioServer
|
||||||
from artiq.tools import TaskObject
|
from artiq.tools import TaskObject, MultilineFormatter
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -25,15 +26,64 @@ _name_to_level = {
|
||||||
|
|
||||||
|
|
||||||
def parse_log_message(msg):
|
def parse_log_message(msg):
|
||||||
for name, level in _name_to_level.items():
|
lr = "|".join(_name_to_level.keys())
|
||||||
if msg.startswith(name + ":"):
|
m = re.fullmatch('('+lr+')(<\d+>)?:([^:]*):(.*)', msg)
|
||||||
remainder = msg[len(name) + 1:]
|
if m is None:
|
||||||
|
return 0, logging.INFO, "print", msg
|
||||||
|
level = _name_to_level[m.group(1)]
|
||||||
|
if m.group(2):
|
||||||
|
multiline = int(m.group(2)[1:-1]) - 1
|
||||||
|
else:
|
||||||
|
multiline = 0
|
||||||
|
name = m.group(3)
|
||||||
|
message = m.group(4)
|
||||||
|
return multiline, level, name, message
|
||||||
|
|
||||||
|
|
||||||
|
class LogParser:
|
||||||
|
def __init__(self, source_cb):
|
||||||
|
self.source_cb = source_cb
|
||||||
|
self.multiline_count = 0
|
||||||
|
self.multiline_level = None
|
||||||
|
self.multiline_name = None
|
||||||
|
self.multiline_message = None
|
||||||
|
|
||||||
|
def line_input(self, msg):
|
||||||
|
if self.multiline_count:
|
||||||
|
self.multiline_message += "\n" + msg
|
||||||
|
self.multiline_count -= 1
|
||||||
|
if not self.multiline_count:
|
||||||
|
log_with_name(
|
||||||
|
self.multiline_name,
|
||||||
|
self.multiline_level,
|
||||||
|
self.multiline_message,
|
||||||
|
extra={"source": self.source_cb()})
|
||||||
|
self.multiline_level = None
|
||||||
|
self.multiline_name = None
|
||||||
|
self.multiline_message = None
|
||||||
|
else:
|
||||||
|
multiline, level, name, message = parse_log_message(msg)
|
||||||
|
if multiline:
|
||||||
|
self.multiline_count = multiline
|
||||||
|
self.multiline_level = level
|
||||||
|
self.multiline_name = name
|
||||||
|
self.multiline_message = message
|
||||||
|
else:
|
||||||
|
log_with_name(name, level, message,
|
||||||
|
extra={"source": self.source_cb()})
|
||||||
|
|
||||||
|
async def stream_task(self, stream):
|
||||||
|
while True:
|
||||||
try:
|
try:
|
||||||
idx = remainder.index(":")
|
entry = (await stream.readline())
|
||||||
|
if not entry:
|
||||||
|
break
|
||||||
|
self.line_input(entry[:-1].decode())
|
||||||
except:
|
except:
|
||||||
continue
|
logger.debug("exception in log forwarding", exc_info=True)
|
||||||
return level, remainder[:idx], remainder[idx+1:]
|
break
|
||||||
return logging.INFO, "print", msg
|
logger.debug("stopped log forwarding of stream %s of %s",
|
||||||
|
stream, self.source_cb())
|
||||||
|
|
||||||
|
|
||||||
_init_string = b"ARTIQ logging\n"
|
_init_string = b"ARTIQ logging\n"
|
||||||
|
@ -42,8 +92,10 @@ _init_string = b"ARTIQ logging\n"
|
||||||
class Server(AsyncioServer):
|
class Server(AsyncioServer):
|
||||||
"""Remote logging TCP server.
|
"""Remote logging TCP server.
|
||||||
|
|
||||||
Takes one log entry per line, in the format:
|
Log entries are in the format:
|
||||||
source:levelno:name:message
|
source:levelno<total_lines>:name:message
|
||||||
|
continuation...
|
||||||
|
...continuation
|
||||||
"""
|
"""
|
||||||
async def _handle_connection_cr(self, reader, writer):
|
async def _handle_connection_cr(self, reader, writer):
|
||||||
try:
|
try:
|
||||||
|
@ -51,6 +103,9 @@ class Server(AsyncioServer):
|
||||||
if line != _init_string:
|
if line != _init_string:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
source = None
|
||||||
|
parser = LogParser(lambda: source)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
line = await reader.readline()
|
line = await reader.readline()
|
||||||
if not line:
|
if not line:
|
||||||
|
@ -60,20 +115,16 @@ class Server(AsyncioServer):
|
||||||
except:
|
except:
|
||||||
return
|
return
|
||||||
line = line[:-1]
|
line = line[:-1]
|
||||||
linesplit = line.split(":", 3)
|
if parser.multiline_count:
|
||||||
if len(linesplit) != 4:
|
parser.line_input(line)
|
||||||
|
else:
|
||||||
|
linesplit = line.split(":", maxsplit=1)
|
||||||
|
if len(linesplit) != 2:
|
||||||
logger.warning("received improperly formatted message, "
|
logger.warning("received improperly formatted message, "
|
||||||
"dropping connection")
|
"dropping connection")
|
||||||
return
|
return
|
||||||
source, level, name, message = linesplit
|
source, remainder = linesplit
|
||||||
try:
|
parser.line_input(remainder)
|
||||||
level = int(level)
|
|
||||||
except:
|
|
||||||
logger.warning("received improperly formatted level, "
|
|
||||||
"dropping connection")
|
|
||||||
return
|
|
||||||
log_with_name(name, level, message,
|
|
||||||
extra={"source": source})
|
|
||||||
finally:
|
finally:
|
||||||
writer.close()
|
writer.close()
|
||||||
|
|
||||||
|
@ -100,19 +151,12 @@ class LogForwarder(logging.Handler, TaskObject):
|
||||||
logging.Handler.__init__(self, **kwargs)
|
logging.Handler.__init__(self, **kwargs)
|
||||||
self.host = host
|
self.host = host
|
||||||
self.port = port
|
self.port = port
|
||||||
self.setFormatter(logging.Formatter(
|
self.setFormatter(MultilineFormatter())
|
||||||
"%(name)s:%(message)s"))
|
|
||||||
self._queue = asyncio.Queue(queue_size)
|
self._queue = asyncio.Queue(queue_size)
|
||||||
self.reconnect_timer = reconnect_timer
|
self.reconnect_timer = reconnect_timer
|
||||||
|
|
||||||
def emit(self, record):
|
def emit(self, record):
|
||||||
message = self.format(record)
|
self._queue.put_nowait(record.source + ":" + self.format(record))
|
||||||
for part in message.split("\n"):
|
|
||||||
part = "{}:{}:{}".format(record.source, record.levelno, part)
|
|
||||||
try:
|
|
||||||
self._queue.put_nowait(part)
|
|
||||||
except asyncio.QueueFull:
|
|
||||||
break
|
|
||||||
|
|
||||||
async def _do(self):
|
async def _do(self):
|
||||||
while True:
|
while True:
|
||||||
|
|
|
@ -71,7 +71,7 @@ def _run_experiment(class_name):
|
||||||
"arguments": dict()
|
"arguments": dict()
|
||||||
}
|
}
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
worker = Worker(handlers={"log": lambda message: None})
|
worker = Worker({})
|
||||||
loop.run_until_complete(_call_worker(worker, expid))
|
loop.run_until_complete(_call_worker(worker, expid))
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -133,8 +133,30 @@ def simple_network_args(parser, default_port):
|
||||||
group.add_argument("--port-" + name, default=default, type=int,
|
group.add_argument("--port-" + name, default=default, type=int,
|
||||||
help=h)
|
help=h)
|
||||||
|
|
||||||
|
class MultilineFormatter(logging.Formatter):
|
||||||
|
def __init__(self):
|
||||||
|
logging.Formatter.__init__(
|
||||||
|
self, "%(levelname)s:%(name)s:%(message)s")
|
||||||
|
|
||||||
|
def format(self, record):
|
||||||
|
r = logging.Formatter.format(self, record)
|
||||||
|
linebreaks = r.count("\n")
|
||||||
|
if linebreaks:
|
||||||
|
i = r.index(":")
|
||||||
|
r = r[:i] + "<" + str(linebreaks + 1) + ">" + r[i:]
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
def multiline_log_config(level):
|
||||||
|
root_logger = logging.getLogger()
|
||||||
|
root_logger.setLevel(level)
|
||||||
|
handler = logging.StreamHandler()
|
||||||
|
handler.setFormatter(MultilineFormatter())
|
||||||
|
root_logger.addHandler(handler)
|
||||||
|
|
||||||
|
|
||||||
def init_logger(args):
|
def init_logger(args):
|
||||||
logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10)
|
multiline_log_config(level=logging.WARNING + args.quiet*10 - args.verbose*10)
|
||||||
|
|
||||||
|
|
||||||
def bind_address_from_args(args):
|
def bind_address_from_args(args):
|
||||||
|
|
Loading…
Reference in New Issue