Scheduling TNG

This commit is contained in:
Sebastien Bourdeauducq 2015-05-17 16:11:00 +08:00
parent e557d7e2df
commit b74b8d5826
10 changed files with 600 additions and 343 deletions

View File

@ -33,27 +33,23 @@ def get_argparser():
parser_add = subparsers.add_parser("submit", help="submit an experiment") parser_add = subparsers.add_parser("submit", help="submit an experiment")
parser_add.add_argument( parser_add.add_argument(
"-T", "--timed", default=None, type=str, "-t", "--timed", default=None, type=str,
help="run the experiment in timed mode. " help="set a due date for the experiment")
"argument specifies the time of the first run, " parser_add.add_argument("-p", "--pipeline", default="main", type=str,
"use 'now' to run immediately") help="pipeline to run the experiment in "
"(default: %(default)s)")
parser_add.add_argument("-e", "--experiment", default=None, parser_add.add_argument("-e", "--experiment", default=None,
help="experiment to run") help="experiment to run")
parser_add.add_argument("--rtr-group", default=None, type=str,
help="real-time result group "
"(defaults to filename)")
parser_add.add_argument("file", parser_add.add_argument("file",
help="file containing the experiment to run") help="file containing the experiment to run")
parser_add.add_argument("arguments", nargs="*", parser_add.add_argument("arguments", nargs="*",
help="run arguments") help="run arguments")
parser_cancel = subparsers.add_parser("cancel", parser_delete = subparsers.add_parser("delete",
help="cancel an experiment") help="delete an experiment "
parser_cancel.add_argument("-T", "--timed", default=False, "from the schedule")
action="store_true", parser_delete.add_argument("rid", type=int,
help="cancel a timed experiment") help="run identifier (RID)")
parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/TRID)")
parser_set_device = subparsers.add_parser( parser_set_device = subparsers.add_parser(
"set-device", help="add or modify a device") "set-device", help="add or modify a device")
@ -79,7 +75,7 @@ def get_argparser():
"show", help="show schedule, devices or parameters") "show", help="show schedule, devices or parameters")
parser_show.add_argument( parser_show.add_argument(
"what", "what",
help="select object to show: queue/timed/devices/parameters") help="select object to show: schedule/devices/parameters")
return parser return parser
@ -99,30 +95,21 @@ def _action_submit(remote, args):
print("Failed to parse run arguments") print("Failed to parse run arguments")
sys.exit(1) sys.exit(1)
run_params = { expid = {
"file": args.file, "file": args.file,
"experiment": args.experiment, "experiment": args.experiment,
"arguments": arguments, "arguments": arguments,
"rtr_group": args.rtr_group if args.rtr_group is not None \
else args.file
} }
if args.timed is None: if args.timed is None:
rid = remote.run_queued(run_params) due_date = None
else:
due_date = time.mktime(parse_date(args.timed).timetuple())
rid = remote.submit(args.pipeline, expid, due_date)
print("RID: {}".format(rid)) print("RID: {}".format(rid))
else:
if args.timed == "now":
next_time = None
else:
next_time = time.mktime(parse_date(args.timed).timetuple())
trid = remote.run_timed(run_params, next_time)
print("TRID: {}".format(trid))
def _action_cancel(remote, args): def _action_delete(remote, args):
if args.timed: remote.delete(args.rid)
remote.cancel_timed(args.rid)
else:
remote.cancel_queued(args.rid)
def _action_set_device(remote, args): def _action_set_device(remote, args):
@ -141,41 +128,30 @@ def _action_del_parameter(remote, args):
remote.delete(args.name) remote.delete(args.name)
def _show_queue(queue): def _show_schedule(schedule):
clear_screen() clear_screen()
if queue: if schedule:
table = PrettyTable(["RID", "File", "Experiment", "Arguments"]) l = sorted(schedule.items(),
for rid, run_params in queue: key=lambda x: (x[1]["due_date"] or 0, x[0]))
row = [rid, run_params["file"]] table = PrettyTable(["RID", "Pipeline", " Status ", "Due date",
if run_params["experiment"] is None: "File", "Experiment", "Arguments"])
for rid, v in l:
row = [rid, v["pipeline"], v["status"]]
if v["due_date"] is None:
row.append("") row.append("")
else: else:
row.append(run_params["experiment"]) row.append(time.strftime("%m/%d %H:%M:%S",
row.append(format_arguments(run_params["arguments"])) time.localtime(v["due_date"])))
row.append(v["expid"]["file"])
if v["expid"]["experiment"] is None:
row.append("")
else:
row.append(v["expid"]["experiment"])
row.append(format_arguments(v["expid"]["arguments"]))
table.add_row(row) table.add_row(row)
print(table) print(table)
else: else:
print("Queue is empty") print("Schedule is empty")
def _show_timed(timed):
clear_screen()
if timed:
table = PrettyTable(["Next run", "TRID", "File", "Experiment",
"Arguments"])
sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0]))
for trid, (next_run, run_params) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
trid, run_params["file"]]
if run_params["experiment"] is None:
row.append("")
else:
row.append(run_params["experiment"])
row.append(format_arguments(run_params["arguments"]))
table.add_row(row)
print(table)
else:
print("No timed schedule")
def _show_devices(devices): def _show_devices(devices):
@ -211,16 +187,6 @@ def _run_subscriber(host, port, subscriber):
loop.close() loop.close()
def _show_list(args, notifier_name, display_fun):
l = []
def init_l(x):
l[:] = x
return l
subscriber = Subscriber(notifier_name, init_l,
lambda mod: display_fun(l))
_run_subscriber(args.server, args.port, subscriber)
def _show_dict(args, notifier_name, display_fun): def _show_dict(args, notifier_name, display_fun):
d = dict() d = dict()
def init_d(x): def init_d(x):
@ -236,10 +202,8 @@ def main():
args = get_argparser().parse_args() args = get_argparser().parse_args()
action = args.action.replace("-", "_") action = args.action.replace("-", "_")
if action == "show": if action == "show":
if args.what == "queue": if args.what == "schedule":
_show_list(args, "queue", _show_queue) _show_dict(args, "schedule", _show_schedule)
elif args.what == "timed":
_show_dict(args, "timed", _show_timed)
elif args.what == "devices": elif args.what == "devices":
_show_dict(args, "devices", _show_devices) _show_dict(args, "devices", _show_devices)
elif args.what == "parameters": elif args.what == "parameters":
@ -251,7 +215,7 @@ def main():
port = 3251 if args.port is None else args.port port = 3251 if args.port is None else args.port
target_name = { target_name = {
"submit": "master_schedule", "submit": "master_schedule",
"cancel": "master_schedule", "delete": "master_schedule",
"set_device": "master_ddb", "set_device": "master_ddb",
"del_device": "master_ddb", "del_device": "master_ddb",
"set_parameter": "master_pdb", "set_parameter": "master_pdb",

View File

@ -48,20 +48,15 @@ def main():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
atexit.register(lambda: loop.close()) atexit.register(lambda: loop.close())
def run_cb(rid, run_params): worker_handlers = {
rtr.current_group = run_params["rtr_group"]
scheduler = Scheduler(run_cb, get_last_rid() + 1)
scheduler.worker_handlers = {
"req_device": ddb.request, "req_device": ddb.request,
"req_parameter": pdb.request, "req_parameter": pdb.request,
"set_parameter": pdb.set, "set_parameter": pdb.set,
"init_rt_results": rtr.init, "init_rt_results": rtr.init,
"update_rt_results": rtr.update, "update_rt_results": rtr.update,
"scheduler_run_queued": scheduler.run_queued,
"scheduler_cancel_queued": scheduler.cancel_queued,
"scheduler_run_timed": scheduler.run_timed,
"scheduler_cancel_timed": scheduler.cancel_timed,
} }
scheduler = Scheduler(get_last_rid() + 1, worker_handlers)
worker_handlers["scheduler_submit"] = scheduler.submit
scheduler.start() scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
@ -76,8 +71,7 @@ def main():
atexit.register(lambda: loop.run_until_complete(server_control.stop())) atexit.register(lambda: loop.run_until_complete(server_control.stop()))
server_notify = Publisher({ server_notify = Publisher({
"queue": scheduler.queue, "schedule": scheduler.notifier,
"timed": scheduler.timed,
"devices": ddb.data, "devices": ddb.data,
"parameters": pdb.data, "parameters": pdb.data,
"parameters_simplehist": simplephist.history, "parameters_simplehist": simplephist.history,

View File

@ -144,10 +144,9 @@ class ExplorerWindow(Window):
arguments = {} arguments = {}
else: else:
arguments = self.controls.get_arguments() arguments = self.controls.get_arguments()
run_params = { expid = {
"file": data["file"], "file": data["file"],
"experiment": data["experiment"], "experiment": data["experiment"],
"arguments": arguments, "arguments": arguments
"rtr_group": data["file"]
} }
asyncio.Task(self.schedule_ctl.run_queued(run_params)) asyncio.async(self.schedule_ctl.submit("main", expid, None))

View File

@ -3,37 +3,29 @@ import asyncio
from gi.repository import Gtk from gi.repository import Gtk
from artiq.gui.tools import Window, ListSyncer, DictSyncer from artiq.gui.tools import Window, DictSyncer
from artiq.protocols.sync_struct import Subscriber from artiq.protocols.sync_struct import Subscriber
from artiq.tools import format_arguments from artiq.tools import format_arguments
class _QueueStoreSyncer(ListSyncer): class _ScheduleStoreSyncer(DictSyncer):
def convert(self, x):
rid, run_params = x
row = [rid, run_params["file"]]
if run_params["experiment"] is None:
row.append("")
else:
row.append(run_params["experiment"])
row.append(format_arguments(run_params["arguments"]))
return row
class _TimedStoreSyncer(DictSyncer):
def order_key(self, kv_pair): def order_key(self, kv_pair):
# order by next run time, and then by TRID # order by due date, and then by RID
return (kv_pair[1][0], kv_pair[0]) return (kv_pair[1]["due_date"] or 0, kv_pair[0])
def convert(self, trid, x): def convert(self, rid, v):
next_run, run_params = x row = [rid, v["pipeline"], v["status"]]
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), if v["due_date"] is None:
trid, run_params["file"]]
if run_params["experiment"] is None:
row.append("") row.append("")
else: else:
row.append(run_params["experiment"]) row.append(time.strftime("%m/%d %H:%M:%S",
row.append(format_arguments(run_params["arguments"])) time.localtime(v["due_date"])))
row.append(v["expid"]["file"])
if v["expid"]["experiment"] is None:
row.append("")
else:
row.append(v["expid"]["experiment"])
row.append(format_arguments(v["expid"]["arguments"]))
return row return row
@ -43,93 +35,41 @@ class SchedulerWindow(Window):
Window.__init__(self, Window.__init__(self,
title="Scheduler", title="Scheduler",
default_size=(720, 570), default_size=(950, 570),
**kwargs) **kwargs)
topvbox = Gtk.VBox(spacing=6) topvbox = Gtk.VBox(spacing=6)
self.add(topvbox) self.add(topvbox)
hbox = Gtk.HBox(spacing=6) self.schedule_store = Gtk.ListStore(int, str, str, str, str, str, str)
enable = Gtk.Switch(active=True) self.schedule_tree = Gtk.TreeView(self.schedule_store)
label = Gtk.Label("Run experiments") for i, title in enumerate(["RID", "Pipeline", "Status", "Due date",
hbox.pack_start(label, False, False, 0) "File", "Experiment", "Arguments"]):
hbox.pack_start(enable, False, False, 0)
topvbox.pack_start(hbox, False, False, 0)
notebook = Gtk.Notebook()
topvbox.pack_start(notebook, True, True, 0)
self.queue_store = Gtk.ListStore(int, str, str, str)
self.queue_tree = Gtk.TreeView(self.queue_store)
for i, title in enumerate(["RID", "File", "Experiment", "Arguments"]):
renderer = Gtk.CellRendererText() renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i) column = Gtk.TreeViewColumn(title, renderer, text=i)
self.queue_tree.append_column(column) self.schedule_tree.append_column(column)
scroll = Gtk.ScrolledWindow() scroll = Gtk.ScrolledWindow()
scroll.add(self.queue_tree) scroll.add(self.schedule_tree)
vbox = Gtk.VBox(spacing=6) topvbox.pack_start(scroll, True, True, 0)
vbox.pack_start(scroll, True, True, 0) button = Gtk.Button("Delete")
hbox = Gtk.HBox(spacing=6) button.connect("clicked", self.delete)
button = Gtk.Button("Find") topvbox.pack_start(button, False, False, 0)
hbox.pack_start(button, True, True, 0) topvbox.set_border_width(6)
button = Gtk.Button("Move up")
hbox.pack_start(button, True, True, 0)
button = Gtk.Button("Move down")
hbox.pack_start(button, True, True, 0)
button = Gtk.Button("Remove")
button.connect("clicked", self.remove_queued)
hbox.pack_start(button, True, True, 0)
vbox.pack_start(hbox, False, False, 0)
vbox.set_border_width(6)
notebook.insert_page(vbox, Gtk.Label("Queue"), -1)
self.timed_store = Gtk.ListStore(str, int, str, str, str) def delete(self, widget):
self.timed_tree = Gtk.TreeView(self.timed_store) store, selected = self.schedule_tree.get_selection().get_selected()
for i, title in enumerate(["Next run", "TRID", "File", "Experiment",
"Arguments"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i)
self.timed_tree.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.add(self.timed_tree)
vbox = Gtk.VBox(spacing=6)
vbox.pack_start(scroll, True, True, 0)
button = Gtk.Button("Remove")
button.connect("clicked", self.remove_timed)
vbox.pack_start(button, False, False, 0)
vbox.set_border_width(6)
notebook.insert_page(vbox, Gtk.Label("Timed schedule"), -1)
def remove_queued(self, widget):
store, selected = self.queue_tree.get_selection().get_selected()
if selected is not None: if selected is not None:
rid = store[selected][0] rid = store[selected][0]
asyncio.Task(self.schedule_ctl.cancel_queued(rid)) asyncio.async(self.schedule_ctl.delete(rid))
def remove_timed(self, widget):
store, selected = self.timed_tree.get_selection().get_selected()
if selected is not None:
trid = store[selected][1]
asyncio.Task(self.schedule_ctl.cancel_timed(trid))
@asyncio.coroutine @asyncio.coroutine
def sub_connect(self, host, port): def sub_connect(self, host, port):
self.queue_subscriber = Subscriber("queue", self.init_queue_store) self.schedule_subscriber = Subscriber("schedule", self.init_schedule_store)
yield from self.queue_subscriber.connect(host, port) yield from self.schedule_subscriber.connect(host, port)
try:
self.timed_subscriber = Subscriber("timed", self.init_timed_store)
yield from self.timed_subscriber.connect(host, port)
except:
yield from self.queue_subscriber.close()
raise
@asyncio.coroutine @asyncio.coroutine
def sub_close(self): def sub_close(self):
yield from self.timed_subscriber.close() yield from self.schedule_subscriber.close()
yield from self.queue_subscriber.close()
def init_queue_store(self, init): def init_schedule_store(self, init):
return _QueueStoreSyncer(self.queue_store, init) return _ScheduleStoreSyncer(self.schedule_store, init)
def init_timed_store(self, init):
return _TimedStoreSyncer(self.timed_store, init)

View File

@ -78,6 +78,19 @@ class ListSyncer:
raise NotImplementedError raise NotImplementedError
class _DictSyncerSubstruct:
def __init__(self, update_cb, ref):
self.update_cb = update_cb
self.ref = ref
def __getitem__(self, key):
return _DictSyncerSubstruct(self.update_cb, self.ref[key])
def __setitem__(self, key, value):
self.ref[key] = value
self.update_cb()
class DictSyncer: class DictSyncer:
def __init__(self, store, init): def __init__(self, store, init):
self.store = store self.store = store
@ -86,6 +99,7 @@ class DictSyncer:
for k, v in sorted(init.items(), key=self.order_key): for k, v in sorted(init.items(), key=self.order_key):
self.store.append(self.convert(k, v)) self.store.append(self.convert(k, v))
self.order.append((k, self.order_key((k, v)))) self.order.append((k, self.order_key((k, v))))
self.local_copy = init
def _find_index(self, key): def _find_index(self, key):
for i, e in enumerate(self.order): for i, e in enumerate(self.order):
@ -109,11 +123,18 @@ class DictSyncer:
break break
self.store.insert(j, self.convert(key, value)) self.store.insert(j, self.convert(key, value))
self.order.insert(j, (key, ord_el)) self.order.insert(j, (key, ord_el))
self.local_copy[key] = value
def __delitem__(self, key): def __delitem__(self, key):
i = self._find_index(key) i = self._find_index(key)
del self.store[i] del self.store[i]
del self.order[i] del self.order[i]
del self.local_copy[key]
def __getitem__(self, key):
def update():
self[key] = self.local_copy[key]
return _DictSyncerSubstruct(update, self.local_copy[key])
def order_key(self, kv_pair): def order_key(self, kv_pair):
raise NotImplementedError raise NotImplementedError

View File

@ -1,32 +1,150 @@
import asyncio import asyncio
import logging
from enum import Enum
from time import time from time import time
from artiq.protocols.sync_struct import Notifier
from artiq.master.worker import Worker from artiq.master.worker import Worker
from artiq.tools import asyncio_wait_or_cancel, asyncio_queue_peek
from artiq.protocols.sync_struct import Notifier
class Scheduler: logger = logging.getLogger(__name__)
def __init__(self, run_cb, first_rid):
self.worker_handlers = dict()
self.run_cb = run_cb
self.next_rid = first_rid
self.queue = Notifier([])
self.queue_modified = asyncio.Event()
self.timed = Notifier(dict())
self.timed_modified = asyncio.Event()
def new_rid(self):
r = self.next_rid
self.next_rid += 1
return r
def new_trid(self): class RunStatus(Enum):
trids = set(range(len(self.timed.read) + 1)) pending = 0
trids -= set(self.timed.read.keys()) preparing = 1
return next(iter(trids)) prepare_done = 2
running = 3
run_done = 4
analyzing = 5
analyze_done = 6
paused = 7
def _mk_worker_method(name):
@asyncio.coroutine
def worker_method(self, *args, **kwargs):
if self._terminated:
return True
m = getattr(self._worker, name)
try:
return (yield from m(*args, **kwargs))
except Exception as e:
if isinstance(e, asyncio.CancelledError):
raise
if self._terminated:
logger.debug("suppressing worker exception of terminated run",
exc_info=True)
# Return completion on termination
return True
else:
raise
return worker_method
class Run:
def __init__(self, rid, pipeline_name,
expid, due_date,
worker_handlers, notifier):
# called through pool
self.rid = rid
self.pipeline_name = pipeline_name
self.expid = expid
self.due_date = due_date
self._status = RunStatus.pending
self._terminated = False
self._worker = Worker(worker_handlers)
self._notifier = notifier
self._notifier[self.rid] = {
"pipeline": self.pipeline_name,
"expid": self.expid,
"due_date": self.due_date,
"status": self._status.name
}
@property
def status(self):
return self._status
@status.setter
def status(self, value):
self._status = value
if not self._terminated:
self._notifier[self.rid]["status"] = self._status.name
# The run with the largest priority_key is to be scheduled first
def priority_key(self, now):
if self.due_date is None:
overdue = 0
due_date_k = 0
else:
overdue = int(now > self.due_date)
due_date_k = -self.due_date
return (overdue, due_date_k, -self.rid)
@asyncio.coroutine
def close(self):
# called through pool
self._terminated = True
yield from self._worker.close()
del self._notifier[self.rid]
_prepare = _mk_worker_method("prepare")
@asyncio.coroutine
def prepare(self):
yield from self._prepare(self.rid, self.pipeline_name, self.expid)
run = _mk_worker_method("run")
resume = _mk_worker_method("resume")
analyze = _mk_worker_method("analyze")
write_results = _mk_worker_method("write_results")
class RIDCounter:
def __init__(self, next_rid):
self._next_rid = next_rid
def get(self):
rid = self._next_rid
self._next_rid += 1
return rid
class RunPool:
def __init__(self, ridc, worker_handlers, notifier):
self.runs = dict()
self.submitted_callback = None
self._ridc = ridc
self._worker_handlers = worker_handlers
self._notifier = notifier
def submit(self, expid, due_date, pipeline_name):
# called through scheduler
rid = self._ridc.get()
run = Run(rid, pipeline_name, expid, due_date,
self._worker_handlers, self._notifier)
self.runs[rid] = run
if self.submitted_callback is not None:
self.submitted_callback()
return rid
@asyncio.coroutine
def delete(self, rid):
# called through deleter
if rid not in self.runs:
return
yield from self.runs[rid].close()
del self.runs[rid]
class TaskObject:
def start(self): def start(self):
self.task = asyncio.Task(self._schedule()) self.task = asyncio.async(self._do())
@asyncio.coroutine @asyncio.coroutine
def stop(self): def stop(self):
@ -34,97 +152,215 @@ class Scheduler:
yield from asyncio.wait([self.task]) yield from asyncio.wait([self.task])
del self.task del self.task
def run_queued(self, run_params): @asyncio.coroutine
rid = self.new_rid() def _do(self):
self.queue.append((rid, run_params))
self.queue_modified.set()
return rid
def cancel_queued(self, rid):
idx = next(idx for idx, (qrid, _)
in enumerate(self.queue.read)
if qrid == rid)
if idx == 0:
# Cannot cancel when already running
raise NotImplementedError raise NotImplementedError
del self.queue[idx]
def run_timed(self, run_params, next_run):
if next_run is None:
next_run = time()
trid = self.new_trid()
self.timed[trid] = next_run, run_params
self.timed_modified.set()
return trid
def cancel_timed(self, trid): class PrepareStage(TaskObject):
del self.timed[trid] def __init__(self, pool, outq):
self.pool = pool
self.outq = outq
self.pool_submitted = asyncio.Event()
self.pool.submitted_callback = lambda: self.pool_submitted.set()
@asyncio.coroutine @asyncio.coroutine
def _run(self, rid, run_params): def _push_runs(self):
self.run_cb(rid, run_params) """Pushes all runs that have no due date of have a due date in the
worker = Worker(self.worker_handlers) past.
try:
yield from worker.prepare(rid, run_params)
try:
yield from worker.run()
yield from worker.analyze()
yield from worker.write_results()
finally:
yield from worker.close()
except Exception as e:
print("RID {} failed:".format(rid))
print("{}: {}".format(e.__class__.__name__, e))
else:
print("RID {} completed successfully".format(rid))
@asyncio.coroutine Returns the time before the next schedulable run, or None if the
def _run_timed(self): pool is empty."""
while True: while True:
min_next_run = None
min_trid = None
for trid, params in self.timed.read.items():
if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0]
min_trid = trid
now = time() now = time()
pending_runs = filter(lambda r: r.status == RunStatus.pending,
if min_next_run is None: self.pool.runs.values())
try:
run = max(pending_runs, key=lambda r: r.priority_key(now))
except ValueError:
# pending_runs is an empty sequence
return None return None
min_next_run -= now if run.due_date is None or run.due_date < now:
if min_next_run > 0: run.status = RunStatus.preparing
return min_next_run yield from run.prepare()
run.status = RunStatus.prepare_done
next_run, run_params = self.timed.read[min_trid] yield from self.outq.put(run)
del self.timed[min_trid] else:
return run.due_date - now
rid = self.new_rid()
self.queue.insert(0, (rid, run_params))
yield from self._run(rid, run_params)
del self.queue[0]
@asyncio.coroutine @asyncio.coroutine
def _schedule(self): def _do(self):
while True: while True:
next_timed = yield from self._run_timed() next_timed_in = yield from self._push_runs()
if self.queue.read: if next_timed_in is None:
rid, run_params = self.queue.read[0] # pool is empty - wait for something to be added to it
yield from self._run(rid, run_params) yield from self.pool_submitted.wait()
del self.queue[0]
else: else:
self.queue_modified.clear() # wait for next_timed_in seconds, or until the pool is modified
self.timed_modified.clear() yield from asyncio_wait_or_cancel([self.pool_submitted.wait()],
t1 = asyncio.Task(self.queue_modified.wait()) timeout=next_timed_in)
t2 = asyncio.Task(self.timed_modified.wait()) self.pool_submitted.clear()
class RunStage(TaskObject):
def __init__(self, deleter, inq, outq):
self.deleter = deleter
self.inq = inq
self.outq = outq
@asyncio.coroutine
def _do(self):
stack = []
while True:
try: try:
done, pend = yield from asyncio.wait( next_irun = asyncio_queue_peek(self.inq)
[t1, t2], except asyncio.QueueEmpty:
timeout=next_timed, next_irun = None
return_when=asyncio.FIRST_COMPLETED) now = time()
if not stack or (
next_irun is not None and
next_irun.priority_key(now) > stack[-1].priority_key(now)):
stack.append((yield from self.inq.get()))
run = stack.pop()
try:
if run.status == RunStatus.paused:
run.status = RunStatus.running
completed = yield from run.resume()
else:
run.status = RunStatus.running
completed = yield from run.run()
except: except:
t1.cancel() logger.warning("got worker exception, deleting RID %d",
t2.cancel() run.rid, exc_info=True)
raise self.deleter.delete(run.rid)
for t in pend: else:
t.cancel() if completed:
run.status = RunStatus.run_done
yield from self.outq.put(run)
else:
run.status = RunStatus.paused
stack.append(run)
class AnalyzeStage(TaskObject):
def __init__(self, deleter, inq):
self.deleter = deleter
self.inq = inq
@asyncio.coroutine
def _do(self):
while True:
run = yield from self.inq.get()
run.status = RunStatus.analyzing
yield from run.analyze()
yield from run.write_results()
run.status = RunStatus.analyze_done
self.deleter.delete(run.rid)
class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
self.pool = RunPool(ridc, worker_handlers, notifier)
self._prepare = PrepareStage(self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(deleter, self._prepare.outq, asyncio.Queue(maxsize=1))
self._analyze = AnalyzeStage(deleter, self._run.outq)
def start(self):
self._prepare.start()
self._run.start()
self._analyze.start()
@asyncio.coroutine
def stop(self):
# NB: restart of a stopped pipeline is not supported
yield from self._analyze.stop()
yield from self._run.stop()
yield from self._prepare.stop()
class Deleter(TaskObject):
def __init__(self, pipelines):
self._pipelines = pipelines
self._queue = asyncio.JoinableQueue()
def delete(self, rid):
logger.debug("delete request for RID %d", rid)
self._queue.put_nowait(rid)
@asyncio.coroutine
def join(self):
yield from self._queue.join()
@asyncio.coroutine
def _delete(self, rid):
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
logger.debug("deleting RID %d...", rid)
yield from pipeline.pool.delete(rid)
logger.debug("deletion of RID %d completed", rid)
break
@asyncio.coroutine
def _gc_pipelines(self):
pipeline_names = list(self._pipelines.keys())
for name in pipeline_names:
if not self._pipelines[name].pool.runs:
logger.debug("garbage-collecting pipeline '%s'...", name)
yield from self._pipelines[name].stop()
del self._pipelines[name]
logger.debug("garbage-collection of pipeline '%s' completed",
name)
@asyncio.coroutine
def _do(self):
while True:
rid = yield from self._queue.get()
yield from self._delete(rid)
yield from self._gc_pipelines()
self._queue.task_done()
class Scheduler:
def __init__(self, next_rid, worker_handlers):
self.notifier = Notifier(dict())
self._pipelines = dict()
self._worker_handlers = worker_handlers
self._terminated = False
self._ridc = RIDCounter(next_rid)
self._deleter = Deleter(self._pipelines)
def start(self):
self._deleter.start()
@asyncio.coroutine
def stop(self):
# NB: restart of a stopped scheduler is not supported
self._terminated = True # prevent further runs from being created
for pipeline in self._pipelines.values():
for rid in pipeline.pool.runs.keys():
self._deleter.delete(rid)
yield from self._deleter.join()
yield from self._deleter.stop()
if self._pipelines:
logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, due_date):
if self._terminated:
return
try:
pipeline = self._pipelines[pipeline_name]
except KeyError:
logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
return pipeline.pool.submit(expid, due_date, pipeline_name)
def delete(self, rid):
self._deleter.delete(rid)

View File

@ -1,12 +1,16 @@
import sys import sys
import asyncio import asyncio
import logging
import subprocess import subprocess
import traceback import traceback
import time import time
from artiq.protocols import pyon from artiq.protocols import pyon
from artiq.language.units import strip_unit from artiq.language.units import strip_unit
from artiq.tools import asyncio_process_wait_timeout from artiq.tools import asyncio_process_wait_timeout, asyncio_wait_or_cancel
logger = logging.getLogger(__name__)
class WorkerTimeout(Exception): class WorkerTimeout(Exception):
@ -30,8 +34,14 @@ class Worker:
self.term_timeout = term_timeout self.term_timeout = term_timeout
self.prepare_timeout = prepare_timeout self.prepare_timeout = prepare_timeout
self.results_timeout = results_timeout self.results_timeout = results_timeout
self.rid = None
self.process = None
self.watchdogs = dict() # wid -> expiration (using time.monotonic) self.watchdogs = dict() # wid -> expiration (using time.monotonic)
self.io_lock = asyncio.Lock()
self.closed = asyncio.Event()
def create_watchdog(self, t): def create_watchdog(self, t):
n_user_watchdogs = len(self.watchdogs) n_user_watchdogs = len(self.watchdogs)
if -1 in self.watchdogs: if -1 in self.watchdogs:
@ -53,50 +63,82 @@ class Worker:
@asyncio.coroutine @asyncio.coroutine
def _create_process(self): def _create_process(self):
yield from self.io_lock.acquire()
try:
if self.closed.is_set():
raise WorkerError("Attempting to create process after close")
self.process = yield from asyncio.create_subprocess_exec( self.process = yield from asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.master.worker_impl", sys.executable, "-m", "artiq.master.worker_impl",
stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stdin=subprocess.PIPE)
finally:
self.io_lock.release()
@asyncio.coroutine @asyncio.coroutine
def close(self): def close(self):
self.closed.set()
yield from self.io_lock.acquire()
try:
if self.process is None:
# Note the %s - self.rid can be None
logger.debug("worker was not created (RID %s)", self.rid)
return
if self.process.returncode is not None: if self.process.returncode is not None:
if process.returncode != 0: logger.debug("worker already terminated (RID %d)", self.rid)
raise WorkerError("Worker finished with status code {}" if self.process.returncode != 0:
.format(process.returncode)) logger.warning("worker finished with status code %d"
" (RID %d)", self.process.returncode,
self.rid)
return return
obj = {"action": "terminate"} obj = {"action": "terminate"}
try: try:
yield from self._send(obj, self.send_timeout) yield from self._send(obj, self.send_timeout, cancellable=False)
except: except:
logger.warning("failed to send terminate command to worker"
" (RID %d), killing", self.rid, exc_info=True)
self.process.kill() self.process.kill()
return return
try: try:
yield from asyncio_process_wait_timeout(self.process, yield from asyncio_process_wait_timeout(self.process,
self.term_timeout) self.term_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.warning("worker did not exit (RID %d), killing", self.rid)
self.process.kill() self.process.kill()
else:
logger.debug("worker exited gracefully (RID %d)", self.rid)
finally:
self.io_lock.release()
@asyncio.coroutine @asyncio.coroutine
def _send(self, obj, timeout): def _send(self, obj, timeout, cancellable=True):
assert self.io_lock.locked()
line = pyon.encode(obj) line = pyon.encode(obj)
self.process.stdin.write(line.encode()) self.process.stdin.write(line.encode())
self.process.stdin.write("\n".encode()) self.process.stdin.write("\n".encode())
try: ifs = [self.process.stdin.drain()]
fut = self.process.stdin.drain() if cancellable:
if fut is not (): # FIXME: why does Python return this? ifs.append(self.closed.wait())
yield from asyncio.wait_for(fut, timeout=timeout) fs = yield from asyncio_wait_or_cancel(
except asyncio.TimeoutError: ifs, timeout=timeout,
raise WorkerTimeout("Timeout sending data from worker") return_when=asyncio.FIRST_COMPLETED)
except: if all(f.cancelled() for f in fs):
raise WorkerError("Failed to send data to worker") raise WorkerTimeout("Timeout sending data to worker")
for f in fs:
if not f.cancelled() and f.done():
f.result() # raise any exceptions
if cancellable and self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled")
@asyncio.coroutine @asyncio.coroutine
def _recv(self, timeout): def _recv(self, timeout):
try: assert self.io_lock.locked()
line = yield from asyncio.wait_for( fs = yield from asyncio_wait_or_cancel(
self.process.stdout.readline(), timeout=timeout) [self.process.stdout.readline(), self.closed.wait()],
except asyncio.TimeoutError: timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
raise WorkerTimeout("Timeout receiving data from worker") if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout sending data to worker")
if self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled")
line = fs[0].result()
if not line: if not line:
raise WorkerError("Worker ended while attempting to receive data") raise WorkerError("Worker ended while attempting to receive data")
try: try:
@ -108,13 +150,19 @@ class Worker:
@asyncio.coroutine @asyncio.coroutine
def _handle_worker_requests(self): def _handle_worker_requests(self):
while True: while True:
try:
yield from self.io_lock.acquire()
try: try:
obj = yield from self._recv(self.watchdog_time()) obj = yield from self._recv(self.watchdog_time())
finally:
self.io_lock.release()
except WorkerTimeout: except WorkerTimeout:
raise WorkerWatchdogTimeout raise WorkerWatchdogTimeout
action = obj["action"] action = obj["action"]
if action == "completed": if action == "completed":
return return True
elif action == "pause":
return False
del obj["action"] del obj["action"]
if action == "create_watchdog": if action == "create_watchdog":
func = self.create_watchdog func = self.create_watchdog
@ -128,36 +176,59 @@ class Worker:
except: except:
reply = {"status": "failed", reply = {"status": "failed",
"message": traceback.format_exc()} "message": traceback.format_exc()}
yield from self.io_lock.acquire()
try:
yield from self._send(reply, self.send_timeout) yield from self._send(reply, self.send_timeout)
finally:
self.io_lock.release()
@asyncio.coroutine @asyncio.coroutine
def _worker_action(self, obj, timeout=None): def _worker_action(self, obj, timeout=None):
if timeout is not None: if timeout is not None:
self.watchdogs[-1] = time.monotonic() + timeout self.watchdogs[-1] = time.monotonic() + timeout
try: try:
yield from self._send(obj, self.send_timeout) yield from self.io_lock.acquire()
try: try:
yield from self._handle_worker_requests() yield from self._send(obj, self.send_timeout)
finally:
self.io_lock.release()
try:
completed = yield from self._handle_worker_requests()
except WorkerTimeout: except WorkerTimeout:
raise WorkerWatchdogTimeout raise WorkerWatchdogTimeout
finally: finally:
if timeout is not None: if timeout is not None:
del self.watchdogs[-1] del self.watchdogs[-1]
return completed
@asyncio.coroutine @asyncio.coroutine
def prepare(self, rid, run_params): def prepare(self, rid, pipeline_name, expid):
self.rid = rid
yield from self._create_process() yield from self._create_process()
try:
yield from self._worker_action( yield from self._worker_action(
{"action": "prepare", "rid": rid, "run_params": run_params}, {"action": "prepare",
"rid": rid,
"pipeline_name": pipeline_name,
"expid": expid},
self.prepare_timeout) self.prepare_timeout)
except:
yield from self.close()
raise
@asyncio.coroutine @asyncio.coroutine
def run(self): def run(self):
yield from self._worker_action({"action": "run"}) completed = yield from self._worker_action({"action": "run"})
if not completed:
self.yield_time = time.monotonic()
return completed
@asyncio.coroutine
def resume(self):
stop_duration = time.monotonic() - self.yield_time
for wid, expiry in self.watchdogs:
self.watchdogs[wid] += stop_duration
completed = yield from self._worker_action({"status": "ok",
"data": None})
if not completed:
self.yield_time = time.monotonic()
return completed
@asyncio.coroutine @asyncio.coroutine
def analyze(self): def analyze(self):

View File

@ -33,6 +33,11 @@ def make_parent_action(action, argnames, exception=ParentActionError):
request[argname] = arg request[argname] = arg
put_object(request) put_object(request)
reply = get_object() reply = get_object()
if "action" in reply:
if reply["action"] == "terminate":
sys.exit()
else:
raise ValueError
if reply["status"] == "ok": if reply["status"] == "ok":
return reply["data"] return reply["data"]
else: else:
@ -71,11 +76,15 @@ set_watchdog_factory(Watchdog)
class Scheduler: class Scheduler:
run_queued = make_parent_action("scheduler_run_queued", "run_params") pause = staticmethod(make_parent_action("pause", ""))
cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
run_timed = make_parent_action("scheduler_run_timed", submit = staticmethod(make_parent_action("scheduler_submit",
"run_params next_run") "pipeline_name expid due_date"))
cancel_timed = make_parent_action("scheduler_cancel_timed", "trid") cancel = staticmethod(make_parent_action("scheduler_cancel", "rid"))
def __init__(self, pipeline_name, expid):
self.pipeline_name = pipeline_name
self.expid = expid
def get_exp(file, exp): def get_exp(file, exp):
@ -96,7 +105,7 @@ def main():
start_time = None start_time = None
rid = None rid = None
run_params = None expid = None
exp = None exp = None
exp_inst = None exp_inst = None
@ -110,12 +119,12 @@ def main():
if action == "prepare": if action == "prepare":
start_time = time.localtime() start_time = time.localtime()
rid = obj["rid"] rid = obj["rid"]
run_params = obj["run_params"] pipeline_name = obj["pipeline_name"]
exp = get_exp(run_params["file"], run_params["experiment"]) expid = obj["expid"]
exp = get_exp(expid["file"], expid["experiment"])
exp_inst = exp(dbh, exp_inst = exp(dbh,
scheduler=Scheduler, scheduler=Scheduler(pipeline_name, expid),
run_params=run_params, **expid["arguments"])
**run_params["arguments"])
rdb.build() rdb.build()
put_object({"action": "completed"}) put_object({"action": "completed"})
elif action == "run": elif action == "run":

View File

@ -96,3 +96,25 @@ def asyncio_process_wait_timeout(process, timeout):
r = yield from asyncio.wait_for( r = yield from asyncio.wait_for(
process.stdout.read(1024), process.stdout.read(1024),
timeout=end_time - time.monotonic()) timeout=end_time - time.monotonic())
@asyncio.coroutine
def asyncio_wait_or_cancel(fs, **kwargs):
fs = [asyncio.async(f) for f in fs]
try:
d, p = yield from asyncio.wait(fs, **kwargs)
except:
for f in fs:
f.cancel()
raise
for f in p:
f.cancel()
return fs
def asyncio_queue_peek(q):
"""Like q.get_nowait(), but does not remove the item from the queue."""
if q._queue:
return q._queue[0]
else:
raise asyncio.QueueEmpty

View File

@ -51,7 +51,8 @@ class FloppingF(Experiment, AutoDB):
self.frequency.append(frequency) self.frequency.append(frequency)
self.brightness.append(brightness) self.brightness.append(brightness)
time.sleep(0.1) time.sleep(0.1)
self.scheduler.run_timed(self.run_params, time.time() + 20) self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid,
time.time() + 20)
def analyze(self): def analyze(self):
popt, pcov = curve_fit(model_numpy, popt, pcov = curve_fit(model_numpy,