From 737f6d4485eda029e3ac023c27cc153f17a78cc3 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 28 May 2015 17:20:58 +0800 Subject: [PATCH] scheduler: support pipeline flush --- artiq/frontend/artiq_client.py | 10 ++- artiq/gui/explorer.py | 22 +++--- artiq/master/scheduler.py | 74 +++++++++++-------- artiq/master/worker_impl.py | 2 +- artiq/test/scheduler.py | 9 ++- artiq/tools.py | 24 ++++++ .../repository/flopping_f_simulation.py | 2 +- 7 files changed, 95 insertions(+), 48 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index 443b72b40..7499412b0 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -32,14 +32,17 @@ def get_argparser(): subparsers.required = True parser_add = subparsers.add_parser("submit", help="submit an experiment") - parser_add.add_argument("-t", "--timed", default=None, type=str, - help="set a due date for the experiment") parser_add.add_argument("-p", "--pipeline", default="main", type=str, help="pipeline to run the experiment in " "(default: %(default)s)") parser_add.add_argument("-P", "--priority", default=0, type=int, help="priority (higher value means sooner " "scheduling, default: %(default)s)") + parser_add.add_argument("-t", "--timed", default=None, type=str, + help="set a due date for the experiment") + parser_add.add_argument("-f", "--flush", default=False, action="store_true", + help="flush the pipeline before preparing " + "the experiment") parser_add.add_argument("-e", "--experiment", default=None, help="experiment to run") parser_add.add_argument("file", @@ -106,7 +109,8 @@ def _action_submit(remote, args): due_date = None else: due_date = time.mktime(parse_date(args.timed).timetuple()) - rid = remote.submit(args.pipeline, expid, args.priority, due_date) + rid = remote.submit(args.pipeline, expid, + args.priority, due_date, args.flush) print("RID: {}".format(rid)) diff --git a/artiq/gui/explorer.py b/artiq/gui/explorer.py index 9d5603193..3f1e590fd 100644 --- a/artiq/gui/explorer.py +++ b/artiq/gui/explorer.py @@ -41,19 +41,22 @@ class ExplorerDock(dockarea.Dock): self.datetime.setDisplayFormat("MMM d yyyy hh:mm:ss") self.datetime.setCalendarPopup(True) self.datetime.setDate(QtCore.QDate.currentDate()) - self.datetime_en = QtGui.QCheckBox("Set due date:") + self.datetime_en = QtGui.QCheckBox("Due date:") grid.addWidget(self.datetime_en, 1, 0) - grid.addWidget(self.datetime, 1, 1, colspan=3) + grid.addWidget(self.datetime, 1, 1) + + self.priority = QtGui.QSpinBox() + self.priority.setRange(-99, 99) + grid.addLabel("Priority:", 1, 2) + grid.addWidget(self.priority, 1, 3) self.pipeline = QtGui.QLineEdit() self.pipeline.insert("main") grid.addLabel("Pipeline:", 2, 0) grid.addWidget(self.pipeline, 2, 1) - self.priority = QtGui.QSpinBox() - self.priority.setRange(-99, 99) - grid.addLabel("Priority:", 2, 2) - grid.addWidget(self.priority, 2, 3) + self.flush = QtGui.QCheckBox("Flush") + grid.addWidget(self.flush, 2, 2, colspan=2) submit = QtGui.QPushButton("Submit") grid.addWidget(submit, 3, 0, colspan=4) @@ -79,14 +82,14 @@ class ExplorerDock(dockarea.Dock): @asyncio.coroutine def submit(self, pipeline_name, file, experiment, arguments, - priority, due_date): + priority, due_date, flush): expid = { "file": file, "experiment": experiment, "arguments": arguments, } rid = yield from self.schedule_ctl.submit(pipeline_name, expid, - priority, due_date) + priority, due_date, flush) self.status_bar.showMessage("Submitted RID {}".format(rid)) def submit_clicked(self): @@ -101,4 +104,5 @@ class ExplorerDock(dockarea.Dock): due_date = None asyncio.async(self.submit(self.pipeline.text(), expinfo["file"], expinfo["experiment"], - dict(), self.priority.value(), due_date)) + dict(), self.priority.value(), due_date, + self.flush.isChecked())) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 44b14765d..dc6dda68e 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -4,7 +4,7 @@ from enum import Enum from time import time from artiq.master.worker import Worker -from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek +from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek, WaitSet from artiq.protocols.sync_struct import Notifier @@ -13,13 +13,14 @@ logger = logging.getLogger(__name__) class RunStatus(Enum): pending = 0 - preparing = 1 - prepare_done = 2 - running = 3 - run_done = 4 - analyzing = 5 - analyze_done = 6 - paused = 7 + flushing = 1 + preparing = 2 + prepare_done = 3 + running = 4 + run_done = 5 + analyzing = 6 + analyze_done = 7 + paused = 8 def _mk_worker_method(name): @@ -45,7 +46,7 @@ def _mk_worker_method(name): class Run: def __init__(self, rid, pipeline_name, - expid, priority, due_date, + expid, priority, due_date, flush, worker_handlers, notifier): # called through pool self.rid = rid @@ -53,6 +54,7 @@ class Run: self.expid = expid self.priority = priority self.due_date = due_date + self.flush = flush self._status = RunStatus.pending self._terminated = False @@ -64,6 +66,7 @@ class Run: "expid": self.expid, "priority": self.priority, "due_date": self.due_date, + "flush": self.flush, "status": self._status.name } @@ -120,20 +123,20 @@ class RIDCounter: class RunPool: def __init__(self, ridc, worker_handlers, notifier): self.runs = dict() - self.submitted_callback = None + self.submitted_cb = None self._ridc = ridc self._worker_handlers = worker_handlers self._notifier = notifier - def submit(self, expid, priority, due_date, pipeline_name): + def submit(self, expid, priority, due_date, flush, pipeline_name): # called through scheduler rid = self._ridc.get() - run = Run(rid, pipeline_name, expid, priority, due_date, + run = Run(rid, pipeline_name, expid, priority, due_date, flush, self._worker_handlers, self._notifier) self.runs[rid] = run - if self.submitted_callback is not None: - self.submitted_callback() + if self.submitted_cb is not None: + self.submitted_cb() return rid @asyncio.coroutine @@ -161,13 +164,14 @@ class TaskObject: class PrepareStage(TaskObject): - def __init__(self, deleter, pool, outq): - self.deleter = deleter + def __init__(self, flush_tracker, delete_cb, pool, outq): + self.flush_tracker = flush_tracker + self.delete_cb = delete_cb self.pool = pool self.outq = outq self.pool_submitted = asyncio.Event() - self.pool.submitted_callback = lambda: self.pool_submitted.set() + self.pool.submitted_cb = lambda: self.pool_submitted.set() @asyncio.coroutine def _push_runs(self): @@ -186,14 +190,18 @@ class PrepareStage(TaskObject): # pending_runs is an empty sequence return None if run.due_date is None or run.due_date < now: + if run.flush: + run.status = RunStatus.flushing + yield from self.flush_tracker.wait_empty() run.status = RunStatus.preparing + self.flush_tracker.add(run.rid) try: yield from run.prepare() except: logger.warning("got worker exception in prepare stage, " "deleting RID %d", run.rid, exc_info=True) - self.deleter.delete(run.rid) + self.delete_cb(run.rid) run.status = RunStatus.prepare_done yield from self.outq.put(run) else: @@ -214,8 +222,8 @@ class PrepareStage(TaskObject): class RunStage(TaskObject): - def __init__(self, deleter, inq, outq): - self.deleter = deleter + def __init__(self, delete_cb, inq, outq): + self.delete_cb = delete_cb self.inq = inq self.outq = outq @@ -246,7 +254,7 @@ class RunStage(TaskObject): logger.warning("got worker exception in run stage, " "deleting RID %d", run.rid, exc_info=True) - self.deleter.delete(run.rid) + self.delete_cb(run.rid) else: if completed: run.status = RunStatus.run_done @@ -257,8 +265,8 @@ class RunStage(TaskObject): class AnalyzeStage(TaskObject): - def __init__(self, deleter, inq): - self.deleter = deleter + def __init__(self, delete_cb, inq): + self.delete_cb = delete_cb self.inq = inq @asyncio.coroutine @@ -273,17 +281,23 @@ class AnalyzeStage(TaskObject): logger.warning("got worker exception in analyze stage, " "deleting RID %d", run.rid, exc_info=True) - self.deleter.delete(run.rid) + self.delete_cb(run.rid) run.status = RunStatus.analyze_done - self.deleter.delete(run.rid) + self.delete_cb(run.rid) class Pipeline: def __init__(self, ridc, deleter, worker_handlers, notifier): + flush_tracker = WaitSet() + def delete_cb(rid): + deleter.delete(rid) + flush_tracker.discard(rid) self.pool = RunPool(ridc, worker_handlers, notifier) - self._prepare = PrepareStage(deleter, self.pool, asyncio.Queue(maxsize=1)) - self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1)) - self._analyze = AnalyzeStage(deleter, self._run.outq) + self._prepare = PrepareStage(flush_tracker, delete_cb, + self.pool, asyncio.Queue(maxsize=1)) + self._run = RunStage(delete_cb, + self._prepare.outq, asyncio.Queue(maxsize=1)) + self._analyze = AnalyzeStage(delete_cb, self._run.outq) def start(self): self._prepare.start() @@ -366,7 +380,7 @@ class Scheduler: if self._pipelines: logger.warning("some pipelines were not garbage-collected") - def submit(self, pipeline_name, expid, priority, due_date): + def submit(self, pipeline_name, expid, priority, due_date, flush): if self._terminated: return try: @@ -377,7 +391,7 @@ class Scheduler: self._worker_handlers, self.notifier) self._pipelines[pipeline_name] = pipeline pipeline.start() - return pipeline.pool.submit(expid, priority, due_date, pipeline_name) + return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) def delete(self, rid): self._deleter.delete(rid) diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 18e0e95ec..e82a79b1b 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -79,7 +79,7 @@ class Scheduler: pause = staticmethod(make_parent_action("pause", "")) submit = staticmethod(make_parent_action("scheduler_submit", - "pipeline_name expid priority due_date")) + "pipeline_name expid priority due_date flush")) cancel = staticmethod(make_parent_action("scheduler_cancel", "rid")) def __init__(self, pipeline_name, expid, priority): diff --git a/artiq/test/scheduler.py b/artiq/test/scheduler.py index 8588a82a2..dd993d516 100644 --- a/artiq/test/scheduler.py +++ b/artiq/test/scheduler.py @@ -31,7 +31,8 @@ def _get_basic_steps(rid, expid): return [ {"action": "setitem", "key": rid, "value": {"pipeline": "main", "status": "pending", "priority": 0, - "expid": expid, "due_date": None}, "path": []}, + "expid": expid, "due_date": None, "flush": False}, + "path": []}, {"action": "setitem", "key": "status", "value": "preparing", "path": [rid]}, {"action": "setitem", "key": "status", "value": "prepare_done", @@ -71,7 +72,7 @@ class SchedulerCase(unittest.TestCase): loop = asyncio.get_event_loop() scheduler.start() - scheduler.submit("main", expid, 0, None) + scheduler.submit("main", expid, 0, None, False) loop.run_until_complete(done.wait()) loop.run_until_complete(scheduler.stop()) @@ -100,8 +101,8 @@ class SchedulerCase(unittest.TestCase): loop = asyncio.get_event_loop() scheduler.start() - scheduler.submit("main", expid_bg, -99, None) + scheduler.submit("main", expid_bg, -99, None, False) loop.run_until_complete(background_running.wait()) - scheduler.submit("main", expid, 0, None) + scheduler.submit("main", expid, 0, None, False) loop.run_until_complete(done.wait()) loop.run_until_complete(scheduler.stop()) diff --git a/artiq/tools.py b/artiq/tools.py index 2b5400a93..a6ee390f8 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -119,3 +119,27 @@ def asyncio_queue_peek(q): return q._queue[0] else: raise asyncio.QueueEmpty + + +class WaitSet: + def __init__(self): + self._s = set() + self._ev = asyncio.Event() + + def _update_ev(self): + if self._s: + self._ev.clear() + else: + self._ev.set() + + def add(self, e): + self._s.add(e) + self._update_ev() + + def discard(self, e): + self._s.discard(e) + self._update_ev() + + @asyncio.coroutine + def wait_empty(self): + yield from self._ev.wait() diff --git a/examples/master/repository/flopping_f_simulation.py b/examples/master/repository/flopping_f_simulation.py index 9aa2bc9d8..23cf42239 100644 --- a/examples/master/repository/flopping_f_simulation.py +++ b/examples/master/repository/flopping_f_simulation.py @@ -51,7 +51,7 @@ class FloppingF(Experiment, AutoDB): self.brightness.append(brightness) time.sleep(0.1) self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid, - self.scheduler.priority, time.time() + 20) + self.scheduler.priority, time.time() + 20, False) def analyze(self): popt, pcov = curve_fit(model_numpy,