mirror of https://github.com/m-labs/artiq.git
master/client: queue pubsub
This commit is contained in:
parent
f033810e04
commit
1fdad21f08
|
@ -17,7 +17,7 @@ import asyncio
|
||||||
import traceback
|
import traceback
|
||||||
|
|
||||||
from artiq.management import pyon
|
from artiq.management import pyon
|
||||||
from artiq.management.network import AsyncioServer
|
from artiq.management.tools import AsyncioServer
|
||||||
|
|
||||||
|
|
||||||
class RemoteError(Exception):
|
class RemoteError(Exception):
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
|
from artiq.management.sync_struct import Notifier
|
||||||
from artiq.management.worker import Worker
|
from artiq.management.worker import Worker
|
||||||
|
|
||||||
|
|
||||||
|
@ -8,8 +9,7 @@ class Scheduler:
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.worker = Worker(*args, **kwargs)
|
self.worker = Worker(*args, **kwargs)
|
||||||
self.next_rid = 0
|
self.next_rid = 0
|
||||||
self.currently_executing = None
|
self.queued = Notifier([])
|
||||||
self.queued = []
|
|
||||||
self.queue_count = asyncio.Semaphore(0)
|
self.queue_count = asyncio.Semaphore(0)
|
||||||
self.periodic = dict()
|
self.periodic = dict()
|
||||||
self.periodic_modified = asyncio.Event()
|
self.periodic_modified = asyncio.Event()
|
||||||
|
@ -47,14 +47,6 @@ class Scheduler:
|
||||||
if qrid == rid)
|
if qrid == rid)
|
||||||
del self.queued[idx]
|
del self.queued[idx]
|
||||||
|
|
||||||
def get_schedule(self):
|
|
||||||
if self.currently_executing is None:
|
|
||||||
ce = None
|
|
||||||
else:
|
|
||||||
rid, run_params, timeout, t = self.currently_executing
|
|
||||||
ce = rid, run_params, timeout, time() - t
|
|
||||||
return ce, self.queued, self.periodic
|
|
||||||
|
|
||||||
def run_periodic(self, run_params, timeout, period):
|
def run_periodic(self, run_params, timeout, period):
|
||||||
prid = self.new_prid()
|
prid = self.new_prid()
|
||||||
self.periodic[prid] = 0, run_params, timeout, period
|
self.periodic[prid] = 0, run_params, timeout, period
|
||||||
|
@ -64,13 +56,6 @@ class Scheduler:
|
||||||
def cancel_periodic(self, prid):
|
def cancel_periodic(self, prid):
|
||||||
del self.periodic[prid]
|
del self.periodic[prid]
|
||||||
|
|
||||||
@asyncio.coroutine
|
|
||||||
def _run(self, rid, run_params, timeout):
|
|
||||||
self.currently_executing = rid, run_params, timeout, time()
|
|
||||||
result = yield from self.worker.run(run_params, timeout)
|
|
||||||
self.currently_executing = None
|
|
||||||
return result
|
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _run_periodic(self):
|
def _run_periodic(self):
|
||||||
while True:
|
while True:
|
||||||
|
@ -93,8 +78,10 @@ class Scheduler:
|
||||||
self.periodic[min_prid] = now + period, run_params, timeout, period
|
self.periodic[min_prid] = now + period, run_params, timeout, period
|
||||||
|
|
||||||
rid = self.new_rid()
|
rid = self.new_rid()
|
||||||
result = yield from self._run(rid, run_params, timeout)
|
self.queued.insert(0, (rid, run_params, timeout))
|
||||||
|
result = yield from self.worker.run(run_params, timeout)
|
||||||
print(prid, rid, result)
|
print(prid, rid, result)
|
||||||
|
del self.queued[0]
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _schedule(self):
|
def _schedule(self):
|
||||||
|
@ -111,6 +98,7 @@ class Scheduler:
|
||||||
|
|
||||||
yield from self._run_periodic()
|
yield from self._run_periodic()
|
||||||
if ev_queue in done:
|
if ev_queue in done:
|
||||||
rid, run_params, timeout = self.queued.pop(0)
|
rid, run_params, timeout = self.queued.backing_struct[0]
|
||||||
result = yield from self._run(rid, run_params, timeout)
|
result = yield from self.worker.run(run_params, timeout)
|
||||||
print(rid, result)
|
print(rid, result)
|
||||||
|
del self.queued[0]
|
||||||
|
|
|
@ -1,16 +1,15 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
from artiq.management import pyon
|
from artiq.management import pyon
|
||||||
from artiq.management.network import AsyncioServer
|
from artiq.management.tools import AsyncioServer
|
||||||
|
|
||||||
|
|
||||||
_init_string = b"ARTIQ sync_struct\n"
|
_init_string = b"ARTIQ sync_struct\n"
|
||||||
|
|
||||||
|
|
||||||
class Subscriber:
|
class Subscriber:
|
||||||
def __init__(self, target_builder, error_cb, notify_cb=None):
|
def __init__(self, target_builder, notify_cb=None):
|
||||||
self.target_builder = target_builder
|
self.target_builder = target_builder
|
||||||
self.error_cb = error_cb
|
|
||||||
self.notify_cb = notify_cb
|
self.notify_cb = notify_cb
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
|
@ -19,7 +18,7 @@ class Subscriber:
|
||||||
yield from asyncio.open_connection(host, port)
|
yield from asyncio.open_connection(host, port)
|
||||||
try:
|
try:
|
||||||
self._writer.write(_init_string)
|
self._writer.write(_init_string)
|
||||||
self._receive_task = asyncio.Task(self._receive_cr())
|
self.receive_task = asyncio.Task(self._receive_cr())
|
||||||
except:
|
except:
|
||||||
self._writer.close()
|
self._writer.close()
|
||||||
del self._reader
|
del self._reader
|
||||||
|
@ -29,9 +28,9 @@ class Subscriber:
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def close(self):
|
def close(self):
|
||||||
try:
|
try:
|
||||||
self._receive_task.cancel()
|
self.receive_task.cancel()
|
||||||
try:
|
try:
|
||||||
yield from asyncio.wait_for(self._receive_task, None)
|
yield from asyncio.wait_for(self.receive_task, None)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
pass
|
pass
|
||||||
finally:
|
finally:
|
||||||
|
@ -41,34 +40,66 @@ class Subscriber:
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _receive_cr(self):
|
def _receive_cr(self):
|
||||||
try:
|
target = None
|
||||||
target = None
|
while True:
|
||||||
while True:
|
line = yield from self._reader.readline()
|
||||||
line = yield from self._reader.readline()
|
if not line:
|
||||||
obj = pyon.decode(line.decode())
|
return
|
||||||
action = obj["action"]
|
obj = pyon.decode(line.decode())
|
||||||
|
action = obj["action"]
|
||||||
|
|
||||||
if action == "init":
|
if action == "init":
|
||||||
target = self.target_builder(obj["struct"])
|
target = self.target_builder(obj["struct"])
|
||||||
elif action == "append":
|
elif action == "append":
|
||||||
target.append(obj["x"])
|
target.append(obj["x"])
|
||||||
elif action == "pop":
|
elif action == "insert":
|
||||||
target.pop(obj["i"])
|
target.insert(obj["i"], obj["x"])
|
||||||
elif action == "delitem":
|
elif action == "pop":
|
||||||
target.__delitem__(obj["key"])
|
target.pop(obj["i"])
|
||||||
if self.notify_cb is not None:
|
elif action == "delitem":
|
||||||
self.notify_cb()
|
target.__delitem__(obj["key"])
|
||||||
except:
|
if self.notify_cb is not None:
|
||||||
self.error_cb()
|
self.notify_cb()
|
||||||
raise
|
|
||||||
|
|
||||||
|
class Notifier:
|
||||||
|
def __init__(self, backing_struct):
|
||||||
|
self.backing_struct = backing_struct
|
||||||
|
self.publisher = None
|
||||||
|
|
||||||
|
# Backing struct modification methods.
|
||||||
|
# All modifications must go through them!
|
||||||
|
|
||||||
|
def append(self, x):
|
||||||
|
self.backing_struct.append(x)
|
||||||
|
if self.publisher is not None:
|
||||||
|
self.publisher.publish({"action": "append", "x": x})
|
||||||
|
|
||||||
|
def insert(self, i, x):
|
||||||
|
self.backing_struct.insert(i, x)
|
||||||
|
if self.publisher is not None:
|
||||||
|
self.publisher.publish({"action": "insert", "i": i, "x": x})
|
||||||
|
|
||||||
|
def pop(self, i=-1):
|
||||||
|
r = self.backing_struct.pop(i)
|
||||||
|
if self.publisher is not None:
|
||||||
|
self.publisher.publish({"action": "pop", "i": i})
|
||||||
|
return r
|
||||||
|
|
||||||
|
def __delitem__(self, key):
|
||||||
|
self.backing_struct.__delitem__(key)
|
||||||
|
if self.publisher is not None:
|
||||||
|
self.publisher.publish({"action": "delitem", "key": key})
|
||||||
|
|
||||||
|
|
||||||
class Publisher(AsyncioServer):
|
class Publisher(AsyncioServer):
|
||||||
def __init__(self, backing_struct):
|
def __init__(self, notifier):
|
||||||
AsyncioServer.__init__(self)
|
AsyncioServer.__init__(self)
|
||||||
self.backing_struct = backing_struct
|
self.notifier = notifier
|
||||||
self._recipients = set()
|
self._recipients = set()
|
||||||
|
|
||||||
|
self.notifier.publisher = self
|
||||||
|
|
||||||
@asyncio.coroutine
|
@asyncio.coroutine
|
||||||
def _handle_connection_cr(self, reader, writer):
|
def _handle_connection_cr(self, reader, writer):
|
||||||
try:
|
try:
|
||||||
|
@ -76,7 +107,7 @@ class Publisher(AsyncioServer):
|
||||||
if line != _init_string:
|
if line != _init_string:
|
||||||
return
|
return
|
||||||
|
|
||||||
obj = {"action": "init", "struct": self.backing_struct}
|
obj = {"action": "init", "struct": self.notifier.backing_struct}
|
||||||
line = pyon.encode(obj) + "\n"
|
line = pyon.encode(obj) + "\n"
|
||||||
writer.write(line.encode())
|
writer.write(line.encode())
|
||||||
|
|
||||||
|
@ -96,24 +127,8 @@ class Publisher(AsyncioServer):
|
||||||
finally:
|
finally:
|
||||||
writer.close()
|
writer.close()
|
||||||
|
|
||||||
def _publish(self, obj):
|
def publish(self, obj):
|
||||||
line = pyon.encode(obj) + "\n"
|
line = pyon.encode(obj) + "\n"
|
||||||
line = line.encode()
|
line = line.encode()
|
||||||
for recipient in self._recipients:
|
for recipient in self._recipients:
|
||||||
recipient.put_nowait(line)
|
recipient.put_nowait(line)
|
||||||
|
|
||||||
# Backing struct modification methods.
|
|
||||||
# All modifications must go through them!
|
|
||||||
|
|
||||||
def append(self, x):
|
|
||||||
self.backing_struct.append(x)
|
|
||||||
self._publish({"action": "append", "x": x})
|
|
||||||
|
|
||||||
def pop(self, i=-1):
|
|
||||||
r = self.backing_struct.pop(i)
|
|
||||||
self._publish({"action": "pop", "i": i})
|
|
||||||
return r
|
|
||||||
|
|
||||||
def __delitem__(self, key):
|
|
||||||
self.backing_struct.__delitem__(key)
|
|
||||||
self._publish({"action": "delitem", "key": key})
|
|
||||||
|
|
|
@ -1,7 +1,12 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import sys
|
||||||
from copy import copy
|
from copy import copy
|
||||||
|
|
||||||
|
|
||||||
|
def clear_screen():
|
||||||
|
sys.stdout.write("\x1b[2J\x1b[H")
|
||||||
|
|
||||||
|
|
||||||
class AsyncioServer:
|
class AsyncioServer:
|
||||||
"""Generic TCP server based on asyncio.
|
"""Generic TCP server based on asyncio.
|
||||||
|
|
|
@ -2,10 +2,14 @@
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import time
|
import time
|
||||||
|
import sys
|
||||||
|
import asyncio
|
||||||
|
|
||||||
from prettytable import PrettyTable
|
from prettytable import PrettyTable
|
||||||
|
|
||||||
from artiq.management.pc_rpc import Client
|
from artiq.management.pc_rpc import Client
|
||||||
|
from artiq.management.sync_struct import Subscriber
|
||||||
|
from artiq.management.tools import clear_screen
|
||||||
|
|
||||||
|
|
||||||
def _get_args():
|
def _get_args():
|
||||||
|
@ -14,7 +18,7 @@ def _get_args():
|
||||||
"-s", "--server", default="::1",
|
"-s", "--server", default="::1",
|
||||||
help="hostname or IP of the master to connect to")
|
help="hostname or IP of the master to connect to")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--port", default=8888, type=int,
|
"--port", default=None, type=int,
|
||||||
help="TCP port to use to connect to the master")
|
help="TCP port to use to connect to the master")
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(dest="action")
|
subparsers = parser.add_subparsers(dest="action")
|
||||||
|
@ -42,8 +46,8 @@ def _get_args():
|
||||||
parser_cancel.add_argument("rid", type=int,
|
parser_cancel.add_argument("rid", type=int,
|
||||||
help="run identifier (RID/PRID)")
|
help="run identifier (RID/PRID)")
|
||||||
|
|
||||||
parser_show = subparsers.add_parser("show",
|
parser_show = subparsers.add_parser("show-queue",
|
||||||
help="show the experiment schedule")
|
help="show the experiment queue")
|
||||||
|
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
@ -70,30 +74,25 @@ def _action_cancel(remote, args):
|
||||||
remote.cancel_once(args.rid)
|
remote.cancel_once(args.rid)
|
||||||
|
|
||||||
|
|
||||||
def _action_show(remote, args):
|
def _show_queue(queue):
|
||||||
ce, queue, periodic = remote.get_schedule()
|
clear_screen()
|
||||||
if ce is not None or queue:
|
if queue:
|
||||||
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
|
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
|
||||||
if ce is not None:
|
|
||||||
rid, run_params, timeout, t = ce
|
|
||||||
print("Currently executing RID {} for {:.1f}s".format(rid, t))
|
|
||||||
row = [rid, run_params["file"]]
|
|
||||||
for x in run_params["unit"], run_params["function"], timeout:
|
|
||||||
row.append("-" if x is None else x)
|
|
||||||
table.add_row(row)
|
|
||||||
for rid, run_params, timeout in queue:
|
for rid, run_params, timeout in queue:
|
||||||
row = [rid, run_params["file"]]
|
row = [rid, run_params["file"]]
|
||||||
for x in run_params["unit"], run_params["function"], timeout:
|
for x in run_params["unit"], run_params["function"], timeout:
|
||||||
row.append("-" if x is None else x)
|
row.append("-" if x is None else x)
|
||||||
table.add_row(row)
|
table.add_row(row)
|
||||||
print("Run queue:")
|
|
||||||
print(table)
|
print(table)
|
||||||
else:
|
else:
|
||||||
print("Queue is empty")
|
print("Queue is empty")
|
||||||
|
|
||||||
|
|
||||||
|
def _show_periodic(periodic):
|
||||||
|
clear_screen()
|
||||||
if periodic:
|
if periodic:
|
||||||
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
|
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
|
||||||
"Timeout", "Period"])
|
"Timeout", "Period"])
|
||||||
print("Periodic schedule:")
|
|
||||||
sp = sorted(periodic.items(), key=lambda x: x[1][0])
|
sp = sorted(periodic.items(), key=lambda x: x[1][0])
|
||||||
for prid, (next_run, run_params, timeout, period) in sp:
|
for prid, (next_run, run_params, timeout, period) in sp:
|
||||||
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
|
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
|
||||||
|
@ -107,13 +106,37 @@ def _action_show(remote, args):
|
||||||
print("No periodic schedule")
|
print("No periodic schedule")
|
||||||
|
|
||||||
|
|
||||||
|
def _run_subscriber(host, port, subscriber):
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(subscriber.connect(host, port))
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(asyncio.wait_for(subscriber.receive_task,
|
||||||
|
None))
|
||||||
|
print("Connection to master lost")
|
||||||
|
finally:
|
||||||
|
loop.run_until_complete(subscriber.close())
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
args = _get_args()
|
args = _get_args()
|
||||||
remote = Client(args.server, args.port, "master")
|
if args.action == "show-queue":
|
||||||
try:
|
queue = []
|
||||||
globals()["_action_" + args.action](remote, args)
|
def init_queue(x):
|
||||||
finally:
|
queue[:] = x
|
||||||
remote.close_rpc()
|
return queue
|
||||||
|
subscriber = Subscriber(init_queue, lambda: _show_queue(queue))
|
||||||
|
port = 8887 if args.port is None else args.port
|
||||||
|
_run_subscriber(args.server, port, subscriber)
|
||||||
|
else:
|
||||||
|
port = 8888 if args.port is None else args.port
|
||||||
|
remote = Client(args.server, port, "schedule_control")
|
||||||
|
try:
|
||||||
|
globals()["_action_" + args.action](remote, args)
|
||||||
|
finally:
|
||||||
|
remote.close_rpc()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|
|
@ -4,6 +4,7 @@ import asyncio
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
from artiq.management.pc_rpc import Server
|
from artiq.management.pc_rpc import Server
|
||||||
|
from artiq.management.sync_struct import Publisher
|
||||||
from artiq.management.scheduler import Scheduler
|
from artiq.management.scheduler import Scheduler
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,8 +14,11 @@ def _get_args():
|
||||||
"--bind", default="::1",
|
"--bind", default="::1",
|
||||||
help="hostname or IP address to bind to")
|
help="hostname or IP address to bind to")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--port", default=8888, type=int,
|
"--port-schedule-control", default=8888, type=int,
|
||||||
help="TCP port to listen to")
|
help="TCP port to listen to for schedule control")
|
||||||
|
parser.add_argument(
|
||||||
|
"--port-schedule-notify", default=8887, type=int,
|
||||||
|
help="TCP port to listen to for schedule notifications")
|
||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
@ -25,12 +29,19 @@ def main():
|
||||||
scheduler = Scheduler("ddb.pyon", "pdb.pyon")
|
scheduler = Scheduler("ddb.pyon", "pdb.pyon")
|
||||||
loop.run_until_complete(scheduler.start())
|
loop.run_until_complete(scheduler.start())
|
||||||
try:
|
try:
|
||||||
server = Server(scheduler, "master")
|
schedule_control = Server(scheduler, "schedule_control")
|
||||||
loop.run_until_complete(server.start(args.bind, args.port))
|
loop.run_until_complete(schedule_control.start(
|
||||||
|
args.bind, args.port_schedule_control))
|
||||||
try:
|
try:
|
||||||
loop.run_forever()
|
schedule_notify = Publisher(scheduler.queued)
|
||||||
|
loop.run_until_complete(schedule_notify.start(
|
||||||
|
args.bind, args.port_schedule_notify))
|
||||||
|
try:
|
||||||
|
loop.run_forever()
|
||||||
|
finally:
|
||||||
|
loop.run_until_complete(schedule_notify.stop())
|
||||||
finally:
|
finally:
|
||||||
loop.run_until_complete(server.stop())
|
loop.run_until_complete(schedule_control.stop())
|
||||||
finally:
|
finally:
|
||||||
loop.run_until_complete(scheduler.stop())
|
loop.run_until_complete(scheduler.stop())
|
||||||
finally:
|
finally:
|
||||||
|
|
Loading…
Reference in New Issue