forked from M-Labs/artiq
scheduler: use current (last scanned) repo revision instead of HEAD
This commit is contained in:
parent
f99c53d179
commit
5e14afde3e
|
@ -83,7 +83,7 @@ def main():
|
||||||
"update_dataset": dataset_db.update,
|
"update_dataset": dataset_db.update,
|
||||||
"log": log_worker
|
"log": log_worker
|
||||||
}
|
}
|
||||||
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
|
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, experiment_db)
|
||||||
worker_handlers.update({
|
worker_handlers.update({
|
||||||
"scheduler_submit": scheduler.submit,
|
"scheduler_submit": scheduler.submit,
|
||||||
"scheduler_delete": scheduler.delete,
|
"scheduler_delete": scheduler.delete,
|
||||||
|
|
|
@ -301,7 +301,7 @@ class _ExperimentDock(dockarea.Dock):
|
||||||
log_level.currentIndexChanged.connect(update_log_level)
|
log_level.currentIndexChanged.connect(update_log_level)
|
||||||
|
|
||||||
repo_rev = QtGui.QLineEdit()
|
repo_rev = QtGui.QLineEdit()
|
||||||
repo_rev.setPlaceholderText("HEAD")
|
repo_rev.setPlaceholderText("current")
|
||||||
repo_rev_label = QtGui.QLabel("Revision:")
|
repo_rev_label = QtGui.QLabel("Revision:")
|
||||||
repo_rev_label.setToolTip("Experiment repository revision "
|
repo_rev_label.setToolTip("Experiment repository revision "
|
||||||
"(commit ID) to use")
|
"(commit ID) to use")
|
||||||
|
|
|
@ -127,14 +127,14 @@ class RIDCounter:
|
||||||
|
|
||||||
|
|
||||||
class RunPool:
|
class RunPool:
|
||||||
def __init__(self, ridc, worker_handlers, notifier, repo_backend):
|
def __init__(self, ridc, worker_handlers, notifier, experiment_db):
|
||||||
self.runs = dict()
|
self.runs = dict()
|
||||||
self.state_changed = Condition()
|
self.state_changed = Condition()
|
||||||
|
|
||||||
self.ridc = ridc
|
self.ridc = ridc
|
||||||
self.worker_handlers = worker_handlers
|
self.worker_handlers = worker_handlers
|
||||||
self.notifier = notifier
|
self.notifier = notifier
|
||||||
self.repo_backend = repo_backend
|
self.experiment_db = experiment_db
|
||||||
|
|
||||||
def submit(self, expid, priority, due_date, flush, pipeline_name):
|
def submit(self, expid, priority, due_date, flush, pipeline_name):
|
||||||
# mutates expid to insert head repository revision if None.
|
# mutates expid to insert head repository revision if None.
|
||||||
|
@ -142,8 +142,9 @@ class RunPool:
|
||||||
rid = self.ridc.get()
|
rid = self.ridc.get()
|
||||||
if "repo_rev" in expid:
|
if "repo_rev" in expid:
|
||||||
if expid["repo_rev"] is None:
|
if expid["repo_rev"] is None:
|
||||||
expid["repo_rev"] = self.repo_backend.get_head_rev()
|
expid["repo_rev"] = self.experiment_db.cur_rev
|
||||||
wd, repo_msg = self.repo_backend.request_rev(expid["repo_rev"])
|
wd, repo_msg = self.experiment_db.repo_backend.request_rev(
|
||||||
|
expid["repo_rev"])
|
||||||
else:
|
else:
|
||||||
wd, repo_msg = None, None
|
wd, repo_msg = None, None
|
||||||
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
|
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
|
||||||
|
@ -159,7 +160,7 @@ class RunPool:
|
||||||
run = self.runs[rid]
|
run = self.runs[rid]
|
||||||
await run.close()
|
await run.close()
|
||||||
if "repo_rev" in run.expid:
|
if "repo_rev" in run.expid:
|
||||||
self.repo_backend.release_rev(run.expid["repo_rev"])
|
self.experiment_db.repo_backend.release_rev(run.expid["repo_rev"])
|
||||||
del self.runs[rid]
|
del self.runs[rid]
|
||||||
|
|
||||||
|
|
||||||
|
@ -325,8 +326,8 @@ class AnalyzeStage(TaskObject):
|
||||||
|
|
||||||
|
|
||||||
class Pipeline:
|
class Pipeline:
|
||||||
def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend):
|
def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db):
|
||||||
self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend)
|
self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db)
|
||||||
self._prepare = PrepareStage(self.pool, deleter.delete)
|
self._prepare = PrepareStage(self.pool, deleter.delete)
|
||||||
self._run = RunStage(self.pool, deleter.delete)
|
self._run = RunStage(self.pool, deleter.delete)
|
||||||
self._analyze = AnalyzeStage(self.pool, deleter.delete)
|
self._analyze = AnalyzeStage(self.pool, deleter.delete)
|
||||||
|
@ -386,12 +387,12 @@ class Deleter(TaskObject):
|
||||||
|
|
||||||
|
|
||||||
class Scheduler:
|
class Scheduler:
|
||||||
def __init__(self, next_rid, worker_handlers, repo_backend):
|
def __init__(self, next_rid, worker_handlers, experiment_db):
|
||||||
self.notifier = Notifier(dict())
|
self.notifier = Notifier(dict())
|
||||||
|
|
||||||
self._pipelines = dict()
|
self._pipelines = dict()
|
||||||
self._worker_handlers = worker_handlers
|
self._worker_handlers = worker_handlers
|
||||||
self._repo_backend = repo_backend
|
self._experiment_db = experiment_db
|
||||||
self._terminated = False
|
self._terminated = False
|
||||||
|
|
||||||
self._ridc = RIDCounter(next_rid)
|
self._ridc = RIDCounter(next_rid)
|
||||||
|
@ -422,7 +423,7 @@ class Scheduler:
|
||||||
logger.debug("creating pipeline '%s'", pipeline_name)
|
logger.debug("creating pipeline '%s'", pipeline_name)
|
||||||
pipeline = Pipeline(self._ridc, self._deleter,
|
pipeline = Pipeline(self._ridc, self._deleter,
|
||||||
self._worker_handlers, self.notifier,
|
self._worker_handlers, self.notifier,
|
||||||
self._repo_backend)
|
self._experiment_db)
|
||||||
self._pipelines[pipeline_name] = pipeline
|
self._pipelines[pipeline_name] = pipeline
|
||||||
pipeline.start()
|
pipeline.start()
|
||||||
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
|
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
|
||||||
|
|
|
@ -107,7 +107,7 @@ You may now run the master with the Git support enabled: ::
|
||||||
|
|
||||||
Push commits containing experiments to the bare repository using e.g. Git over SSH, and the new experiments should automatically appear in the GUI.
|
Push commits containing experiments to the bare repository using e.g. Git over SSH, and the new experiments should automatically appear in the GUI.
|
||||||
|
|
||||||
.. note:: If you plan to run the ARTIQ system entirely on a single machine, you may also consider using a non-bare repository and the ``post-commit`` hook to trigger repository scans every time you commit changes (locally). The ARTIQ master never uses the repository's working directory, but only what is committed. More precisely, it fetches by default the last (atomically) completed commit at the time of experiment submission and checks it out in a temporary folder (which solves the problem of concurrent repository access).
|
.. note:: If you plan to run the ARTIQ system entirely on a single machine, you may also consider using a non-bare repository and the ``post-commit`` hook to trigger repository scans every time you commit changes (locally). The ARTIQ master never uses the repository's working directory, but only what is committed. More precisely, when scanning the repository, it fetches the last (atomically) completed commit at that time of repository scan and checks it out in a temporary folder. This commit ID is used by default when subsequently submitting experiments. There is one temporary folder by commit ID currently referenced in the system, so concurrently running experiments from different repository revisions is fully supported by the master.
|
||||||
|
|
||||||
The GUI always runs experiments from the repository. The command-line client, by default, runs experiment from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option.
|
The GUI always runs experiments from the repository. The command-line client, by default, runs experiment from the raw filesystem (which is useful for iterating rapidly without creating many disorganized commits). If you want to use the repository instead, simply pass the ``-R`` option.
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue