master: Factor RIDCounter out into own module, explain worker_db module [nfc]

The docstrings are quite minimal still, but should already
help with navigating the different layers when getting
accustomed with the code base.

RIDCounter was moved to its own module, as it isn't really
related to the other classes (used from master only).
This commit is contained in:
David Nadlinger 2018-03-08 21:40:05 +00:00 committed by Sébastien Bourdeauducq
parent 4641ddf002
commit 64b9a377da
3 changed files with 91 additions and 76 deletions

View File

@ -15,7 +15,7 @@ from artiq.protocols.broadcast import Broadcaster
from artiq.master.log import log_args, init_log from artiq.master.log import log_args, init_log
from artiq.master.databases import DeviceDB, DatasetDB from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.scheduler import Scheduler from artiq.master.scheduler import Scheduler
from artiq.master.worker_db import RIDCounter from artiq.master.rid_counter import RIDCounter
from artiq.master.experiments import (FilesystemBackend, GitBackend, from artiq.master.experiments import (FilesystemBackend, GitBackend,
ExperimentDB) ExperimentDB)

View File

@ -0,0 +1,84 @@
import logging
import os
import tempfile
import re
logger = logging.getLogger(__name__)
class RIDCounter:
"""Monotonically incrementing counter for RIDs (experiment run ids).
A cache is used, but if necessary, the last used rid will be determined
from the given results directory.
"""
def __init__(self, cache_filename="last_rid.pyon", results_dir="results"):
self.cache_filename = cache_filename
self.results_dir = results_dir
self._next_rid = self._last_rid() + 1
logger.debug("Next RID is %d", self._next_rid)
def get(self):
rid = self._next_rid
self._next_rid += 1
self._update_cache(rid)
return rid
def _last_rid(self):
try:
rid = self._last_rid_from_cache()
except FileNotFoundError:
logger.debug("Last RID cache not found, scanning results")
rid = self._last_rid_from_results()
self._update_cache(rid)
return rid
else:
logger.debug("Using last RID from cache")
return rid
def _update_cache(self, rid):
contents = str(rid) + "\n"
directory = os.path.abspath(os.path.dirname(self.cache_filename))
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False
) as f:
f.write(contents)
tmpname = f.name
os.replace(tmpname, self.cache_filename)
def _last_rid_from_cache(self):
with open(self.cache_filename, "r") as f:
return int(f.read())
def _last_rid_from_results(self):
r = -1
try:
day_folders = os.listdir(self.results_dir)
except:
return r
day_folders = filter(
lambda x: re.fullmatch("\\d\\d\\d\\d-\\d\\d-\\d\\d", x),
day_folders)
for df in day_folders:
day_path = os.path.join(self.results_dir, df)
try:
hm_folders = os.listdir(day_path)
except:
continue
hm_folders = filter(lambda x: re.fullmatch("\\d\\d(-\\d\\d)?", x),
hm_folders)
for hmf in hm_folders:
hm_path = os.path.join(day_path, hmf)
try:
h5files = os.listdir(hm_path)
except:
continue
for x in h5files:
m = re.fullmatch(
"(\\d\\d\\d\\d\\d\\d\\d\\d\\d)-.*\\.h5", x)
if m is None:
continue
rid = int(m.group(1))
if rid > r:
r = rid
return r

View File

@ -1,10 +1,13 @@
"""Client-side interfaces to the master databases (devices, datasets).
These artefacts are intended for out-of-process use (i.e. from workers or the
standalone command line tools).
"""
from operator import setitem from operator import setitem
from collections import OrderedDict from collections import OrderedDict
import importlib import importlib
import logging import logging
import os
import tempfile
import re
from artiq.protocols.sync_struct import Notifier from artiq.protocols.sync_struct import Notifier
from artiq.protocols.pc_rpc import AutoTarget, Client, BestEffortClient from artiq.protocols.pc_rpc import AutoTarget, Client, BestEffortClient
@ -13,78 +16,6 @@ from artiq.protocols.pc_rpc import AutoTarget, Client, BestEffortClient
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class RIDCounter:
def __init__(self, cache_filename="last_rid.pyon", results_dir="results"):
self.cache_filename = cache_filename
self.results_dir = results_dir
self._next_rid = self._last_rid() + 1
logger.debug("Next RID is %d", self._next_rid)
def get(self):
rid = self._next_rid
self._next_rid += 1
self._update_cache(rid)
return rid
def _last_rid(self):
try:
rid = self._last_rid_from_cache()
except FileNotFoundError:
logger.debug("Last RID cache not found, scanning results")
rid = self._last_rid_from_results()
self._update_cache(rid)
return rid
else:
logger.debug("Using last RID from cache")
return rid
def _update_cache(self, rid):
contents = str(rid) + "\n"
directory = os.path.abspath(os.path.dirname(self.cache_filename))
with tempfile.NamedTemporaryFile("w", dir=directory, delete=False
) as f:
f.write(contents)
tmpname = f.name
os.replace(tmpname, self.cache_filename)
def _last_rid_from_cache(self):
with open(self.cache_filename, "r") as f:
return int(f.read())
def _last_rid_from_results(self):
r = -1
try:
day_folders = os.listdir(self.results_dir)
except:
return r
day_folders = filter(
lambda x: re.fullmatch("\\d\\d\\d\\d-\\d\\d-\\d\\d", x),
day_folders)
for df in day_folders:
day_path = os.path.join(self.results_dir, df)
try:
hm_folders = os.listdir(day_path)
except:
continue
hm_folders = filter(lambda x: re.fullmatch("\\d\\d(-\\d\\d)?", x),
hm_folders)
for hmf in hm_folders:
hm_path = os.path.join(day_path, hmf)
try:
h5files = os.listdir(hm_path)
except:
continue
for x in h5files:
m = re.fullmatch(
"(\\d\\d\\d\\d\\d\\d\\d\\d\\d)-.*\\.h5", x)
if m is None:
continue
rid = int(m.group(1))
if rid > r:
r = rid
return r
class DummyDevice: class DummyDevice:
pass pass