replace periodic schedule with timed schedule

This commit is contained in:
Sebastien Bourdeauducq 2015-01-23 00:23:00 +08:00
parent 569600b607
commit 47aa634ab5
4 changed files with 84 additions and 84 deletions

View File

@ -5,6 +5,7 @@ import time
import asyncio import asyncio
import sys import sys
from operator import itemgetter from operator import itemgetter
from dateutil.parser import parse as parse_date
from prettytable import PrettyTable from prettytable import PrettyTable
@ -32,8 +33,10 @@ def _get_args():
parser_add = subparsers.add_parser("submit", help="submit an experiment") parser_add = subparsers.add_parser("submit", help="submit an experiment")
parser_add.add_argument( parser_add.add_argument(
"-p", "--periodic", default=None, type=float, "-T", "--timed", default=None, type=str,
help="run the experiment periodically every given number of seconds") help="run the experiment in timed mode. "
"argument specifies the time of the first run, "
"use 'now' to run immediately")
parser_add.add_argument( parser_add.add_argument(
"-t", "--timeout", default=None, type=float, "-t", "--timeout", default=None, type=float,
help="specify a timeout for the experiment to complete") help="specify a timeout for the experiment to complete")
@ -45,11 +48,11 @@ def _get_args():
parser_cancel = subparsers.add_parser("cancel", parser_cancel = subparsers.add_parser("cancel",
help="cancel an experiment") help="cancel an experiment")
parser_cancel.add_argument("-p", "--periodic", default=False, parser_cancel.add_argument("-T", "--timed", default=False,
action="store_true", action="store_true",
help="cancel a periodic experiment") help="cancel a timed experiment")
parser_cancel.add_argument("rid", type=int, parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/PRID)") help="run identifier (RID/TRID)")
parser_set_device = subparsers.add_parser( parser_set_device = subparsers.add_parser(
"set-device", help="add or modify a device") "set-device", help="add or modify a device")
@ -75,7 +78,7 @@ def _get_args():
"show", help="show schedule, devices or parameters") "show", help="show schedule, devices or parameters")
parser_show.add_argument( parser_show.add_argument(
"what", "what",
help="select object to show: queue/periodic/devices/parameters") help="select object to show: queue/timed/devices/parameters")
return parser.parse_args() return parser.parse_args()
@ -100,20 +103,23 @@ def _action_submit(remote, args):
"unit": args.unit, "unit": args.unit,
"arguments": arguments "arguments": arguments
} }
if args.periodic is None: if args.timed is None:
rid = remote.run_once(run_params, args.timeout) rid = remote.run_queued(run_params, args.timeout)
print("RID: {}".format(rid)) print("RID: {}".format(rid))
else: else:
prid = remote.run_periodic(run_params, args.timeout, if args.timed == "now":
args.periodic) next_time = None
print("PRID: {}".format(prid)) else:
next_time = time.mktime(parse_date(args.timed).timetuple())
trid = remote.run_timed(run_params, args.timeout, next_time)
print("TRID: {}".format(trid))
def _action_cancel(remote, args): def _action_cancel(remote, args):
if args.periodic: if args.timed:
remote.cancel_periodic(args.rid) remote.cancel_timed(args.rid)
else: else:
remote.cancel_once(args.rid) remote.cancel_queued(args.rid)
def _action_set_device(remote, args): def _action_set_device(remote, args):
@ -147,23 +153,22 @@ def _show_queue(queue):
print("Queue is empty") print("Queue is empty")
def _show_periodic(periodic): def _show_timed(timed):
clear_screen() clear_screen()
if periodic: if timed:
table = PrettyTable(["Next run", "PRID", "File", "Unit", table = PrettyTable(["Next run", "TRID", "File", "Unit",
"Timeout", "Period", "Arguments"]) "Timeout", "Arguments"])
sp = sorted(periodic.items(), key=lambda x: (x[1][0], x[0])) sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0]))
for prid, (next_run, run_params, timeout, period) in sp: for trid, (next_run, run_params, timeout) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]] trid, run_params["file"]]
for x in run_params["unit"], timeout: for x in run_params["unit"], timeout:
row.append("-" if x is None else x) row.append("-" if x is None else x)
row.append(period)
row.append(format_run_arguments(run_params["arguments"])) row.append(format_run_arguments(run_params["arguments"]))
table.add_row(row) table.add_row(row)
print(table) print(table)
else: else:
print("No periodic schedule") print("No timed schedule")
def _show_devices(devices): def _show_devices(devices):
@ -226,8 +231,8 @@ def main():
if action == "show": if action == "show":
if args.what == "queue": if args.what == "queue":
_show_list(args, "queue", _show_queue) _show_list(args, "queue", _show_queue)
elif args.what == "periodic": elif args.what == "timed":
_show_dict(args, "periodic", _show_periodic) _show_dict(args, "timed", _show_timed)
elif args.what == "devices": elif args.what == "devices":
_show_dict(args, "devices", _show_devices) _show_dict(args, "devices", _show_devices)
elif args.what == "parameters": elif args.what == "parameters":

