master/client: periodic schedule monitoring

This commit is contained in:
Sebastien Bourdeauducq 2014-12-29 18:44:50 +08:00
parent 02f3781e65
commit 6b283d78d3
5 changed files with 157 additions and 42 deletions

View File

@ -9,9 +9,9 @@ class Scheduler:
def __init__(self, *args, **kwargs):
self.worker = Worker(*args, **kwargs)
self.next_rid = 0
self.queued = Notifier([])
self.queue = Notifier([])
self.queue_count = asyncio.Semaphore(0)
self.periodic = dict()
self.periodic = Notifier(dict())
self.periodic_modified = asyncio.Event()
def new_rid(self):
@ -20,8 +20,8 @@ class Scheduler:
return r
def new_prid(self):
prids = set(range(len(self.periodic) + 1))
prids -= set(self.periodic.keys())
prids = set(range(len(self.periodic.backing_struct) + 1))
prids -= set(self.periodic.backing_struct.keys())
return next(iter(prids))
@asyncio.coroutine
@ -38,14 +38,14 @@ class Scheduler:
def run_once(self, run_params, timeout):
rid = self.new_rid()
self.queued.append((rid, run_params, timeout))
self.queue.append((rid, run_params, timeout))
self.queue_count.release()
return rid
def cancel_once(self, rid):
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queued)
idx = next(idx for idx, (qrid, _, _) in enumerate(self.queue)
if qrid == rid)
del self.queued[idx]
del self.queue[idx]
def run_periodic(self, run_params, timeout, period):
prid = self.new_prid()
@ -61,7 +61,7 @@ class Scheduler:
while True:
min_next_run = None
min_prid = None
for prid, params in self.periodic.items():
for prid, params in self.periodic.backing_struct.items():
if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0]
min_prid = prid
@ -74,14 +74,15 @@ class Scheduler:
if min_next_run > 0:
return min_next_run
next_run, run_params, timeout, period = self.periodic[min_prid]
next_run, run_params, timeout, period = \
self.periodic.backing_struct[min_prid]
self.periodic[min_prid] = now + period, run_params, timeout, period
rid = self.new_rid()
self.queued.insert(0, (rid, run_params, timeout))
self.queue.insert(0, (rid, run_params, timeout))
result = yield from self.worker.run(run_params, timeout)
print(prid, rid, result)
del self.queued[0]
del self.queue[0]
@asyncio.coroutine
def _schedule(self):
@ -99,7 +100,7 @@ class Scheduler:
yield from self._run_periodic()
if ev_queue in done:
rid, run_params, timeout = self.queued.backing_struct[0]
rid, run_params, timeout = self.queue.backing_struct[0]
result = yield from self.worker.run(run_params, timeout)
print(rid, result)
del self.queued[0]
del self.queue[0]

View File

