From b74b8d58267ca295d873c9d552843b13c78e0254 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 17 May 2015 16:11:00 +0800 Subject: [PATCH] Scheduling TNG --- artiq/frontend/artiq_client.py | 114 ++--- artiq/frontend/artiq_master.py | 14 +- artiq/gui/explorer.py | 7 +- artiq/gui/scheduler.py | 130 ++---- artiq/gui/tools.py | 21 + artiq/master/scheduler.py | 436 ++++++++++++++---- artiq/master/worker.py | 165 +++++-- artiq/master/worker_impl.py | 31 +- artiq/tools.py | 22 + .../repository/flopping_f_simulation.py | 3 +- 10 files changed, 600 insertions(+), 343 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index 31d639a59..6259c90d3 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -33,27 +33,23 @@ def get_argparser(): parser_add = subparsers.add_parser("submit", help="submit an experiment") parser_add.add_argument( - "-T", "--timed", default=None, type=str, - help="run the experiment in timed mode. " - "argument specifies the time of the first run, " - "use 'now' to run immediately") + "-t", "--timed", default=None, type=str, + help="set a due date for the experiment") + parser_add.add_argument("-p", "--pipeline", default="main", type=str, + help="pipeline to run the experiment in " + "(default: %(default)s)") parser_add.add_argument("-e", "--experiment", default=None, help="experiment to run") - parser_add.add_argument("--rtr-group", default=None, type=str, - help="real-time result group " - "(defaults to filename)") parser_add.add_argument("file", help="file containing the experiment to run") parser_add.add_argument("arguments", nargs="*", help="run arguments") - parser_cancel = subparsers.add_parser("cancel", - help="cancel an experiment") - parser_cancel.add_argument("-T", "--timed", default=False, - action="store_true", - help="cancel a timed experiment") - parser_cancel.add_argument("rid", type=int, - help="run identifier (RID/TRID)") + parser_delete = subparsers.add_parser("delete", + help="delete an experiment " + "from the schedule") + parser_delete.add_argument("rid", type=int, + help="run identifier (RID)") parser_set_device = subparsers.add_parser( "set-device", help="add or modify a device") @@ -79,7 +75,7 @@ def get_argparser(): "show", help="show schedule, devices or parameters") parser_show.add_argument( "what", - help="select object to show: queue/timed/devices/parameters") + help="select object to show: schedule/devices/parameters") return parser @@ -99,30 +95,21 @@ def _action_submit(remote, args): print("Failed to parse run arguments") sys.exit(1) - run_params = { + expid = { "file": args.file, "experiment": args.experiment, "arguments": arguments, - "rtr_group": args.rtr_group if args.rtr_group is not None \ - else args.file } if args.timed is None: - rid = remote.run_queued(run_params) - print("RID: {}".format(rid)) + due_date = None else: - if args.timed == "now": - next_time = None - else: - next_time = time.mktime(parse_date(args.timed).timetuple()) - trid = remote.run_timed(run_params, next_time) - print("TRID: {}".format(trid)) + due_date = time.mktime(parse_date(args.timed).timetuple()) + rid = remote.submit(args.pipeline, expid, due_date) + print("RID: {}".format(rid)) -def _action_cancel(remote, args): - if args.timed: - remote.cancel_timed(args.rid) - else: - remote.cancel_queued(args.rid) +def _action_delete(remote, args): + remote.delete(args.rid) def _action_set_device(remote, args): @@ -141,41 +128,30 @@ def _action_del_parameter(remote, args): remote.delete(args.name) -def _show_queue(queue): +def _show_schedule(schedule): clear_screen() - if queue: - table = PrettyTable(["RID", "File", "Experiment", "Arguments"]) - for rid, run_params in queue: - row = [rid, run_params["file"]] - if run_params["experiment"] is None: + if schedule: + l = sorted(schedule.items(), + key=lambda x: (x[1]["due_date"] or 0, x[0])) + table = PrettyTable(["RID", "Pipeline", " Status ", "Due date", + "File", "Experiment", "Arguments"]) + for rid, v in l: + row = [rid, v["pipeline"], v["status"]] + if v["due_date"] is None: row.append("") else: - row.append(run_params["experiment"]) - row.append(format_arguments(run_params["arguments"])) + row.append(time.strftime("%m/%d %H:%M:%S", + time.localtime(v["due_date"]))) + row.append(v["expid"]["file"]) + if v["expid"]["experiment"] is None: + row.append("") + else: + row.append(v["expid"]["experiment"]) + row.append(format_arguments(v["expid"]["arguments"])) table.add_row(row) print(table) else: - print("Queue is empty") - - -def _show_timed(timed): - clear_screen() - if timed: - table = PrettyTable(["Next run", "TRID", "File", "Experiment", - "Arguments"]) - sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0])) - for trid, (next_run, run_params) in sp: - row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), - trid, run_params["file"]] - if run_params["experiment"] is None: - row.append("") - else: - row.append(run_params["experiment"]) - row.append(format_arguments(run_params["arguments"])) - table.add_row(row) - print(table) - else: - print("No timed schedule") + print("Schedule is empty") def _show_devices(devices): @@ -211,16 +187,6 @@ def _run_subscriber(host, port, subscriber): loop.close() -def _show_list(args, notifier_name, display_fun): - l = [] - def init_l(x): - l[:] = x - return l - subscriber = Subscriber(notifier_name, init_l, - lambda mod: display_fun(l)) - _run_subscriber(args.server, args.port, subscriber) - - def _show_dict(args, notifier_name, display_fun): d = dict() def init_d(x): @@ -236,10 +202,8 @@ def main(): args = get_argparser().parse_args() action = args.action.replace("-", "_") if action == "show": - if args.what == "queue": - _show_list(args, "queue", _show_queue) - elif args.what == "timed": - _show_dict(args, "timed", _show_timed) + if args.what == "schedule": + _show_dict(args, "schedule", _show_schedule) elif args.what == "devices": _show_dict(args, "devices", _show_devices) elif args.what == "parameters": @@ -251,7 +215,7 @@ def main(): port = 3251 if args.port is None else args.port target_name = { "submit": "master_schedule", - "cancel": "master_schedule", + "delete": "master_schedule", "set_device": "master_ddb", "del_device": "master_ddb", "set_parameter": "master_pdb", diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 8c82de7c7..1c9b19e6a 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -48,20 +48,15 @@ def main(): loop = asyncio.get_event_loop() atexit.register(lambda: loop.close()) - def run_cb(rid, run_params): - rtr.current_group = run_params["rtr_group"] - scheduler = Scheduler(run_cb, get_last_rid() + 1) - scheduler.worker_handlers = { + worker_handlers = { "req_device": ddb.request, "req_parameter": pdb.request, "set_parameter": pdb.set, "init_rt_results": rtr.init, "update_rt_results": rtr.update, - "scheduler_run_queued": scheduler.run_queued, - "scheduler_cancel_queued": scheduler.cancel_queued, - "scheduler_run_timed": scheduler.run_timed, - "scheduler_cancel_timed": scheduler.cancel_timed, } + scheduler = Scheduler(get_last_rid() + 1, worker_handlers) + worker_handlers["scheduler_submit"] = scheduler.submit scheduler.start() atexit.register(lambda: loop.run_until_complete(scheduler.stop())) @@ -76,8 +71,7 @@ def main(): atexit.register(lambda: loop.run_until_complete(server_control.stop())) server_notify = Publisher({ - "queue": scheduler.queue, - "timed": scheduler.timed, + "schedule": scheduler.notifier, "devices": ddb.data, "parameters": pdb.data, "parameters_simplehist": simplephist.history, diff --git a/artiq/gui/explorer.py b/artiq/gui/explorer.py index 3fe7c5e7a..1736acf16 100644 --- a/artiq/gui/explorer.py +++ b/artiq/gui/explorer.py @@ -144,10 +144,9 @@ class ExplorerWindow(Window): arguments = {} else: arguments = self.controls.get_arguments() - run_params = { + expid = { "file": data["file"], "experiment": data["experiment"], - "arguments": arguments, - "rtr_group": data["file"] + "arguments": arguments } - asyncio.Task(self.schedule_ctl.run_queued(run_params)) + asyncio.async(self.schedule_ctl.submit("main", expid, None)) diff --git a/artiq/gui/scheduler.py b/artiq/gui/scheduler.py index 3b8e41e76..3bd8bbb11 100644 --- a/artiq/gui/scheduler.py +++ b/artiq/gui/scheduler.py @@ -3,37 +3,29 @@ import asyncio from gi.repository import Gtk -from artiq.gui.tools import Window, ListSyncer, DictSyncer +from artiq.gui.tools import Window, DictSyncer from artiq.protocols.sync_struct import Subscriber from artiq.tools import format_arguments -class _QueueStoreSyncer(ListSyncer): - def convert(self, x): - rid, run_params = x - row = [rid, run_params["file"]] - if run_params["experiment"] is None: - row.append("") - else: - row.append(run_params["experiment"]) - row.append(format_arguments(run_params["arguments"])) - return row - - -class _TimedStoreSyncer(DictSyncer): +class _ScheduleStoreSyncer(DictSyncer): def order_key(self, kv_pair): - # order by next run time, and then by TRID - return (kv_pair[1][0], kv_pair[0]) + # order by due date, and then by RID + return (kv_pair[1]["due_date"] or 0, kv_pair[0]) - def convert(self, trid, x): - next_run, run_params = x - row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), - trid, run_params["file"]] - if run_params["experiment"] is None: + def convert(self, rid, v): + row = [rid, v["pipeline"], v["status"]] + if v["due_date"] is None: row.append("") else: - row.append(run_params["experiment"]) - row.append(format_arguments(run_params["arguments"])) + row.append(time.strftime("%m/%d %H:%M:%S", + time.localtime(v["due_date"]))) + row.append(v["expid"]["file"]) + if v["expid"]["experiment"] is None: + row.append("") + else: + row.append(v["expid"]["experiment"]) + row.append(format_arguments(v["expid"]["arguments"])) return row @@ -43,93 +35,41 @@ class SchedulerWindow(Window): Window.__init__(self, title="Scheduler", - default_size=(720, 570), + default_size=(950, 570), **kwargs) topvbox = Gtk.VBox(spacing=6) self.add(topvbox) - hbox = Gtk.HBox(spacing=6) - enable = Gtk.Switch(active=True) - label = Gtk.Label("Run experiments") - hbox.pack_start(label, False, False, 0) - hbox.pack_start(enable, False, False, 0) - topvbox.pack_start(hbox, False, False, 0) - - notebook = Gtk.Notebook() - topvbox.pack_start(notebook, True, True, 0) - - self.queue_store = Gtk.ListStore(int, str, str, str) - self.queue_tree = Gtk.TreeView(self.queue_store) - for i, title in enumerate(["RID", "File", "Experiment", "Arguments"]): + self.schedule_store = Gtk.ListStore(int, str, str, str, str, str, str) + self.schedule_tree = Gtk.TreeView(self.schedule_store) + for i, title in enumerate(["RID", "Pipeline", "Status", "Due date", + "File", "Experiment", "Arguments"]): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(title, renderer, text=i) - self.queue_tree.append_column(column) + self.schedule_tree.append_column(column) scroll = Gtk.ScrolledWindow() - scroll.add(self.queue_tree) - vbox = Gtk.VBox(spacing=6) - vbox.pack_start(scroll, True, True, 0) - hbox = Gtk.HBox(spacing=6) - button = Gtk.Button("Find") - hbox.pack_start(button, True, True, 0) - button = Gtk.Button("Move up") - hbox.pack_start(button, True, True, 0) - button = Gtk.Button("Move down") - hbox.pack_start(button, True, True, 0) - button = Gtk.Button("Remove") - button.connect("clicked", self.remove_queued) - hbox.pack_start(button, True, True, 0) - vbox.pack_start(hbox, False, False, 0) - vbox.set_border_width(6) - notebook.insert_page(vbox, Gtk.Label("Queue"), -1) + scroll.add(self.schedule_tree) + topvbox.pack_start(scroll, True, True, 0) + button = Gtk.Button("Delete") + button.connect("clicked", self.delete) + topvbox.pack_start(button, False, False, 0) + topvbox.set_border_width(6) - self.timed_store = Gtk.ListStore(str, int, str, str, str) - self.timed_tree = Gtk.TreeView(self.timed_store) - for i, title in enumerate(["Next run", "TRID", "File", "Experiment", - "Arguments"]): - renderer = Gtk.CellRendererText() - column = Gtk.TreeViewColumn(title, renderer, text=i) - self.timed_tree.append_column(column) - scroll = Gtk.ScrolledWindow() - scroll.add(self.timed_tree) - vbox = Gtk.VBox(spacing=6) - vbox.pack_start(scroll, True, True, 0) - button = Gtk.Button("Remove") - button.connect("clicked", self.remove_timed) - vbox.pack_start(button, False, False, 0) - vbox.set_border_width(6) - notebook.insert_page(vbox, Gtk.Label("Timed schedule"), -1) - - def remove_queued(self, widget): - store, selected = self.queue_tree.get_selection().get_selected() + def delete(self, widget): + store, selected = self.schedule_tree.get_selection().get_selected() if selected is not None: rid = store[selected][0] - asyncio.Task(self.schedule_ctl.cancel_queued(rid)) - - def remove_timed(self, widget): - store, selected = self.timed_tree.get_selection().get_selected() - if selected is not None: - trid = store[selected][1] - asyncio.Task(self.schedule_ctl.cancel_timed(trid)) + asyncio.async(self.schedule_ctl.delete(rid)) @asyncio.coroutine def sub_connect(self, host, port): - self.queue_subscriber = Subscriber("queue", self.init_queue_store) - yield from self.queue_subscriber.connect(host, port) - try: - self.timed_subscriber = Subscriber("timed", self.init_timed_store) - yield from self.timed_subscriber.connect(host, port) - except: - yield from self.queue_subscriber.close() - raise + self.schedule_subscriber = Subscriber("schedule", self.init_schedule_store) + yield from self.schedule_subscriber.connect(host, port) @asyncio.coroutine def sub_close(self): - yield from self.timed_subscriber.close() - yield from self.queue_subscriber.close() + yield from self.schedule_subscriber.close() - def init_queue_store(self, init): - return _QueueStoreSyncer(self.queue_store, init) - - def init_timed_store(self, init): - return _TimedStoreSyncer(self.timed_store, init) + def init_schedule_store(self, init): + return _ScheduleStoreSyncer(self.schedule_store, init) diff --git a/artiq/gui/tools.py b/artiq/gui/tools.py index 4adc1be39..aa329d3db 100644 --- a/artiq/gui/tools.py +++ b/artiq/gui/tools.py @@ -78,6 +78,19 @@ class ListSyncer: raise NotImplementedError +class _DictSyncerSubstruct: + def __init__(self, update_cb, ref): + self.update_cb = update_cb + self.ref = ref + + def __getitem__(self, key): + return _DictSyncerSubstruct(self.update_cb, self.ref[key]) + + def __setitem__(self, key, value): + self.ref[key] = value + self.update_cb() + + class DictSyncer: def __init__(self, store, init): self.store = store @@ -86,6 +99,7 @@ class DictSyncer: for k, v in sorted(init.items(), key=self.order_key): self.store.append(self.convert(k, v)) self.order.append((k, self.order_key((k, v)))) + self.local_copy = init def _find_index(self, key): for i, e in enumerate(self.order): @@ -109,11 +123,18 @@ class DictSyncer: break self.store.insert(j, self.convert(key, value)) self.order.insert(j, (key, ord_el)) + self.local_copy[key] = value def __delitem__(self, key): i = self._find_index(key) del self.store[i] del self.order[i] + del self.local_copy[key] + + def __getitem__(self, key): + def update(): + self[key] = self.local_copy[key] + return _DictSyncerSubstruct(update, self.local_copy[key]) def order_key(self, kv_pair): raise NotImplementedError diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index d1c19396a..fbe560606 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -1,32 +1,150 @@ import asyncio +import logging +from enum import Enum from time import time -from artiq.protocols.sync_struct import Notifier from artiq.master.worker import Worker +from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek +from artiq.protocols.sync_struct import Notifier -class Scheduler: - def __init__(self, run_cb, first_rid): - self.worker_handlers = dict() - self.run_cb = run_cb - self.next_rid = first_rid - self.queue = Notifier([]) - self.queue_modified = asyncio.Event() - self.timed = Notifier(dict()) - self.timed_modified = asyncio.Event() +logger = logging.getLogger(__name__) - def new_rid(self): - r = self.next_rid - self.next_rid += 1 - return r - def new_trid(self): - trids = set(range(len(self.timed.read) + 1)) - trids -= set(self.timed.read.keys()) - return next(iter(trids)) +class RunStatus(Enum): + pending = 0 + preparing = 1 + prepare_done = 2 + running = 3 + run_done = 4 + analyzing = 5 + analyze_done = 6 + paused = 7 + +def _mk_worker_method(name): + @asyncio.coroutine + def worker_method(self, *args, **kwargs): + if self._terminated: + return True + m = getattr(self._worker, name) + try: + return (yield from m(*args, **kwargs)) + except Exception as e: + if isinstance(e, asyncio.CancelledError): + raise + if self._terminated: + logger.debug("suppressing worker exception of terminated run", + exc_info=True) + # Return completion on termination + return True + else: + raise + return worker_method + + +class Run: + def __init__(self, rid, pipeline_name, + expid, due_date, + worker_handlers, notifier): + # called through pool + self.rid = rid + self.pipeline_name = pipeline_name + self.expid = expid + self.due_date = due_date + + self._status = RunStatus.pending + self._terminated = False + self._worker = Worker(worker_handlers) + + self._notifier = notifier + self._notifier[self.rid] = { + "pipeline": self.pipeline_name, + "expid": self.expid, + "due_date": self.due_date, + "status": self._status.name + } + + @property + def status(self): + return self._status + + @status.setter + def status(self, value): + self._status = value + if not self._terminated: + self._notifier[self.rid]["status"] = self._status.name + + # The run with the largest priority_key is to be scheduled first + def priority_key(self, now): + if self.due_date is None: + overdue = 0 + due_date_k = 0 + else: + overdue = int(now > self.due_date) + due_date_k = -self.due_date + return (overdue, due_date_k, -self.rid) + + @asyncio.coroutine + def close(self): + # called through pool + self._terminated = True + yield from self._worker.close() + del self._notifier[self.rid] + + _prepare = _mk_worker_method("prepare") + + @asyncio.coroutine + def prepare(self): + yield from self._prepare(self.rid, self.pipeline_name, self.expid) + + run = _mk_worker_method("run") + resume = _mk_worker_method("resume") + analyze = _mk_worker_method("analyze") + write_results = _mk_worker_method("write_results") + + +class RIDCounter: + def __init__(self, next_rid): + self._next_rid = next_rid + + def get(self): + rid = self._next_rid + self._next_rid += 1 + return rid + + +class RunPool: + def __init__(self, ridc, worker_handlers, notifier): + self.runs = dict() + self.submitted_callback = None + + self._ridc = ridc + self._worker_handlers = worker_handlers + self._notifier = notifier + + def submit(self, expid, due_date, pipeline_name): + # called through scheduler + rid = self._ridc.get() + run = Run(rid, pipeline_name, expid, due_date, + self._worker_handlers, self._notifier) + self.runs[rid] = run + if self.submitted_callback is not None: + self.submitted_callback() + return rid + + @asyncio.coroutine + def delete(self, rid): + # called through deleter + if rid not in self.runs: + return + yield from self.runs[rid].close() + del self.runs[rid] + + +class TaskObject: def start(self): - self.task = asyncio.Task(self._schedule()) + self.task = asyncio.async(self._do()) @asyncio.coroutine def stop(self): @@ -34,97 +152,215 @@ class Scheduler: yield from asyncio.wait([self.task]) del self.task - def run_queued(self, run_params): - rid = self.new_rid() - self.queue.append((rid, run_params)) - self.queue_modified.set() - return rid + @asyncio.coroutine + def _do(self): + raise NotImplementedError - def cancel_queued(self, rid): - idx = next(idx for idx, (qrid, _) - in enumerate(self.queue.read) - if qrid == rid) - if idx == 0: - # Cannot cancel when already running - raise NotImplementedError - del self.queue[idx] - def run_timed(self, run_params, next_run): - if next_run is None: - next_run = time() - trid = self.new_trid() - self.timed[trid] = next_run, run_params - self.timed_modified.set() - return trid +class PrepareStage(TaskObject): + def __init__(self, pool, outq): + self.pool = pool + self.outq = outq - def cancel_timed(self, trid): - del self.timed[trid] + self.pool_submitted = asyncio.Event() + self.pool.submitted_callback = lambda: self.pool_submitted.set() @asyncio.coroutine - def _run(self, rid, run_params): - self.run_cb(rid, run_params) - worker = Worker(self.worker_handlers) - try: - yield from worker.prepare(rid, run_params) - try: - yield from worker.run() - yield from worker.analyze() - yield from worker.write_results() - finally: - yield from worker.close() - except Exception as e: - print("RID {} failed:".format(rid)) - print("{}: {}".format(e.__class__.__name__, e)) - else: - print("RID {} completed successfully".format(rid)) + def _push_runs(self): + """Pushes all runs that have no due date of have a due date in the + past. - @asyncio.coroutine - def _run_timed(self): + Returns the time before the next schedulable run, or None if the + pool is empty.""" while True: - min_next_run = None - min_trid = None - for trid, params in self.timed.read.items(): - if min_next_run is None or params[0] < min_next_run: - min_next_run = params[0] - min_trid = trid - now = time() - - if min_next_run is None: + pending_runs = filter(lambda r: r.status == RunStatus.pending, + self.pool.runs.values()) + try: + run = max(pending_runs, key=lambda r: r.priority_key(now)) + except ValueError: + # pending_runs is an empty sequence return None - min_next_run -= now - if min_next_run > 0: - return min_next_run - - next_run, run_params = self.timed.read[min_trid] - del self.timed[min_trid] - - rid = self.new_rid() - self.queue.insert(0, (rid, run_params)) - yield from self._run(rid, run_params) - del self.queue[0] + if run.due_date is None or run.due_date < now: + run.status = RunStatus.preparing + yield from run.prepare() + run.status = RunStatus.prepare_done + yield from self.outq.put(run) + else: + return run.due_date - now @asyncio.coroutine - def _schedule(self): + def _do(self): while True: - next_timed = yield from self._run_timed() - if self.queue.read: - rid, run_params = self.queue.read[0] - yield from self._run(rid, run_params) - del self.queue[0] + next_timed_in = yield from self._push_runs() + if next_timed_in is None: + # pool is empty - wait for something to be added to it + yield from self.pool_submitted.wait() else: - self.queue_modified.clear() - self.timed_modified.clear() - t1 = asyncio.Task(self.queue_modified.wait()) - t2 = asyncio.Task(self.timed_modified.wait()) - try: - done, pend = yield from asyncio.wait( - [t1, t2], - timeout=next_timed, - return_when=asyncio.FIRST_COMPLETED) - except: - t1.cancel() - t2.cancel() - raise - for t in pend: - t.cancel() + # wait for next_timed_in seconds, or until the pool is modified + yield from asyncio_wait_or_cancel([self.pool_submitted.wait()], + timeout=next_timed_in) + self.pool_submitted.clear() + + +class RunStage(TaskObject): + def __init__(self, deleter, inq, outq): + self.deleter = deleter + self.inq = inq + self.outq = outq + + @asyncio.coroutine + def _do(self): + stack = [] + + while True: + try: + next_irun = asyncio_queue_peek(self.inq) + except asyncio.QueueEmpty: + next_irun = None + now = time() + if not stack or ( + next_irun is not None and + next_irun.priority_key(now) > stack[-1].priority_key(now)): + stack.append((yield from self.inq.get())) + + run = stack.pop() + try: + if run.status == RunStatus.paused: + run.status = RunStatus.running + completed = yield from run.resume() + else: + run.status = RunStatus.running + completed = yield from run.run() + except: + logger.warning("got worker exception, deleting RID %d", + run.rid, exc_info=True) + self.deleter.delete(run.rid) + else: + if completed: + run.status = RunStatus.run_done + yield from self.outq.put(run) + else: + run.status = RunStatus.paused + stack.append(run) + + +class AnalyzeStage(TaskObject): + def __init__(self, deleter, inq): + self.deleter = deleter + self.inq = inq + + @asyncio.coroutine + def _do(self): + while True: + run = yield from self.inq.get() + run.status = RunStatus.analyzing + yield from run.analyze() + yield from run.write_results() + run.status = RunStatus.analyze_done + self.deleter.delete(run.rid) + + +class Pipeline: + def __init__(self, ridc, deleter, worker_handlers, notifier): + self.pool = RunPool(ridc, worker_handlers, notifier) + self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1)) + self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1)) + self._analyze = AnalyzeStage(deleter, self._run.outq) + + def start(self): + self._prepare.start() + self._run.start() + self._analyze.start() + + @asyncio.coroutine + def stop(self): + # NB: restart of a stopped pipeline is not supported + yield from self._analyze.stop() + yield from self._run.stop() + yield from self._prepare.stop() + + +class Deleter(TaskObject): + def __init__(self, pipelines): + self._pipelines = pipelines + self._queue = asyncio.JoinableQueue() + + def delete(self, rid): + logger.debug("delete request for RID %d", rid) + self._queue.put_nowait(rid) + + @asyncio.coroutine + def join(self): + yield from self._queue.join() + + @asyncio.coroutine + def _delete(self, rid): + for pipeline in self._pipelines.values(): + if rid in pipeline.pool.runs: + logger.debug("deleting RID %d...", rid) + yield from pipeline.pool.delete(rid) + logger.debug("deletion of RID %d completed", rid) + break + + @asyncio.coroutine + def _gc_pipelines(self): + pipeline_names = list(self._pipelines.keys()) + for name in pipeline_names: + if not self._pipelines[name].pool.runs: + logger.debug("garbage-collecting pipeline '%s'...", name) + yield from self._pipelines[name].stop() + del self._pipelines[name] + logger.debug("garbage-collection of pipeline '%s' completed", + name) + + @asyncio.coroutine + def _do(self): + while True: + rid = yield from self._queue.get() + yield from self._delete(rid) + yield from self._gc_pipelines() + self._queue.task_done() + + +class Scheduler: + def __init__(self, next_rid, worker_handlers): + self.notifier = Notifier(dict()) + + self._pipelines = dict() + self._worker_handlers = worker_handlers + self._terminated = False + + self._ridc = RIDCounter(next_rid) + self._deleter = Deleter(self._pipelines) + + def start(self): + self._deleter.start() + + @asyncio.coroutine + def stop(self): + # NB: restart of a stopped scheduler is not supported + self._terminated = True # prevent further runs from being created + for pipeline in self._pipelines.values(): + for rid in pipeline.pool.runs.keys(): + self._deleter.delete(rid) + yield from self._deleter.join() + yield from self._deleter.stop() + if self._pipelines: + logger.warning("some pipelines were not garbage-collected") + + def submit(self, pipeline_name, expid, due_date): + if self._terminated: + return + try: + pipeline = self._pipelines[pipeline_name] + except KeyError: + logger.debug("creating pipeline '%s'", pipeline_name) + pipeline = Pipeline(self._ridc, self._deleter, + self._worker_handlers, self.notifier) + self._pipelines[pipeline_name] = pipeline + pipeline.start() + return pipeline.pool.submit(expid, due_date, pipeline_name) + + def delete(self, rid): + self._deleter.delete(rid) diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 1ddc63307..d23a9932a 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -1,12 +1,16 @@ import sys import asyncio +import logging import subprocess import traceback import time from artiq.protocols import pyon from artiq.language.units import strip_unit -from artiq.tools import asyncio_process_wait_timeout +from artiq.tools import asyncio_process_wait_timeout, asyncio_wait_or_cancel + + +logger = logging.getLogger(__name__) class WorkerTimeout(Exception): @@ -30,8 +34,14 @@ class Worker: self.term_timeout = term_timeout self.prepare_timeout = prepare_timeout self.results_timeout = results_timeout + + self.rid = None + self.process = None self.watchdogs = dict() # wid -> expiration (using time.monotonic) + self.io_lock = asyncio.Lock() + self.closed = asyncio.Event() + def create_watchdog(self, t): n_user_watchdogs = len(self.watchdogs) if -1 in self.watchdogs: @@ -53,50 +63,82 @@ class Worker: @asyncio.coroutine def _create_process(self): - self.process = yield from asyncio.create_subprocess_exec( - sys.executable, "-m", "artiq.master.worker_impl", - stdout=subprocess.PIPE, stdin=subprocess.PIPE) + yield from self.io_lock.acquire() + try: + if self.closed.is_set(): + raise WorkerError("Attempting to create process after close") + self.process = yield from asyncio.create_subprocess_exec( + sys.executable, "-m", "artiq.master.worker_impl", + stdout=subprocess.PIPE, stdin=subprocess.PIPE) + finally: + self.io_lock.release() @asyncio.coroutine def close(self): - if self.process.returncode is not None: - if process.returncode != 0: - raise WorkerError("Worker finished with status code {}" - .format(process.returncode)) - return - obj = {"action": "terminate"} + self.closed.set() + yield from self.io_lock.acquire() try: - yield from self._send(obj, self.send_timeout) - except: - self.process.kill() - return - try: - yield from asyncio_process_wait_timeout(self.process, - self.term_timeout) - except asyncio.TimeoutError: - self.process.kill() + if self.process is None: + # Note the %s - self.rid can be None + logger.debug("worker was not created (RID %s)", self.rid) + return + if self.process.returncode is not None: + logger.debug("worker already terminated (RID %d)", self.rid) + if self.process.returncode != 0: + logger.warning("worker finished with status code %d" + " (RID %d)", self.process.returncode, + self.rid) + return + obj = {"action": "terminate"} + try: + yield from self._send(obj, self.send_timeout, cancellable=False) + except: + logger.warning("failed to send terminate command to worker" + " (RID %d), killing", self.rid, exc_info=True) + self.process.kill() + return + try: + yield from asyncio_process_wait_timeout(self.process, + self.term_timeout) + except asyncio.TimeoutError: + logger.warning("worker did not exit (RID %d), killing", self.rid) + self.process.kill() + else: + logger.debug("worker exited gracefully (RID %d)", self.rid) + finally: + self.io_lock.release() @asyncio.coroutine - def _send(self, obj, timeout): + def _send(self, obj, timeout, cancellable=True): + assert self.io_lock.locked() line = pyon.encode(obj) self.process.stdin.write(line.encode()) self.process.stdin.write("\n".encode()) - try: - fut = self.process.stdin.drain() - if fut is not (): # FIXME: why does Python return this? - yield from asyncio.wait_for(fut, timeout=timeout) - except asyncio.TimeoutError: - raise WorkerTimeout("Timeout sending data from worker") - except: - raise WorkerError("Failed to send data to worker") + ifs = [self.process.stdin.drain()] + if cancellable: + ifs.append(self.closed.wait()) + fs = yield from asyncio_wait_or_cancel( + ifs, timeout=timeout, + return_when=asyncio.FIRST_COMPLETED) + if all(f.cancelled() for f in fs): + raise WorkerTimeout("Timeout sending data to worker") + for f in fs: + if not f.cancelled() and f.done(): + f.result() # raise any exceptions + if cancellable and self.closed.is_set(): + raise WorkerError("Data transmission to worker cancelled") @asyncio.coroutine def _recv(self, timeout): - try: - line = yield from asyncio.wait_for( - self.process.stdout.readline(), timeout=timeout) - except asyncio.TimeoutError: - raise WorkerTimeout("Timeout receiving data from worker") + assert self.io_lock.locked() + fs = yield from asyncio_wait_or_cancel( + [self.process.stdout.readline(), self.closed.wait()], + timeout=timeout, return_when=asyncio.FIRST_COMPLETED) + if all(f.cancelled() for f in fs): + raise WorkerTimeout("Timeout sending data to worker") + if self.closed.is_set(): + raise WorkerError("Data transmission to worker cancelled") + line = fs[0].result() if not line: raise WorkerError("Worker ended while attempting to receive data") try: @@ -109,12 +151,18 @@ class Worker: def _handle_worker_requests(self): while True: try: - obj = yield from self._recv(self.watchdog_time()) + yield from self.io_lock.acquire() + try: + obj = yield from self._recv(self.watchdog_time()) + finally: + self.io_lock.release() except WorkerTimeout: raise WorkerWatchdogTimeout action = obj["action"] if action == "completed": - return + return True + elif action == "pause": + return False del obj["action"] if action == "create_watchdog": func = self.create_watchdog @@ -128,36 +176,59 @@ class Worker: except: reply = {"status": "failed", "message": traceback.format_exc()} - yield from self._send(reply, self.send_timeout) + yield from self.io_lock.acquire() + try: + yield from self._send(reply, self.send_timeout) + finally: + self.io_lock.release() @asyncio.coroutine def _worker_action(self, obj, timeout=None): if timeout is not None: self.watchdogs[-1] = time.monotonic() + timeout try: - yield from self._send(obj, self.send_timeout) + yield from self.io_lock.acquire() try: - yield from self._handle_worker_requests() + yield from self._send(obj, self.send_timeout) + finally: + self.io_lock.release() + try: + completed = yield from self._handle_worker_requests() except WorkerTimeout: raise WorkerWatchdogTimeout finally: if timeout is not None: del self.watchdogs[-1] + return completed @asyncio.coroutine - def prepare(self, rid, run_params): + def prepare(self, rid, pipeline_name, expid): + self.rid = rid yield from self._create_process() - try: - yield from self._worker_action( - {"action": "prepare", "rid": rid, "run_params": run_params}, - self.prepare_timeout) - except: - yield from self.close() - raise + yield from self._worker_action( + {"action": "prepare", + "rid": rid, + "pipeline_name": pipeline_name, + "expid": expid}, + self.prepare_timeout) @asyncio.coroutine def run(self): - yield from self._worker_action({"action": "run"}) + completed = yield from self._worker_action({"action": "run"}) + if not completed: + self.yield_time = time.monotonic() + return completed + + @asyncio.coroutine + def resume(self): + stop_duration = time.monotonic() - self.yield_time + for wid, expiry in self.watchdogs: + self.watchdogs[wid] += stop_duration + completed = yield from self._worker_action({"status": "ok", + "data": None}) + if not completed: + self.yield_time = time.monotonic() + return completed @asyncio.coroutine def analyze(self): diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 361af948e..722fc3504 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -33,6 +33,11 @@ def make_parent_action(action, argnames, exception=ParentActionError): request[argname] = arg put_object(request) reply = get_object() + if "action" in reply: + if reply["action"] == "terminate": + sys.exit() + else: + raise ValueError if reply["status"] == "ok": return reply["data"] else: @@ -71,11 +76,15 @@ set_watchdog_factory(Watchdog) class Scheduler: - run_queued = make_parent_action("scheduler_run_queued", "run_params") - cancel_queued = make_parent_action("scheduler_cancel_queued", "rid") - run_timed = make_parent_action("scheduler_run_timed", - "run_params next_run") - cancel_timed = make_parent_action("scheduler_cancel_timed", "trid") + pause = staticmethod(make_parent_action("pause", "")) + + submit = staticmethod(make_parent_action("scheduler_submit", + "pipeline_name expid due_date")) + cancel = staticmethod(make_parent_action("scheduler_cancel", "rid")) + + def __init__(self, pipeline_name, expid): + self.pipeline_name = pipeline_name + self.expid = expid def get_exp(file, exp): @@ -96,7 +105,7 @@ def main(): start_time = None rid = None - run_params = None + expid = None exp = None exp_inst = None @@ -110,12 +119,12 @@ def main(): if action == "prepare": start_time = time.localtime() rid = obj["rid"] - run_params = obj["run_params"] - exp = get_exp(run_params["file"], run_params["experiment"]) + pipeline_name = obj["pipeline_name"] + expid = obj["expid"] + exp = get_exp(expid["file"], expid["experiment"]) exp_inst = exp(dbh, - scheduler=Scheduler, - run_params=run_params, - **run_params["arguments"]) + scheduler=Scheduler(pipeline_name, expid), + **expid["arguments"]) rdb.build() put_object({"action": "completed"}) elif action == "run": diff --git a/artiq/tools.py b/artiq/tools.py index 197fa6c44..78bf3b819 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -96,3 +96,25 @@ def asyncio_process_wait_timeout(process, timeout): r = yield from asyncio.wait_for( process.stdout.read(1024), timeout=end_time - time.monotonic()) + + +@asyncio.coroutine +def asyncio_wait_or_cancel(fs, **kwargs): + fs = [asyncio.async(f) for f in fs] + try: + d, p = yield from asyncio.wait(fs, **kwargs) + except: + for f in fs: + f.cancel() + raise + for f in p: + f.cancel() + return fs + + +def asyncio_queue_peek(q): + """Like q.get_nowait(), but does not remove the item from the queue.""" + if q._queue: + return q._queue[0] + else: + raise asyncio.QueueEmpty diff --git a/examples/master/repository/flopping_f_simulation.py b/examples/master/repository/flopping_f_simulation.py index 0a509336d..299e4f45c 100644 --- a/examples/master/repository/flopping_f_simulation.py +++ b/examples/master/repository/flopping_f_simulation.py @@ -51,7 +51,8 @@ class FloppingF(Experiment, AutoDB): self.frequency.append(frequency) self.brightness.append(brightness) time.sleep(0.1) - self.scheduler.run_timed(self.run_params, time.time() + 20) + self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid, + time.time() + 20) def analyze(self): popt, pcov = curve_fit(model_numpy,