scheduler: support pipeline flush

This commit is contained in:
Sebastien Bourdeauducq 2015-05-28 17:20:58 +08:00
parent 4da377eef0
commit 737f6d4485
7 changed files with 95 additions and 48 deletions

View File

@ -32,14 +32,17 @@ def get_argparser():
subparsers.required = True
parser_add = subparsers.add_parser("submit", help="submit an experiment")
parser_add.add_argument("-t", "--timed", default=None, type=str,
help="set a due date for the experiment")
parser_add.add_argument("-p", "--pipeline", default="main", type=str,
help="pipeline to run the experiment in "
"(default: %(default)s)")
parser_add.add_argument("-P", "--priority", default=0, type=int,
help="priority (higher value means sooner "
"scheduling, default: %(default)s)")
parser_add.add_argument("-t", "--timed", default=None, type=str,
help="set a due date for the experiment")
parser_add.add_argument("-f", "--flush", default=False, action="store_true",
help="flush the pipeline before preparing "
"the experiment")
parser_add.add_argument("-e", "--experiment", default=None,
help="experiment to run")
parser_add.add_argument("file",
@ -106,7 +109,8 @@ def _action_submit(remote, args):
due_date = None
else:
due_date = time.mktime(parse_date(args.timed).timetuple())
rid = remote.submit(args.pipeline, expid, args.priority, due_date)
rid = remote.submit(args.pipeline, expid,
args.priority, due_date, args.flush)
print("RID: {}".format(rid))

View File

@ -41,19 +41,22 @@ class ExplorerDock(dockarea.Dock):
self.datetime.setDisplayFormat("MMM d yyyy hh:mm:ss")
self.datetime.setCalendarPopup(True)
self.datetime.setDate(QtCore.QDate.currentDate())
self.datetime_en = QtGui.QCheckBox("Set due date:")
self.datetime_en = QtGui.QCheckBox("Due date:")
grid.addWidget(self.datetime_en, 1, 0)
grid.addWidget(self.datetime, 1, 1, colspan=3)
grid.addWidget(self.datetime, 1, 1)
self.priority = QtGui.QSpinBox()
self.priority.setRange(-99, 99)
grid.addLabel("Priority:", 1, 2)
grid.addWidget(self.priority, 1, 3)
self.pipeline = QtGui.QLineEdit()
self.pipeline.insert("main")
grid.addLabel("Pipeline:", 2, 0)
grid.addWidget(self.pipeline, 2, 1)
self.priority = QtGui.QSpinBox()
self.priority.setRange(-99, 99)
grid.addLabel("Priority:", 2, 2)
grid.addWidget(self.priority, 2, 3)
self.flush = QtGui.QCheckBox("Flush")
grid.addWidget(self.flush, 2, 2, colspan=2)
submit = QtGui.QPushButton("Submit")
grid.addWidget(submit, 3, 0, colspan=4)
@ -79,14 +82,14 @@ class ExplorerDock(dockarea.Dock):
@asyncio.coroutine
def submit(self, pipeline_name, file, experiment, arguments,
priority, due_date):
priority, due_date, flush):
expid = {
"file": file,
"experiment": experiment,
"arguments": arguments,
}
rid = yield from self.schedule_ctl.submit(pipeline_name, expid,
priority, due_date)
priority, due_date, flush)
self.status_bar.showMessage("Submitted RID {}".format(rid))
def submit_clicked(self):
@ -101,4 +104,5 @@ class ExplorerDock(dockarea.Dock):
due_date = None
asyncio.async(self.submit(self.pipeline.text(),
expinfo["file"], expinfo["experiment"],
dict(), self.priority.value(), due_date))
dict(), self.priority.value(), due_date,
self.flush.isChecked()))

View File