View File

@ -58,7 +58,7 @@ def main():
server_notify = Publisher({ server_notify = Publisher({
"queue": scheduler.queue, "queue": scheduler.queue,
"periodic": scheduler.periodic, "timed": scheduler.timed,
"devices": ddb.data, "devices": ddb.data,
"parameters": pdb.data, "parameters": pdb.data,
"parameters_simplehist": simplephist.history, "parameters_simplehist": simplephist.history,

View File

@ -18,18 +18,17 @@ class _QueueStoreSyncer(ListSyncer):
return row return row
class _PeriodicStoreSyncer(DictSyncer): class _TimedStoreSyncer(DictSyncer):
def order_key(self, kv_pair): def order_key(self, kv_pair):
# order by next run time, and then by PRID # order by next run time, and then by TRID
return (kv_pair[1][0], kv_pair[0]) return (kv_pair[1][0], kv_pair[0])
def convert(self, prid, x): def convert(self, trid, x):
next_run, run_params, timeout, period = x next_run, run_params, timeout = x
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]] trid, run_params["file"]]
for e in run_params["unit"], timeout: for e in run_params["unit"], timeout:
row.append("-" if e is None else str(e)) row.append("-" if e is None else str(e))
row.append(str(period))
row.append(format_run_arguments(run_params["arguments"])) row.append(format_run_arguments(run_params["arguments"]))
return row return row
@ -72,64 +71,59 @@ class SchedulerWindow(Window):
button = Gtk.Button("Move down") button = Gtk.Button("Move down")
hbox.pack_start(button, True, True, 0) hbox.pack_start(button, True, True, 0)
button = Gtk.Button("Remove") button = Gtk.Button("Remove")
button.connect("clicked", self.remove_queue) button.connect("clicked", self.remove_queued)
hbox.pack_start(button, True, True, 0) hbox.pack_start(button, True, True, 0)
vbox.pack_start(hbox, False, False, 0) vbox.pack_start(hbox, False, False, 0)
vbox.set_border_width(6) vbox.set_border_width(6)
notebook.insert_page(vbox, Gtk.Label("Queue"), -1) notebook.insert_page(vbox, Gtk.Label("Queue"), -1)
self.periodic_store = Gtk.ListStore(str, int, str, str, str, str, str) self.timed_store = Gtk.ListStore(str, int, str, str, str, str)
self.periodic_tree = Gtk.TreeView(self.periodic_store) self.timed_tree = Gtk.TreeView(self.timed_store)
for i, title in enumerate(["Next run", "PRID", "File", "Unit", for i, title in enumerate(["Next run", "TRID", "File", "Unit",
"Timeout", "Period", "Arguments"]): "Timeout", "Arguments"]):
renderer = Gtk.CellRendererText() renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i) column = Gtk.TreeViewColumn(title, renderer, text=i)
self.periodic_tree.append_column(column) self.timed_tree.append_column(column)
scroll = Gtk.ScrolledWindow() scroll = Gtk.ScrolledWindow()
scroll.add(self.periodic_tree) scroll.add(self.timed_tree)
vbox = Gtk.VBox(spacing=6) vbox = Gtk.VBox(spacing=6)
vbox.pack_start(scroll, True, True, 0) vbox.pack_start(scroll, True, True, 0)
hbox = Gtk.HBox(spacing=6)
button = Gtk.Button("Change period")
hbox.pack_start(button, True, True, 0)
button = Gtk.Button("Remove") button = Gtk.Button("Remove")
button.connect("clicked", self.remove_periodic) button.connect("clicked", self.remove_timed)
hbox.pack_start(button, True, True, 0) vbox.pack_start(button, False, False, 0)
vbox.pack_start(hbox, False, False, 0)
vbox.set_border_width(6) vbox.set_border_width(6)
notebook.insert_page(vbox, Gtk.Label("Periodic schedule"), -1) notebook.insert_page(vbox, Gtk.Label("Timed schedule"), -1)
def remove_queue(self, widget): def remove_queued(self, widget):
store, selected = self.queue_tree.get_selection().get_selected() store, selected = self.queue_tree.get_selection().get_selected()
if selected is not None: if selected is not None:
rid = store[selected][0] rid = store[selected][0]
asyncio.Task(self.schedule_ctl.cancel_once(rid)) asyncio.Task(self.schedule_ctl.cancel_queued(rid))
def remove_periodic(self, widget): def remove_timed(self, widget):
store, selected = self.periodic_tree.get_selection().get_selected() store, selected = self.timed_tree.get_selection().get_selected()
if selected is not None: if selected is not None:
prid = store[selected][1] trid = store[selected][1]
asyncio.Task(self.schedule_ctl.cancel_periodic(prid)) asyncio.Task(self.schedule_ctl.cancel_timed(trid))
@asyncio.coroutine @asyncio.coroutine
def sub_connect(self, host, port): def sub_connect(self, host, port):
self.queue_subscriber = Subscriber("queue", self.init_queue_store) self.queue_subscriber = Subscriber("queue", self.init_queue_store)
yield from self.queue_subscriber.connect(host, port) yield from self.queue_subscriber.connect(host, port)
try: try:
self.periodic_subscriber = Subscriber( self.timed_subscriber = Subscriber("timed", self.init_timed_store)
"periodic", self.init_periodic_store) yield from self.timed_subscriber.connect(host, port)
yield from self.periodic_subscriber.connect(host, port)
except: except:
yield from self.queue_subscriber.close() yield from self.queue_subscriber.close()
raise raise
@asyncio.coroutine @asyncio.coroutine
def sub_close(self): def sub_close(self):
yield from self.periodic_subscriber.close() yield from self.timed_subscriber.close()
yield from self.queue_subscriber.close() yield from self.queue_subscriber.close()
def init_queue_store(self, init): def init_queue_store(self, init):
return _QueueStoreSyncer(self.queue_store, init) return _QueueStoreSyncer(self.queue_store, init)
def init_periodic_store(self, init): def init_timed_store(self, init):
return _PeriodicStoreSyncer(self.periodic_store, init) return _TimedStoreSyncer(self.timed_store, init)

