diff --git a/.gitignore b/.gitignore
index 78b404c96..c85718ce8 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ __pycache__/
 /artiq/test/results
 /artiq/test/h5types.h5
 /examples/master/results
+/examples/master/last_rid.pyon
 /examples/master/dataset_db.pyon
 /examples/sim/results
 /examples/sim/dataset_db.pyon
@@ -26,8 +27,9 @@ __pycache__/
 # recommended location for testbed
 /run
 # alternatively, when testing ad-hoc experiments at the root:
-/device_db.pyon
-/dataset_db.pyon
 /results
+/last_rid.pyon
+/dataset_db.pyon
+/device_db.pyon
 /h5types.h5
 /test*.py
diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py
index 4fdc7e9d5..b0e9f3c23 100755
--- a/artiq/frontend/artiq_master.py
+++ b/artiq/frontend/artiq_master.py
@@ -12,7 +12,7 @@ from artiq.protocols.logging import Server as LoggingServer
 from artiq.master.log import log_args, init_log
 from artiq.master.databases import DeviceDB, DatasetDB
 from artiq.master.scheduler import Scheduler
-from artiq.master.worker_db import get_last_rid
+from artiq.master.worker_db import RIDCounter
 from artiq.master.experiments import FilesystemBackend, GitBackend, ExperimentDB
 
 
@@ -73,7 +73,7 @@ def main():
         "get_dataset": dataset_db.get,
         "update_dataset": dataset_db.update
     }
-    scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
+    scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db)
     worker_handlers.update({
         "scheduler_submit": scheduler.submit,
         "scheduler_delete": scheduler.delete,
diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py
index 2e375a344..8b7c52553 100644
--- a/artiq/master/scheduler.py
+++ b/artiq/master/scheduler.py
@@ -116,16 +116,6 @@ class Run:
     write_results = _mk_worker_method("write_results")
 
 
-class RIDCounter:
-    def __init__(self, next_rid):
-        self._next_rid = next_rid
-
-    def get(self):
-        rid = self._next_rid
-        self._next_rid += 1
-        return rid
-
-
 class RunPool:
     def __init__(self, ridc, worker_handlers, notifier, experiment_db):
         self.runs = dict()
@@ -387,7 +377,7 @@ class Deleter(TaskObject):
 
 
 class Scheduler:
-    def __init__(self, next_rid, worker_handlers, experiment_db):
+    def __init__(self, ridc, worker_handlers, experiment_db):
         self.notifier = Notifier(dict())
 
         self._pipelines = dict()
@@ -395,7 +385,7 @@ class Scheduler:
         self._experiment_db = experiment_db
         self._terminated = False
 
-        self._ridc = RIDCounter(next_rid)
+        self._ridc = ridc
         self._deleter = Deleter(self._pipelines)
 
     def start(self):
diff --git a/artiq/master/worker_db.py b/artiq/master/worker_db.py
index 71e94ca51..487257077 100644
--- a/artiq/master/worker_db.py
+++ b/artiq/master/worker_db.py
@@ -2,6 +2,7 @@ from collections import OrderedDict
 import importlib
 import logging
 import os
+import tempfile
 import time
 import re
 
@@ -15,6 +16,75 @@ from artiq.protocols.pc_rpc import AutoTarget, Client, BestEffortClient
 logger = logging.getLogger(__name__)
 
 
+class RIDCounter:
+    def __init__(self, cache_filename="last_rid.pyon", results_dir="results"):
+        self.cache_filename = cache_filename
+        self.results_dir = results_dir
+        self._next_rid = self._last_rid() + 1
+        logger.debug("Next RID is %d", self._next_rid)
+
+    def get(self):
+        rid = self._next_rid
+        self._next_rid += 1
+        self._update_cache(rid)
+        return rid
+
+    def _last_rid(self):
+        try:
+            rid = self._last_rid_from_cache()
+        except FileNotFoundError:
+            logger.debug("Last RID cache not found, scanning results")
+            rid = self._last_rid_from_results()
+            self._update_cache(rid)
+            return rid
+        else:
+            logger.debug("Using last RID from cache")
+            return rid
+
+    def _update_cache(self, rid):
+        contents = str(rid) + "\n"
+        directory = os.path.abspath(os.path.dirname(self.cache_filename))
+        with tempfile.NamedTemporaryFile("w", dir=directory, delete=False) as f:
+            f.write(contents)
+            tmpname = f.name
+        os.replace(tmpname, self.cache_filename)
+
+    def _last_rid_from_cache(self):
+        with open(self.cache_filename, "r") as f:
+            return int(f.read())
+
+    def _last_rid_from_results(self):
+        r = -1
+        try:
+            day_folders = os.listdir(self.results_dir)
+        except:
+            return r
+        day_folders = filter(lambda x: re.fullmatch('\d\d\d\d-\d\d-\d\d', x),
+                             day_folders)
+        for df in day_folders:
+            day_path = os.path.join(self.results_dir, df)
+            try:
+                minute_folders = os.listdir(day_path)
+            except:
+                continue
+            minute_folders = filter(lambda x: re.fullmatch('\d\d-\d\d', x),
+                                    minute_folders)
+            for mf in minute_folders:
+                minute_path = os.path.join(day_path, mf)
+                try:
+                    h5files = os.listdir(minute_path)
+                except:
+                    continue
+                for x in h5files:
+                    m = re.fullmatch('(\d\d\d\d\d\d\d\d\d)-.*\.h5', x)
+                    if m is None:
+                        continue
+                    rid = int(m.group(1))
+                    if rid > r:
+                        r = rid
+        return r
+
+
 def _create_device(desc, device_mgr):
     ty = desc["type"]
     if ty == "local":
@@ -87,38 +157,6 @@
     return h5py.File(os.path.join(dirname, filename), "w")
 
 
-def get_last_rid():
-    r = -1
-    try:
-        day_folders = os.listdir("results")
-    except:
-        return r
-    day_folders = filter(lambda x: re.fullmatch('\d\d\d\d-\d\d-\d\d', x),
-                         day_folders)
-    for df in day_folders:
-        day_path = os.path.join("results", df)
-        try:
-            minute_folders = os.listdir(day_path)
-        except:
-            continue
-        minute_folders = filter(lambda x: re.fullmatch('\d\d-\d\d', x),
-                                minute_folders)
-        for mf in minute_folders:
-            minute_path = os.path.join(day_path, mf)
-            try:
-                h5files = os.listdir(minute_path)
-            except:
-                continue
-            for x in h5files:
-                m = re.fullmatch('(\d\d\d\d\d\d\d\d\d)-.*\.h5', x)
-                if m is None:
-                    continue
-                rid = int(m.group(1))
-                if rid > r:
-                    r = rid
-    return r
-
-
 _type_to_hdf5 = {
     int: h5py.h5t.STD_I64BE,
     float: h5py.h5t.IEEE_F64BE,