@ -4,7 +4,7 @@ from enum import Enum
from time import time
from artiq.master.worker import Worker
from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek
from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek, WaitSet
from artiq.protocols.sync_struct import Notifier
@ -13,13 +13,14 @@ logger = logging.getLogger(__name__)
class RunStatus(Enum):
pending = 0
preparing = 1
prepare_done = 2
running = 3
run_done = 4
analyzing = 5
analyze_done = 6
paused = 7
flushing = 1
preparing = 2
prepare_done = 3
running = 4
run_done = 5
analyzing = 6
analyze_done = 7
paused = 8
def _mk_worker_method(name):
@ -45,7 +46,7 @@ def _mk_worker_method(name):
class Run:
def __init__(self, rid, pipeline_name,
expid, priority, due_date,
expid, priority, due_date, flush,
worker_handlers, notifier):
# called through pool
self.rid = rid
@ -53,6 +54,7 @@ class Run:
self.expid = expid
self.priority = priority
self.due_date = due_date
self.flush = flush
self._status = RunStatus.pending
self._terminated = False
@ -64,6 +66,7 @@ class Run:
"expid": self.expid,
"priority": self.priority,
"due_date": self.due_date,
"flush": self.flush,
"status": self._status.name
}
@ -120,20 +123,20 @@ class RIDCounter:
class RunPool:
def __init__(self, ridc, worker_handlers, notifier):
self.runs = dict()
self.submitted_callback = None
self.submitted_cb = None
self._ridc = ridc
self._worker_handlers = worker_handlers
self._notifier = notifier
def submit(self, expid, priority, due_date, pipeline_name):
def submit(self, expid, priority, due_date, flush, pipeline_name):
# called through scheduler
rid = self._ridc.get()
run = Run(rid, pipeline_name, expid, priority, due_date,
run = Run(rid, pipeline_name, expid, priority, due_date, flush,
self._worker_handlers, self._notifier)
self.runs[rid] = run
if self.submitted_callback is not None:
self.submitted_callback()
if self.submitted_cb is not None:
self.submitted_cb()
return rid
@asyncio.coroutine
@ -161,13 +164,14 @@ class TaskObject:
class PrepareStage(TaskObject):
def __init__(self, deleter, pool, outq):
self.deleter = deleter
def __init__(self, flush_tracker, delete_cb, pool, outq):
self.flush_tracker = flush_tracker
self.delete_cb = delete_cb
self.pool = pool
self.outq = outq
self.pool_submitted = asyncio.Event()
self.pool.submitted_callback = lambda: self.pool_submitted.set()
self.pool.submitted_cb = lambda: self.pool_submitted.set()
@asyncio.coroutine
def _push_runs(self):
@ -186,14 +190,18 @@ class PrepareStage(TaskObject):
# pending_runs is an empty sequence
return None
if run.due_date is None or run.due_date < now:
if run.flush:
run.status = RunStatus.flushing
yield from self.flush_tracker.wait_empty()
run.status = RunStatus.preparing
self.flush_tracker.add(run.rid)
try:
yield from run.prepare()
except:
logger.warning("got worker exception in prepare stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
self.delete_cb(run.rid)
run.status = RunStatus.prepare_done
yield from self.outq.put(run)
else:
@ -214,8 +222,8 @@ class PrepareStage(TaskObject):
class RunStage(TaskObject):
def __init__(self, deleter, inq, outq):
self.deleter = deleter
def __init__(self, delete_cb, inq, outq):
self.delete_cb = delete_cb
self.inq = inq
self.outq = outq
@ -246,7 +254,7 @@ class RunStage(TaskObject):
logger.warning("got worker exception in run stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
self.delete_cb(run.rid)
else:
if completed:
run.status = RunStatus.run_done
@ -257,8 +265,8 @@ class RunStage(TaskObject):
class AnalyzeStage(TaskObject):
def __init__(self, deleter, inq):
self.deleter = deleter
def __init__(self, delete_cb, inq):
self.delete_cb = delete_cb
self.inq = inq
@asyncio.coroutine
@ -273,17 +281,23 @@ class AnalyzeStage(TaskObject):
logger.warning("got worker exception in analyze stage, "
"deleting RID %d",
run.rid, exc_info=True)
self.deleter.delete(run.rid)
self.delete_cb(run.rid)
run.status = RunStatus.analyze_done
self.deleter.delete(run.rid)
self.delete_cb(run.rid)
class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
flush_tracker = WaitSet()
def delete_cb(rid):
deleter.delete(rid)
flush_tracker.discard(rid)
self.pool = RunPool(ridc, worker_handlers, notifier)
self._prepare = PrepareStage(deleter, self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(deleter, self._run.outq)
self._prepare = PrepareStage(flush_tracker, delete_cb,
self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(delete_cb,
self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(delete_cb, self._run.outq)
def start(self):
self._prepare.start()
@ -366,7 +380,7 @@ class Scheduler:
if self._pipelines:
logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, priority, due_date):
def submit(self, pipeline_name, expid, priority, due_date, flush):
if self._terminated:
return
try:
@ -377,7 +391,7 @@ class Scheduler:
self._worker_handlers, self.notifier)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
return pipeline.pool.submit(expid, priority, due_date, pipeline_name)
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
def delete(self, rid):
self._deleter.delete(rid)

View File

@ -79,7 +79,7 @@ class Scheduler:
pause = staticmethod(make_parent_action("pause", ""))
submit = staticmethod(make_parent_action("scheduler_submit",
"pipeline_name expid priority due_date"))
"pipeline_name expid priority due_date flush"))
cancel = staticmethod(make_parent_action("scheduler_cancel", "rid"))
def __init__(self, pipeline_name, expid, priority):

View File

@ -31,7 +31,8 @@ def _get_basic_steps(rid, expid):
return [
{"action": "setitem", "key": rid, "value":
{"pipeline": "main", "status": "pending", "priority": 0,
"expid": expid, "due_date": None}, "path": []},
"expid": expid, "due_date": None, "flush": False},
"path": []},
{"action": "setitem", "key": "status", "value": "preparing",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "prepare_done",
@ -71,7 +72,7 @@ class SchedulerCase(unittest.TestCase):
loop = asyncio.get_event_loop()
scheduler.start()
scheduler.submit("main", expid, 0, None)
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait())
loop.run_until_complete(scheduler.stop())
@ -100,8 +101,8 @@ class SchedulerCase(unittest.TestCase):
loop = asyncio.get_event_loop()
scheduler.start()
scheduler.submit("main", expid_bg, -99, None)
scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait())
scheduler.submit("main", expid, 0, None)
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait())
loop.run_until_complete(scheduler.stop())

View File

@ -119,3 +119,27 @@ def asyncio_queue_peek(q):
return q._queue[0]
else:
raise asyncio.QueueEmpty
class WaitSet:
def __init__(self):
self._s = set()
self._ev = asyncio.Event()
def _update_ev(self):
if self._s:
self._ev.clear()
else:
self._ev.set()
def add(self, e):
self._s.add(e)
self._update_ev()
def discard(self, e):
self._s.discard(e)
self._update_ev()
@asyncio.coroutine
def wait_empty(self):
yield from self._ev.wait()

View File

@ -51,7 +51,7 @@ class FloppingF(Experiment, AutoDB):
self.brightness.append(brightness)
time.sleep(0.1)
self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid,
self.scheduler.priority, time.time() + 20)
self.scheduler.priority, time.time() + 20, False)
def analyze(self):
popt, pcov = curve_fit(model_numpy,