View File

@ -11,18 +11,18 @@ class Scheduler:
self.next_rid = 0 self.next_rid = 0
self.queue = Notifier([]) self.queue = Notifier([])
self.queue_modified = asyncio.Event() self.queue_modified = asyncio.Event()
self.periodic = Notifier(dict()) self.timed = Notifier(dict())
self.periodic_modified = asyncio.Event() self.timed_modified = asyncio.Event()
def new_rid(self): def new_rid(self):
r = self.next_rid r = self.next_rid
self.next_rid += 1 self.next_rid += 1
return r return r
def new_prid(self): def new_trid(self):
prids = set(range(len(self.periodic.read) + 1)) trids = set(range(len(self.timed.read) + 1))
prids -= set(self.periodic.read.keys()) trids -= set(self.timed.read.keys())
return next(iter(prids)) return next(iter(trids))
@asyncio.coroutine @asyncio.coroutine
def start(self): def start(self):
@ -36,13 +36,13 @@ class Scheduler:
del self.task del self.task
yield from self.worker.end_process() yield from self.worker.end_process()
def run_once(self, run_params, timeout): def run_queued(self, run_params, timeout):
rid = self.new_rid() rid = self.new_rid()
self.queue.append((rid, run_params, timeout)) self.queue.append((rid, run_params, timeout))
self.queue_modified.set() self.queue_modified.set()
return rid return rid
def cancel_once(self, rid): def cancel_queued(self, rid):
idx = next(idx for idx, (qrid, _, _) idx = next(idx for idx, (qrid, _, _)
in enumerate(self.queue.read) in enumerate(self.queue.read)
if qrid == rid) if qrid == rid)
@ -51,14 +51,16 @@ class Scheduler:
raise NotImplementedError raise NotImplementedError
del self.queue[idx] del self.queue[idx]
def run_periodic(self, run_params, timeout, period): def run_timed(self, run_params, timeout, next_run):
prid = self.new_prid() if next_run is None:
self.periodic[prid] = 0, run_params, timeout, period next_run = time()
self.periodic_modified.set() trid = self.new_trid()
return prid self.timed[trid] = next_run, run_params, timeout
self.timed_modified.set()
return trid
def cancel_periodic(self, prid): def cancel_timed(self, trid):
del self.periodic[prid] del self.timed[trid]
@asyncio.coroutine @asyncio.coroutine
def _run(self, rid, run_params, timeout): def _run(self, rid, run_params, timeout):
@ -71,14 +73,14 @@ class Scheduler:
print("RID {} completed successfully".format(rid)) print("RID {} completed successfully".format(rid))
@asyncio.coroutine @asyncio.coroutine
def _run_periodic(self): def _run_timed(self):
while True: while True:
min_next_run = None min_next_run = None
min_prid = None min_trid = None
for prid, params in self.periodic.read.items(): for trid, params in self.timed.read.items():
if min_next_run is None or params[0] < min_next_run: if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0] min_next_run = params[0]
min_prid = prid min_trid = trid
now = time() now = time()
@ -88,9 +90,8 @@ class Scheduler:
if min_next_run > 0: if min_next_run > 0:
return min_next_run return min_next_run
next_run, run_params, timeout, period = \ next_run, run_params, timeout = self.timed.read[min_trid]
self.periodic.read[min_prid] del self.timed[min_trid]
self.periodic[min_prid] = now + period, run_params, timeout, period
rid = self.new_rid() rid = self.new_rid()
self.queue.insert(0, (rid, run_params, timeout)) self.queue.insert(0, (rid, run_params, timeout))
@ -100,20 +101,20 @@ class Scheduler:
@asyncio.coroutine @asyncio.coroutine
def _schedule(self): def _schedule(self):
while True: while True:
next_periodic = yield from self._run_periodic() next_timed = yield from self._run_timed()
if self.queue.read: if self.queue.read:
rid, run_params, timeout = self.queue.read[0] rid, run_params, timeout = self.queue.read[0]
yield from self._run(rid, run_params, timeout) yield from self._run(rid, run_params, timeout)
del self.queue[0] del self.queue[0]
else: else:
self.queue_modified.clear() self.queue_modified.clear()
self.periodic_modified.clear() self.timed_modified.clear()
t1 = asyncio.Task(self.queue_modified.wait()) t1 = asyncio.Task(self.queue_modified.wait())
t2 = asyncio.Task(self.periodic_modified.wait()) t2 = asyncio.Task(self.timed_modified.wait())
try: try:
done, pend = yield from asyncio.wait( done, pend = yield from asyncio.wait(
[t1, t2], [t1, t2],
timeout=next_periodic, timeout=next_timed,
return_when=asyncio.FIRST_COMPLETED) return_when=asyncio.FIRST_COMPLETED)
except: except:
t1.cancel() t1.cancel()