master: optimize repository scan, closes #546

This commit is contained in:
Sebastien Bourdeauducq 2016-09-09 19:19:01 +08:00
parent 4ef5eb2644
commit 387688354c
2 changed files with 69 additions and 53 deletions

View File

@ -2,6 +2,7 @@ import asyncio
import os import os
import tempfile import tempfile
import shutil import shutil
import time
import logging import logging
from artiq.protocols.sync_struct import Notifier from artiq.protocols.sync_struct import Notifier
@ -13,60 +14,71 @@ from artiq.tools import get_windows_drives, exc_to_warning
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
async def _get_repository_entries(entry_dict, class _RepoScanner:
root, filename, worker_handlers): def __init__(self, worker_handlers):
worker = Worker(worker_handlers) self.worker_handlers = worker_handlers
try: self.worker = None
description = await worker.examine("scan", os.path.join(root, filename))
except:
log_worker_exception()
raise
finally:
await worker.close()
for class_name, class_desc in description.items():
name = class_desc["name"]
arginfo = class_desc["arginfo"]
if "/" in name:
logger.warning("Character '/' is not allowed in experiment "
"name (%s)", name)
name = name.replace("/", "_")
if name in entry_dict:
basename = name
i = 1
while name in entry_dict:
name = basename + str(i)
i += 1
logger.warning("Duplicate experiment name: '%s'\n"
"Renaming class '%s' in '%s' to '%s'",
basename, class_name, filename, name)
entry = {
"file": filename,
"class_name": class_name,
"arginfo": arginfo
}
entry_dict[name] = entry
async def process_file(self, entry_dict, root, filename):
    """Examine one file with the scanner's worker and record its experiments.

    The worker is asked to examine ``os.path.join(root, filename)`` in
    "scan" mode. The code iterates the result as a mapping of class name
    to a description providing the ``"name"`` and ``"arginfo"`` keys. One
    entry per class is stored in ``entry_dict`` (modified in place), keyed
    by the experiment name.

    Name clean-up, each step reported with a warning:

    * ``/`` in an experiment name is replaced by ``_``;
    * a name already present in ``entry_dict`` gets a numeric suffix
      (``1``, ``2``, ...) until it is unused.

    If the examination raises, ``log_worker_exception()`` is called and
    the exception is re-raised unchanged.
    """
    logger.debug("processing file %s %s", root, filename)
    try:
        description = await self.worker.examine(
            "scan", os.path.join(root, filename))
    except:
        # Log here, then let the caller decide how to recover.
        log_worker_exception()
        raise

    for class_name, class_desc in description.items():
        name, arginfo = class_desc["name"], class_desc["arginfo"]

        if "/" in name:
            logger.warning("Character '/' is not allowed in experiment "
                           "name (%s)", name)
            name = name.replace("/", "_")

        if name in entry_dict:
            # Try basename1, basename2, ... until the name is unused.
            basename = name
            suffix = 0
            while name in entry_dict:
                suffix += 1
                name = basename + str(suffix)
            logger.warning("Duplicate experiment name: '%s'\n"
                           "Renaming class '%s' in '%s' to '%s'",
                           basename, class_name, filename, name)

        entry_dict[name] = {
            "file": filename,
            "class_name": class_name,
            "arginfo": arginfo
        }
async def _scan_experiments(root, worker_handlers, subdir=""): async def _scan(self, root, subdir=""):
entry_dict = dict() entry_dict = dict()
for de in os.scandir(os.path.join(root, subdir)): for de in os.scandir(os.path.join(root, subdir)):
if de.name.startswith("."): if de.name.startswith("."):
continue continue
if de.is_file() and de.name.endswith(".py"): if de.is_file() and de.name.endswith(".py"):
filename = os.path.join(subdir, de.name) filename = os.path.join(subdir, de.name)
try: try:
await _get_repository_entries( await self.process_file(entry_dict, root, filename)
entry_dict, root, filename, worker_handlers) except Exception as exc:
except Exception as exc: logger.warning("Skipping file '%s'", filename,
logger.warning("Skipping file '%s'", filename, exc_info=not isinstance(exc, WorkerInternalException))
exc_info=not isinstance(exc, WorkerInternalException)) # restart worker
if de.is_dir(): await self.worker.close()
subentries = await _scan_experiments( self.worker = Worker(self.worker_handlers)
root, worker_handlers, if de.is_dir():
os.path.join(subdir, de.name)) subentries = await self._scan(
entries = {de.name + "/" + k: v for k, v in subentries.items()} root, os.path.join(subdir, de.name))
entry_dict.update(entries) entries = {de.name + "/" + k: v for k, v in subentries.items()}
return entry_dict entry_dict.update(entries)
return entry_dict
async def scan(self, root):
    """Scan ``root`` and return the result of ``self._scan(root)``.

    A new ``Worker`` built from ``self.worker_handlers`` is stored in
    ``self.worker`` before scanning starts. Whichever worker
    ``self.worker`` refers to once scanning finishes or fails is closed
    before this coroutine returns or propagates the exception.
    """
    self.worker = Worker(self.worker_handlers)
    try:
        return await self._scan(root)
    finally:
        # Runs on both the success and the error path.
        await self.worker.close()
def _sync_explist(target, source): def _sync_explist(target, source):
@ -109,7 +121,9 @@ class ExperimentDB:
self.repo_backend.release_rev(self.cur_rev) self.repo_backend.release_rev(self.cur_rev)
self.cur_rev = new_cur_rev self.cur_rev = new_cur_rev
self.status["cur_rev"] = new_cur_rev self.status["cur_rev"] = new_cur_rev
new_explist = await _scan_experiments(wd, self.worker_handlers) t1 = time.monotonic()
new_explist = await _RepoScanner(self.worker_handlers).scan(wd)
logger.info("repository scan took %d seconds", time.monotonic()-t1)
_sync_explist(self.explist, new_explist) _sync_explist(self.explist, new_explist)
finally: finally:

View File

@ -76,6 +76,8 @@ class Worker:
return "worker({},{})".format(self.rid, self.filename) return "worker({},{})".format(self.rid, self.filename)
async def _create_process(self, log_level): async def _create_process(self, log_level):
if self.ipc is not None:
return # process already exists, recycle
await self.io_lock.acquire() await self.io_lock.acquire()
try: try:
if self.closed.is_set(): if self.closed.is_set():