mirror of
https://github.com/m-labs/artiq.git
synced 2024-12-25 03:08:27 +08:00
refactor logging and implement in worker
This commit is contained in:
parent
a583a923d8
commit
5aa4de8e89
@ -12,8 +12,7 @@ import platform
|
||||
|
||||
from artiq.protocols.sync_struct import Subscriber
|
||||
from artiq.protocols.pc_rpc import AsyncioClient, Server
|
||||
from artiq.protocols.logging import (LogForwarder,
|
||||
parse_log_message, log_with_name,
|
||||
from artiq.protocols.logging import (LogForwarder, LogParser,
|
||||
SourceFilter)
|
||||
from artiq.tools import *
|
||||
|
||||
@ -79,22 +78,8 @@ class Controller:
|
||||
else:
|
||||
break
|
||||
|
||||
async def forward_logs(self, stream):
|
||||
source = "controller({})".format(self.name)
|
||||
while True:
|
||||
try:
|
||||
entry = (await stream.readline())
|
||||
if not entry:
|
||||
break
|
||||
entry = entry[:-1]
|
||||
level, name, message = parse_log_message(entry.decode())
|
||||
log_with_name(name, level, message, extra={"source": source})
|
||||
except:
|
||||
logger.debug("exception in log forwarding", exc_info=True)
|
||||
break
|
||||
logger.debug("stopped log forwarding of stream %s of %s",
|
||||
stream, self.name)
|
||||
|
||||
def _get_log_source(self):
|
||||
return "controller({})".format(self.name)
|
||||
|
||||
async def launcher(self):
|
||||
try:
|
||||
@ -105,10 +90,12 @@ class Controller:
|
||||
self.process = await asyncio.create_subprocess_exec(
|
||||
*shlex.split(self.command),
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
asyncio.ensure_future(self.forward_logs(
|
||||
self.process.stdout))
|
||||
asyncio.ensure_future(self.forward_logs(
|
||||
self.process.stderr))
|
||||
asyncio.ensure_future(
|
||||
LogParser(self._get_log_source).stream_task(
|
||||
self.process.stdout))
|
||||
asyncio.ensure_future(
|
||||
LogParser(self._get_log_source).stream_task(
|
||||
self.process.stderr))
|
||||
await self._wait_and_ping()
|
||||
except FileNotFoundError:
|
||||
logger.warning("Controller %s failed to start", self.name)
|
||||
|
@ -9,7 +9,7 @@ from artiq.tools import *
|
||||
from artiq.protocols.pc_rpc import Server as RPCServer
|
||||
from artiq.protocols.sync_struct import Publisher
|
||||
from artiq.protocols.logging import Server as LoggingServer
|
||||
from artiq.master.log import log_args, init_log, log_worker
|
||||
from artiq.master.log import log_args, init_log
|
||||
from artiq.master.databases import DeviceDB, DatasetDB
|
||||
from artiq.master.scheduler import Scheduler
|
||||
from artiq.master.worker_db import get_last_rid
|
||||
@ -63,8 +63,7 @@ def main():
|
||||
repo_backend = GitBackend(args.repository)
|
||||
else:
|
||||
repo_backend = FilesystemBackend(args.repository)
|
||||
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db,
|
||||
log_worker)
|
||||
experiment_db = ExperimentDB(repo_backend, device_db.get_device_db)
|
||||
atexit.register(experiment_db.close)
|
||||
experiment_db.scan_repository_async()
|
||||
|
||||
@ -72,8 +71,7 @@ def main():
|
||||
"get_device_db": device_db.get_device_db,
|
||||
"get_device": device_db.get,
|
||||
"get_dataset": dataset_db.get,
|
||||
"update_dataset": dataset_db.update,
|
||||
"log": log_worker
|
||||
"update_dataset": dataset_db.update
|
||||
}
|
||||
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
|
||||
worker_handlers.update({
|
||||
|
@ -3,7 +3,6 @@ import os
|
||||
import tempfile
|
||||
import shutil
|
||||
import logging
|
||||
from functools import partial
|
||||
|
||||
from artiq.protocols.sync_struct import Notifier
|
||||
from artiq.master.worker import (Worker, WorkerInternalException,
|
||||
@ -15,13 +14,10 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def _get_repository_entries(entry_dict,
|
||||
root, filename, get_device_db, log):
|
||||
worker = Worker({
|
||||
"get_device_db": get_device_db,
|
||||
"log": partial(log, "scan", os.path.basename(filename))
|
||||
})
|
||||
root, filename, get_device_db):
|
||||
worker = Worker({"get_device_db": get_device_db})
|
||||
try:
|
||||
description = await worker.examine(os.path.join(root, filename))
|
||||
description = await worker.examine("scan", os.path.join(root, filename))
|
||||
except:
|
||||
log_worker_exception()
|
||||
raise
|
||||
@ -49,7 +45,7 @@ async def _get_repository_entries(entry_dict,
|
||||
entry_dict[name] = entry
|
||||
|
||||
|
||||
async def _scan_experiments(root, get_device_db, log, subdir=""):
|
||||
async def _scan_experiments(root, get_device_db, subdir=""):
|
||||
entry_dict = dict()
|
||||
for de in os.scandir(os.path.join(root, subdir)):
|
||||
if de.name.startswith("."):
|
||||
@ -58,13 +54,13 @@ async def _scan_experiments(root, get_device_db, log, subdir=""):
|
||||
filename = os.path.join(subdir, de.name)
|
||||
try:
|
||||
await _get_repository_entries(
|
||||
entry_dict, root, filename, get_device_db, log)
|
||||
entry_dict, root, filename, get_device_db)
|
||||
except Exception as exc:
|
||||
logger.warning("Skipping file '%s'", filename,
|
||||
exc_info=not isinstance(exc, WorkerInternalException))
|
||||
if de.is_dir():
|
||||
subentries = await _scan_experiments(
|
||||
root, get_device_db, log,
|
||||
root, get_device_db,
|
||||
os.path.join(subdir, de.name))
|
||||
entries = {de.name + "/" + k: v for k, v in subentries.items()}
|
||||
entry_dict.update(entries)
|
||||
@ -81,10 +77,9 @@ def _sync_explist(target, source):
|
||||
|
||||
|
||||
class ExperimentDB:
|
||||
def __init__(self, repo_backend, get_device_db_fn, log_fn):
|
||||
def __init__(self, repo_backend, get_device_db_fn):
|
||||
self.repo_backend = repo_backend
|
||||
self.get_device_db_fn = get_device_db_fn
|
||||
self.log_fn = log_fn
|
||||
|
||||
self.cur_rev = self.repo_backend.get_head_rev()
|
||||
self.repo_backend.request_rev(self.cur_rev)
|
||||
@ -106,8 +101,7 @@ class ExperimentDB:
|
||||
wd, _ = self.repo_backend.request_rev(new_cur_rev)
|
||||
self.repo_backend.release_rev(self.cur_rev)
|
||||
self.cur_rev = new_cur_rev
|
||||
new_explist = await _scan_experiments(wd, self.get_device_db_fn,
|
||||
self.log_fn)
|
||||
new_explist = await _scan_experiments(wd, self.get_device_db_fn)
|
||||
|
||||
_sync_explist(self.explist, new_explist)
|
||||
finally:
|
||||
@ -122,12 +116,9 @@ class ExperimentDB:
|
||||
revision = self.cur_rev
|
||||
wd, _ = self.repo_backend.request_rev(revision)
|
||||
filename = os.path.join(wd, filename)
|
||||
worker = Worker({
|
||||
"get_device_db": self.get_device_db_fn,
|
||||
"log": partial(self.log_fn, "examine", os.path.basename(filename))
|
||||
})
|
||||
worker = Worker({"get_device_db": self.get_device_db_fn})
|
||||
try:
|
||||
description = await worker.examine(filename)
|
||||
description = await worker.examine("examine", filename)
|
||||
finally:
|
||||
await worker.close()
|
||||
if use_repository:
|
||||
|
@ -2,7 +2,7 @@ import logging
|
||||
import logging.handlers
|
||||
|
||||
from artiq.protocols.sync_struct import Notifier
|
||||
from artiq.protocols.logging import parse_log_message, log_with_name, SourceFilter
|
||||
from artiq.protocols.logging import SourceFilter
|
||||
|
||||
|
||||
class LogBuffer:
|
||||
@ -28,14 +28,6 @@ class LogBufferHandler(logging.Handler):
|
||||
self.log_buffer.log(record.levelno, record.source, record.created,
|
||||
part)
|
||||
|
||||
|
||||
def log_worker(rid, filename, message):
|
||||
level, name, message = parse_log_message(message)
|
||||
log_with_name(name, level, message,
|
||||
extra={"source": "worker({},{})".format(rid, filename)})
|
||||
log_worker.worker_pass_runinfo = True
|
||||
|
||||
|
||||
def log_args(parser):
|
||||
group = parser.add_argument_group("logging")
|
||||
group.add_argument("-v", "--verbose", default=0, action="count",
|
||||
|
@ -8,6 +8,7 @@ import time
|
||||
from functools import partial
|
||||
|
||||
from artiq.protocols import pipe_ipc, pyon
|
||||
from artiq.protocols.logging import LogParser
|
||||
from artiq.tools import asyncio_wait_or_cancel
|
||||
|
||||
|
||||
@ -72,6 +73,9 @@ class Worker:
|
||||
else:
|
||||
return None
|
||||
|
||||
def _get_log_source(self):
|
||||
return "worker({},{})".format(self.rid, self.filename)
|
||||
|
||||
async def _create_process(self, log_level):
|
||||
await self.io_lock.acquire()
|
||||
try:
|
||||
@ -80,7 +84,14 @@ class Worker:
|
||||
self.ipc = pipe_ipc.AsyncioParentComm()
|
||||
await self.ipc.create_subprocess(
|
||||
sys.executable, "-m", "artiq.master.worker_impl",
|
||||
self.ipc.get_address(), str(log_level))
|
||||
self.ipc.get_address(), str(log_level),
|
||||
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
||||
asyncio.ensure_future(
|
||||
LogParser(self._get_log_source).stream_task(
|
||||
self.ipc.process.stdout))
|
||||
asyncio.ensure_future(
|
||||
LogParser(self._get_log_source).stream_task(
|
||||
self.ipc.process.stderr))
|
||||
finally:
|
||||
self.io_lock.release()
|
||||
|
||||
@ -94,7 +105,7 @@ class Worker:
|
||||
await self.io_lock.acquire()
|
||||
try:
|
||||
if self.ipc is None:
|
||||
# Note the %s - self.rid can be None
|
||||
# Note the %s - self.rid can be None or a user string
|
||||
logger.debug("worker was not created (RID %s)", self.rid)
|
||||
return
|
||||
if self.ipc.process.returncode is not None:
|
||||
@ -192,8 +203,6 @@ class Worker:
|
||||
func = self.register_experiment
|
||||
else:
|
||||
func = self.handlers[action]
|
||||
if getattr(func, "worker_pass_runinfo", False):
|
||||
func = partial(func, self.rid, self.filename)
|
||||
try:
|
||||
data = func(*obj["args"], **obj["kwargs"])
|
||||
reply = {"status": "ok", "data": data}
|
||||
@ -265,7 +274,10 @@ class Worker:
|
||||
await self._worker_action({"action": "write_results"},
|
||||
timeout)
|
||||
|
||||
async def examine(self, file, timeout=20.0):
|
||||
async def examine(self, rid, file, timeout=20.0):
|
||||
self.rid = rid
|
||||
self.filename = os.path.basename(file)
|
||||
|
||||
await self._create_process(logging.WARNING)
|
||||
r = dict()
|
||||
def register(class_name, name, arginfo):
|
||||
|
@ -36,6 +36,29 @@ def parse_log_message(msg):
|
||||
return logging.INFO, "print", msg
|
||||
|
||||
|
||||
class LogParser:
|
||||
def __init__(self, source_cb):
|
||||
self.source_cb = source_cb
|
||||
|
||||
def line_input(self, msg):
|
||||
level, name, message = parse_log_message(msg)
|
||||
log_with_name(name, level, message,
|
||||
extra={"source": self.source_cb()})
|
||||
|
||||
async def stream_task(self, stream):
|
||||
while True:
|
||||
try:
|
||||
entry = (await stream.readline())
|
||||
if not entry:
|
||||
break
|
||||
self.line_input(entry[:-1].decode())
|
||||
except:
|
||||
logger.debug("exception in log forwarding", exc_info=True)
|
||||
break
|
||||
logger.debug("stopped log forwarding of stream %s of %s",
|
||||
stream, self.source_cb())
|
||||
|
||||
|
||||
_init_string = b"ARTIQ logging\n"
|
||||
|
||||
|
||||
|
@ -71,7 +71,7 @@ def _run_experiment(class_name):
|
||||
"arguments": dict()
|
||||
}
|
||||
loop = asyncio.get_event_loop()
|
||||
worker = Worker(handlers={"log": lambda message: None})
|
||||
worker = Worker({})
|
||||
loop.run_until_complete(_call_worker(worker, expid))
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user