@ -8,7 +8,8 @@ _init_string = b"ARTIQ sync_struct\n"
class Subscriber:
def __init__(self, target_builder, notify_cb=None):
def __init__(self, notifier_name, target_builder, notify_cb=None):
self.notifier_name = notifier_name
self.target_builder = target_builder
self.notify_cb = notify_cb
@ -18,6 +19,7 @@ class Subscriber:
yield from asyncio.open_connection(host, port)
try:
self._writer.write(_init_string)
self._writer.write((self.notifier_name + "\n").encode())
self.receive_task = asyncio.Task(self._receive_cr())
except:
self._writer.close()
@ -56,8 +58,11 @@ class Subscriber:
target.insert(obj["i"], obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "setitem":
target.__setitem__(obj["key"], obj["value"])
elif action == "delitem":
target.__delitem__(obj["key"])
if self.notify_cb is not None:
self.notify_cb()
@ -73,32 +78,41 @@ class Notifier:
def append(self, x):
self.backing_struct.append(x)
if self.publisher is not None:
self.publisher.publish({"action": "append", "x": x})
self.publisher.publish(self, {"action": "append", "x": x})
def insert(self, i, x):
self.backing_struct.insert(i, x)
if self.publisher is not None:
self.publisher.publish({"action": "insert", "i": i, "x": x})
self.publisher.publish(self, {"action": "insert", "i": i, "x": x})
def pop(self, i=-1):
r = self.backing_struct.pop(i)
if self.publisher is not None:
self.publisher.publish({"action": "pop", "i": i})
self.publisher.publish(self, {"action": "pop", "i": i})
return r
def __setitem__(self, key, value):
self.backing_struct.__setitem__(key, value)
if self.publisher is not None:
self.publisher.publish(self, {"action": "setitem",
"key": key,
"value": value})
def __delitem__(self, key):
self.backing_struct.__delitem__(key)
if self.publisher is not None:
self.publisher.publish({"action": "delitem", "key": key})
self.publisher.publish(self, {"action": "delitem", "key": key})
class Publisher(AsyncioServer):
def __init__(self, notifier):
def __init__(self, notifiers):
AsyncioServer.__init__(self)
self.notifier = notifier
self._recipients = set()
self.notifiers = notifiers
self._recipients = {k: set() for k in notifiers.keys()}
self._notifier_names = {id(v): k for k, v in notifiers.items()}
self.notifier.publisher = self
for notifier in notifiers.values():
notifier.publisher = self
@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
@ -107,12 +121,22 @@ class Publisher(AsyncioServer):
if line != _init_string:
return
obj = {"action": "init", "struct": self.notifier.backing_struct}
line = yield from reader.readline()
if not line:
return
notifier_name = line.decode()[:-1]
try:
notifier = self.notifiers[notifier_name]
except KeyError:
return
obj = {"action": "init", "struct": notifier.backing_struct}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
queue = asyncio.Queue()
self._recipients.add(queue)
self._recipients[notifier_name].add(queue)
try:
while True:
line = yield from queue.get()
@ -120,15 +144,16 @@ class Publisher(AsyncioServer):
# raise exception on connection error
yield from writer.drain()
finally:
self._recipients.remove(queue)
self._recipients[notifier_name].remove(queue)
except ConnectionResetError:
# subscribers disconnecting are a normal occurence
pass
finally:
writer.close()
def publish(self, obj):
def publish(self, notifier, obj):
line = pyon.encode(obj) + "\n"
line = line.encode()
for recipient in self._recipients:
notifier_name = self._notifier_names[id(notifier)]
for recipient in self._recipients[notifier_name]:
recipient.put_nowait(line)

View File

@ -37,7 +37,6 @@ def _get_args():
help="unit to run")
parser_add.add_argument("file", help="file containing the unit to run")
parser_cancel = subparsers.add_parser("cancel",
help="cancel an experiment")
parser_cancel.add_argument("-p", "--periodic", default=False,
@ -46,8 +45,11 @@ def _get_args():
parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/PRID)")
parser_show = subparsers.add_parser("show-queue",
help="show the experiment queue")
parser_show_queue = subparsers.add_parser(
"show-queue", help="show the experiment queue")
parser_show_periodic = subparsers.add_parser(
"show-periodic", help="show the periodic experiment table")
return parser.parse_args()
@ -93,7 +95,7 @@ def _show_periodic(periodic):
if periodic:
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
"Timeout", "Period"])
sp = sorted(periodic.items(), key=lambda x: x[1][0])
sp = sorted(periodic.items(), key=lambda x: (x[1][0], x[0]))
for prid, (next_run, run_params, timeout, period) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
@ -107,6 +109,8 @@ def _show_periodic(periodic):
def _run_subscriber(host, port, subscriber):
if port is None:
port = 8887
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(subscriber.connect(host, port))
@ -127,9 +131,18 @@ def main():
def init_queue(x):
queue[:] = x
return queue
subscriber = Subscriber(init_queue, lambda: _show_queue(queue))
port = 8887 if args.port is None else args.port
_run_subscriber(args.server, port, subscriber)
subscriber = Subscriber("queue", init_queue,
lambda: _show_queue(queue))
_run_subscriber(args.server, args.port, subscriber)
elif args.action == "show-periodic":
periodic = dict()
def init_periodic(x):
periodic.clear()
periodic.update(x)
return periodic
subscriber = Subscriber("periodic", init_periodic,
lambda: _show_periodic(periodic))
_run_subscriber(args.server, args.port, subscriber)
else:
port = 8888 if args.port is None else args.port
remote = Client(args.server, port, "schedule_control")

