From 387688354c9877461a61b1599a7dcb0aea4e1f63 Mon Sep 17 00:00:00 2001
From: Sebastien Bourdeauducq
Date: Fri, 9 Sep 2016 19:19:01 +0800
Subject: [PATCH] master: optimize repository scan, closes #546

---
 artiq/master/experiments.py | 120 ++++++++++++++++++++----------------
 artiq/master/worker.py      |   2 +
 2 files changed, 69 insertions(+), 53 deletions(-)

diff --git a/artiq/master/experiments.py b/artiq/master/experiments.py
index 3102155f3..fc2787476 100644
--- a/artiq/master/experiments.py
+++ b/artiq/master/experiments.py
@@ -2,6 +2,7 @@ import asyncio
 import os
 import tempfile
 import shutil
+import time
 import logging
 
 from artiq.protocols.sync_struct import Notifier
@@ -13,60 +14,71 @@ from artiq.tools import get_windows_drives, exc_to_warning
 logger = logging.getLogger(__name__)
 
 
-async def _get_repository_entries(entry_dict,
-                                  root, filename, worker_handlers):
-    worker = Worker(worker_handlers)
-    try:
-        description = await worker.examine("scan", os.path.join(root, filename))
-    except:
-        log_worker_exception()
-        raise
-    finally:
-        await worker.close()
-    for class_name, class_desc in description.items():
-        name = class_desc["name"]
-        arginfo = class_desc["arginfo"]
-        if "/" in name:
-            logger.warning("Character '/' is not allowed in experiment "
-                           "name (%s)", name)
-            name = name.replace("/", "_")
-        if name in entry_dict:
-            basename = name
-            i = 1
-            while name in entry_dict:
-                name = basename + str(i)
-                i += 1
-            logger.warning("Duplicate experiment name: '%s'\n"
-                           "Renaming class '%s' in '%s' to '%s'",
-                           basename, class_name, filename, name)
-        entry = {
-            "file": filename,
-            "class_name": class_name,
-            "arginfo": arginfo
-        }
-        entry_dict[name] = entry
+class _RepoScanner:
+    def __init__(self, worker_handlers):
+        self.worker_handlers = worker_handlers
+        self.worker = None
 
+    async def process_file(self, entry_dict, root, filename):
+        logger.debug("processing file %s %s", root, filename)
+        try:
+            description = await self.worker.examine(
+                "scan", os.path.join(root, filename))
+        except:
+            log_worker_exception()
+            raise
+        for class_name, class_desc in description.items():
+            name = class_desc["name"]
+            arginfo = class_desc["arginfo"]
+            if "/" in name:
+                logger.warning("Character '/' is not allowed in experiment "
+                               "name (%s)", name)
+                name = name.replace("/", "_")
+            if name in entry_dict:
+                basename = name
+                i = 1
+                while name in entry_dict:
+                    name = basename + str(i)
+                    i += 1
+                logger.warning("Duplicate experiment name: '%s'\n"
+                               "Renaming class '%s' in '%s' to '%s'",
+                               basename, class_name, filename, name)
+            entry = {
+                "file": filename,
+                "class_name": class_name,
+                "arginfo": arginfo
+            }
+            entry_dict[name] = entry
 
-async def _scan_experiments(root, worker_handlers, subdir=""):
-    entry_dict = dict()
-    for de in os.scandir(os.path.join(root, subdir)):
-        if de.name.startswith("."):
-            continue
-        if de.is_file() and de.name.endswith(".py"):
-            filename = os.path.join(subdir, de.name)
-            try:
-                await _get_repository_entries(
-                    entry_dict, root, filename, worker_handlers)
-            except Exception as exc:
-                logger.warning("Skipping file '%s'", filename,
-                    exc_info=not isinstance(exc, WorkerInternalException))
-        if de.is_dir():
-            subentries = await _scan_experiments(
-                root, worker_handlers,
-                os.path.join(subdir, de.name))
-            entries = {de.name + "/" + k: v for k, v in subentries.items()}
-            entry_dict.update(entries)
-    return entry_dict
+    async def _scan(self, root, subdir=""):
+        entry_dict = dict()
+        for de in os.scandir(os.path.join(root, subdir)):
+            if de.name.startswith("."):
+                continue
+            if de.is_file() and de.name.endswith(".py"):
+                filename = os.path.join(subdir, de.name)
+                try:
+                    await self.process_file(entry_dict, root, filename)
+                except Exception as exc:
+                    logger.warning("Skipping file '%s'", filename,
+                        exc_info=not isinstance(exc, WorkerInternalException))
+                    # restart worker
+                    await self.worker.close()
+                    self.worker = Worker(self.worker_handlers)
+            if de.is_dir():
+                subentries = await self._scan(
+                    root, os.path.join(subdir, de.name))
+                entries = {de.name + "/" + k: v for k, v in subentries.items()}
+                entry_dict.update(entries)
+        return entry_dict
+
+    async def scan(self, root):
+        self.worker = Worker(self.worker_handlers)
+        try:
+            r = await self._scan(root)
+        finally:
+            await self.worker.close()
+        return r
 
 
 def _sync_explist(target, source):
@@ -109,7 +121,9 @@ class ExperimentDB:
             self.repo_backend.release_rev(self.cur_rev)
             self.cur_rev = new_cur_rev
             self.status["cur_rev"] = new_cur_rev
-            new_explist = await _scan_experiments(wd, self.worker_handlers)
+            t1 = time.monotonic()
+            new_explist = await _RepoScanner(self.worker_handlers).scan(wd)
+            logger.info("repository scan took %d seconds", time.monotonic()-t1)
 
             _sync_explist(self.explist, new_explist)
         finally:
diff --git a/artiq/master/worker.py b/artiq/master/worker.py
index 706c9e398..69b6d9a28 100644
--- a/artiq/master/worker.py
+++ b/artiq/master/worker.py
@@ -76,6 +76,8 @@ class Worker:
         return "worker({},{})".format(self.rid, self.filename)
 
     async def _create_process(self, log_level):
+        if self.ipc is not None:
+            return  # process already exists, recycle
         await self.io_lock.acquire()
         try:
             if self.closed.is_set():