From 96a5d73c81f01891e606509bac055aebdd2e2622 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 9 Jul 2015 13:18:12 +0200 Subject: [PATCH] worker: split build stage from prepare --- artiq/language/experiment.py | 24 ++++++++++++++++++------ artiq/master/scheduler.py | 10 ++++++---- artiq/master/worker.py | 16 ++++++++++------ artiq/master/worker_impl.py | 5 ++++- artiq/test/worker.py | 3 ++- 5 files changed, 40 insertions(+), 18 deletions(-) diff --git a/artiq/language/experiment.py b/artiq/language/experiment.py index c7fce7a55..b3394918c 100644 --- a/artiq/language/experiment.py +++ b/artiq/language/experiment.py @@ -1,6 +1,6 @@ from inspect import isclass -__all__ = ["Experiment", "has_analyze", "is_experiment"] +__all__ = ["Experiment", "is_experiment"] class Experiment: @@ -9,11 +9,28 @@ class Experiment: Deriving from this class enables automatic experiment discovery in Python modules. """ + def prepare(self): + """Entry point for pre-computing data necessary for running the + experiment. + + Doing such computations outside of ``run`` enables more efficient + scheduling of multiple experiments that need to access the shared + hardware during part of their execution. + + This method must not interact with the hardware. + """ + pass + def run(self): """The main entry point of the experiment. This method must be overloaded by the user to implement the main control flow of the experiment. + + This method may interact with the hardware. + + The experiment may call the scheduler's ``pause`` method while in + ``run``. """ raise NotImplementedError @@ -32,11 +49,6 @@ class Experiment: pass -def has_analyze(experiment): - """Checks if an experiment instance overloaded its ``analyze`` method.""" - return experiment.analyze.__func__ is not Experiment.analyze - - def is_experiment(o): """Checks if a Python object is an instantiable experiment.""" return isclass(o) and issubclass(o, Experiment) and o is not Experiment diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 9d989af73..93afb0508 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -99,13 +99,14 @@ class Run: yield from self.worker.close() del self._notifier[self.rid] - _prepare = _mk_worker_method("prepare") + _build = _mk_worker_method("build") @asyncio.coroutine - def prepare(self): - yield from self._prepare(self.rid, self.pipeline_name, self.expid, - self.priority) + def build(self): + yield from self._build(self.rid, self.pipeline_name, self.expid, + self.priority) + prepare = _mk_worker_method("prepare") run = _mk_worker_method("run") resume = _mk_worker_method("resume") analyze = _mk_worker_method("analyze") @@ -188,6 +189,7 @@ class PrepareStage(TaskObject): run.status = RunStatus.preparing self.flush_tracker.add(run.rid) try: + yield from run.build() yield from run.prepare() except: logger.warning("got worker exception in prepare stage, " diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 59955d09a..e3b1827ed 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -28,11 +28,11 @@ class WorkerError(Exception): class Worker: def __init__(self, handlers, send_timeout=0.5, term_timeout=1.0, - prepare_timeout=15.0, results_timeout=15.0): + build_timeout=15.0, results_timeout=15.0): self.handlers = handlers self.send_timeout = send_timeout self.term_timeout = term_timeout - self.prepare_timeout = prepare_timeout + self.build_timeout = build_timeout self.results_timeout = results_timeout self.rid = None @@ -142,7 +142,7 @@ class Worker: [self.process.stdout.readline(), self.closed.wait()], timeout=timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): - raise WorkerTimeout("Timeout sending data to worker") + raise WorkerTimeout("Timeout receiving data from worker") if self.closed.is_set(): raise WorkerError("Data transmission to worker cancelled") line = fs[0].result() @@ -209,16 +209,20 @@ class Worker: return completed @asyncio.coroutine - def prepare(self, rid, pipeline_name, expid, priority): + def build(self, rid, pipeline_name, expid, priority): self.rid = rid yield from self._create_process() yield from self._worker_action( - {"action": "prepare", + {"action": "build", "rid": rid, "pipeline_name": pipeline_name, "expid": expid, "priority": priority}, - self.prepare_timeout) + self.build_timeout) + + @asyncio.coroutine + def prepare(self): + yield from self._worker_action({"action": "prepare"}) @asyncio.coroutine def run(self): diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index e82a79b1b..c5307d51a 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -117,7 +117,7 @@ def main(): while True: obj = get_object() action = obj["action"] - if action == "prepare": + if action == "build": start_time = time.localtime() rid = obj["rid"] pipeline_name = obj["pipeline_name"] @@ -131,6 +131,9 @@ def main(): **expid["arguments"]) rdb.build() put_object({"action": "completed"}) + elif action == "prepare": + exp_inst.prepare() + put_object({"action": "completed"}) elif action == "run": exp_inst.run() put_object({"action": "completed"}) diff --git a/artiq/test/worker.py b/artiq/test/worker.py index 7e0e1d3ab..ccd5339aa 100644 --- a/artiq/test/worker.py +++ b/artiq/test/worker.py @@ -32,7 +32,8 @@ class WatchdogTimeoutInBuild(Experiment, AutoDB): @asyncio.coroutine def _call_worker(worker, expid): try: - yield from worker.prepare(0, "main", expid, 0) + yield from worker.build(0, "main", expid, 0) + yield from worker.prepare() yield from worker.run() yield from worker.analyze() finally: