diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index 800b44f82..9372b563b 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -4,23 +4,28 @@ from artiq.management.worker import Worker class Scheduler: - def __init__(self, loop): - self.loop = loop + def __init__(self): + self.worker = Worker() self.queue = asyncio.Queue() - def __enter__(self): - self.worker = Worker(self.loop) - return self + @asyncio.coroutine + def start(self): + self.task = asyncio.Task(self._schedule()) + yield from self.worker.create_process() - def __exit__(self, type, value, traceback): - self.loop.run_until_complete(self.worker.end_process()) - del self.worker + @asyncio.coroutine + def stop(self): + self.task.cancel() + yield from asyncio.wait([self.task]) + del self.task + yield from self.worker.end_process() - def add_run_once(self, item, timeout): - yield from self.queue.put((item, timeout)) + def run_once(self, run_params, timeout): + self.queue.put_nowait((run_params, timeout)) - def task(self): + @asyncio.coroutine + def _schedule(self): while True: - item, timeout = yield from self.queue.get() - result = yield from self.worker.run(item, timeout) + run_params, timeout = yield from self.queue.get() + result = yield from self.worker.run(run_params, timeout) print(result) diff --git a/artiq/management/worker.py b/artiq/management/worker.py index f914c45df..b23503e0a 100644 --- a/artiq/management/worker.py +++ b/artiq/management/worker.py @@ -10,12 +10,11 @@ class WorkerFailed(Exception): class Worker: - def __init__(self, loop, send_timeout=0.5, start_reply_timeout=1.0, + def __init__(self, send_timeout=0.5, start_reply_timeout=1.0, term_timeout=1.0): self.send_timeout = send_timeout self.start_reply_timeout = start_reply_timeout self.term_timeout = term_timeout - loop.run_until_complete(self.create_process()) @asyncio.coroutine def create_process(self): @@ -30,7 +29,7 @@ class Worker: self.process.stdin.write("\n".encode()) try: fut = self.process.stdin.drain() - if fut is not (): # FIXME: why does Python return this? + if fut is not (): # FIXME: why does Python return this? yield from asyncio.wait_for(fut, timeout=timeout) except asyncio.TimeoutError: raise WorkerFailed("Timeout sending data from worker") diff --git a/frontend/artiq b/frontend/artiq index dd726ee06..4f1caf32c 100755 --- a/frontend/artiq +++ b/frontend/artiq @@ -1,8 +1,8 @@ #!/usr/bin/env python3 import argparse -import socket -import json + +from artiq.management.pc_rpc import Client def _get_args(): @@ -18,28 +18,20 @@ def _get_args(): return parser.parse_args() -def _send_obj(sock, obj): - line = json.dumps(obj) + "\n" - sock.sendall(line.encode()) - - def main(): args = _get_args() - - with socket.create_connection(("::1", 8888)) as sock: + remote = Client("::1", 8888) + try: for path, name, timeout in args.run_once: - obj = { - "action": "run_once", - "run_params": { + remote.run_once( + { "path": path, "name": name - }, - "timeout": timeout - } - _send_obj(sock, obj) + }, int(timeout)) if args.quit_master: - obj = {"action": "quit"} - _send_obj(sock, obj) + remote.quit() + finally: + remote.close() if __name__ == "__main__": main() diff --git a/frontend/artiqd b/frontend/artiqd index 62b889b32..c257fb09a 100755 --- a/frontend/artiqd +++ b/frontend/artiqd @@ -1,52 +1,42 @@ #!/usr/bin/env python3 import asyncio -import json +from artiq.management.pc_rpc import Server from artiq.management.scheduler import Scheduler -class Server: - def __init__(self, loop, scheduler): +class Master: + def __init__(self, scheduler): self.scheduler = scheduler self.terminate_notify = asyncio.Semaphore(0) - loop.run_until_complete( - asyncio.start_server(self.handle_connection, "::1", 8888)) @asyncio.coroutine - def handle_connection(self, reader, writer): - while True: - line = yield from reader.readline() - if not line: - writer.close() - return - obj = json.loads(line.decode()) - action = obj["action"] - if action == "run_once": - yield from self.scheduler.add_run_once(obj["run_params"], - float(obj["timeout"])) - elif action == "quit": - self.terminate_notify.release() - else: - print("warning: unknown action " + action) - - @asyncio.coroutine - def task(self, scheduler_task): + def wait_quit(self): yield from self.terminate_notify.acquire() - scheduler_task.cancel() + + def quit(self): + self.terminate_notify.release() + + def run_once(self, run_params, timeout): + self.scheduler.run_once(run_params, timeout) def main(): loop = asyncio.get_event_loop() try: - with Scheduler(loop) as scheduler: - server = Server(loop, scheduler) - scheduler_task = asyncio.Task(scheduler.task()) - server_task = asyncio.Task(server.task(scheduler_task)) - loop.run_until_complete(asyncio.wait([ - scheduler_task, - server_task - ])) + scheduler = Scheduler() + loop.run_until_complete(scheduler.start()) + try: + master = Master(scheduler) + server = Server(master) + loop.run_until_complete(server.start("::1", 8888)) + try: + loop.run_until_complete(master.wait_quit()) + finally: + loop.run_until_complete(server.stop()) + finally: + loop.run_until_complete(scheduler.stop()) finally: loop.close()