View File

@ -2,6 +2,7 @@
import argparse
import asyncio
import time
import gbulb
from gi.repository import Gtk
@ -19,8 +20,8 @@ class QueueStoreSyncer:
def _convert(self, x):
rid, run_params, timeout = x
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else str(x))
for e in run_params["unit"], run_params["function"], timeout:
row.append("-" if e is None else str(e))
return row
def append(self, x):
@ -33,9 +34,57 @@ class QueueStoreSyncer:
del self.queue_store[key]
class PeriodicStoreSyncer:
def __init__(self, periodic_store, init):
self.periodic_store = periodic_store
self.periodic_store.clear()
self.order = []
for prid, x in sorted(init.items(), key=lambda e: (e[1][0], e[0])):
self.periodic_store.append(self._convert(prid, x))
self.order.append((x[0], prid))
def _convert(self, prid, x):
next_run, run_params, timeout, period = x
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
prid, run_params["file"]]
for e in run_params["unit"], run_params["function"], timeout:
row.append("-" if e is None else str(e))
row.append(str(period))
return row
def _find_index(self, prid):
for i, e in enumerate(self.periodic_store):
if e[1] == prid:
return i
raise KeyError
def __setitem__(self, prid, x):
try:
i = self._find_index(prid)
except KeyError:
pass
else:
del self.periodic_store[i]
del self.order[i]
for i, o in enumerate(self.order):
if o > (x[0], prid):
break
self.periodic_store.insert(i, self._convert(prid, x))
self.order.insert(i, (x[0], prid))
def __delitem__(self, key):
i = self._find_index(key)
del self.periodic_store[i]
del self.order[i]
class SchedulerWindow(Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self, title="Scheduler")
self.set_border_width(10)
vpane = Gtk.VPaned()
self.add(vpane)
self.queue_store = Gtk.ListStore(int, str, str, str, str)
tree = Gtk.TreeView(self.queue_store)
@ -44,20 +93,44 @@ class SchedulerWindow(Gtk.Window):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i)
tree.append_column(column)
self.add(tree)
scroll = Gtk.ScrolledWindow()
scroll.add(tree)
vpane.add1(scroll)
self.periodic_store = Gtk.ListStore(str, int, str, str, str, str, str)
tree = Gtk.TreeView(self.periodic_store)
for i, title in enumerate(["Next run", "PRID", "File", "Unit",
"Function", "Timeout", "Period"]):
renderer = Gtk.CellRendererText()
column = Gtk.TreeViewColumn(title, renderer, text=i)
tree.append_column(column)
scroll = Gtk.ScrolledWindow()
scroll.add(tree)
vpane.add2(scroll)
@asyncio.coroutine
def sub_connect(self, host, port):
self.subscriber = Subscriber(self.init_queue_store)
yield from self.subscriber.connect(host, port)
self.queue_subscriber = Subscriber("queue", self.init_queue_store)
yield from self.queue_subscriber.connect(host, port)
try:
self.periodic_subscriber = Subscriber(
"periodic", self.init_periodic_store)
yield from self.periodic_subscriber.connect(host, port)
except:
yield from self.queue_subscriber.close()
raise
@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
yield from self.periodic_subscriber.close()
yield from self.queue_subscriber.close()
def init_queue_store(self, init):
return QueueStoreSyncer(self.queue_store, init)
def init_periodic_store(self, init):
return PeriodicStoreSyncer(self.periodic_store, init)
def _get_args():
parser = argparse.ArgumentParser(description="ARTIQ GUI client")

View File

@ -33,9 +33,12 @@ def main():
loop.run_until_complete(schedule_control.start(
args.bind, args.port_schedule_control))
try:
schedule_notify = Publisher(scheduler.queued)
schedule_notify = Publisher({
"queue": scheduler.queue,
"periodic": scheduler.periodic
})
loop.run_until_complete(schedule_notify.start(
args.bind, args.port_schedule_notify))
args.bind, args.port_schedule_notify))
try:
loop.run_forever()
finally: