From 59194176a9427d707314a10487cbbe34cb56770b Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 5 Oct 2014 16:25:31 +0800 Subject: [PATCH] add rudimentary master/controller/client --- artiq/management/scheduler.py | 26 ++++++++++++ artiq/management/worker.py | 74 +++++++++++++++++++++++++++++++++ artiq/management/worker_impl.py | 45 ++++++++++++++++++++ frontend/artiq | 45 ++++++++++++++++++++ frontend/artiqd | 54 ++++++++++++++++++++++++ 5 files changed, 244 insertions(+) create mode 100644 artiq/management/scheduler.py create mode 100644 artiq/management/worker.py create mode 100644 artiq/management/worker_impl.py create mode 100755 frontend/artiq create mode 100755 frontend/artiqd diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py new file mode 100644 index 000000000..800b44f82 --- /dev/null +++ b/artiq/management/scheduler.py @@ -0,0 +1,26 @@ +import asyncio + +from artiq.management.worker import Worker + + +class Scheduler: + def __init__(self, loop): + self.loop = loop + self.queue = asyncio.Queue() + + def __enter__(self): + self.worker = Worker(self.loop) + return self + + def __exit__(self, type, value, traceback): + self.loop.run_until_complete(self.worker.end_process()) + del self.worker + + def add_run_once(self, item, timeout): + yield from self.queue.put((item, timeout)) + + def task(self): + while True: + item, timeout = yield from self.queue.get() + result = yield from self.worker.run(item, timeout) + print(result) diff --git a/artiq/management/worker.py b/artiq/management/worker.py new file mode 100644 index 000000000..f914c45df --- /dev/null +++ b/artiq/management/worker.py @@ -0,0 +1,74 @@ +import sys +import asyncio +import subprocess +import signal +import json + + +class WorkerFailed(Exception): + pass + + +class Worker: + def __init__(self, loop, send_timeout=0.5, start_reply_timeout=1.0, + term_timeout=1.0): + self.send_timeout = send_timeout + self.start_reply_timeout = start_reply_timeout + self.term_timeout = term_timeout + loop.run_until_complete(self.create_process()) + + @asyncio.coroutine + def create_process(self): + self.process = yield from asyncio.create_subprocess_exec( + sys.executable, "-m", "artiq.management.worker_impl", + stdout=subprocess.PIPE, stdin=subprocess.PIPE) + + @asyncio.coroutine + def _send(self, obj, timeout): + line = json.dumps(obj) + self.process.stdin.write(line.encode()) + self.process.stdin.write("\n".encode()) + try: + fut = self.process.stdin.drain() + if fut is not (): # FIXME: why does Python return this? + yield from asyncio.wait_for(fut, timeout=timeout) + except asyncio.TimeoutError: + raise WorkerFailed("Timeout sending data from worker") + except: + raise WorkerFailed("Failed to send data to worker") + + @asyncio.coroutine + def _recv(self, timeout): + try: + line = yield from asyncio.wait_for( + self.process.stdout.readline(), timeout=timeout) + except asyncio.TimeoutError: + raise WorkerFailed("Timeout receiving data from worker") + if not line: + raise WorkerFailed( + "Worker ended unexpectedly while trying to receive data") + try: + obj = json.loads(line.decode()) + except: + raise WorkerFailed("Worker sent invalid JSON data") + return obj + + @asyncio.coroutine + def run(self, run_params, result_timeout): + yield from self._send(run_params, self.send_timeout) + obj = yield from self._recv(self.start_reply_timeout) + if obj != "ack": + raise WorkerFailed("Incorrect acknowledgement") + result = yield from self._recv(result_timeout) + return result + + @asyncio.coroutine + def end_process(self): + if self.process.returncode is not None: + return + self.process.send_signal(signal.SIGTERM) + try: + yield from asyncio.wait_for( + self.process.wait(), timeout=self.term_timeout) + except asyncio.TimeoutError: + self.process.send_signal(signal.SIGKILL) diff --git a/artiq/management/worker_impl.py b/artiq/management/worker_impl.py new file mode 100644 index 000000000..e9a710ab4 --- /dev/null +++ b/artiq/management/worker_impl.py @@ -0,0 +1,45 @@ +import json +import sys +import importlib + + +def import_in_folder(path, name): + try: + del sys.modules[name] # force path search + except KeyError: + pass + loader = importlib.find_loader(name, [path]) + if loader is None: + raise ImportError("Could not find loader") + return loader.load_module() + + +def run(path, name): + module = import_in_folder(path, name) + module.main() + + +def put_object(obj): + ds = json.dumps(obj) + sys.__stdout__.write(ds) + sys.__stdout__.write("\n") + sys.__stdout__.flush() + + +def main(): + sys.stdout = sys.stderr + + while True: + line = sys.__stdin__.readline() + obj = json.loads(line) + put_object("ack") + + try: + run(**obj) + except Exception as e: + put_object({"status": "failed", "message": str(e)}) + else: + put_object({"status": "ok"}) + +if __name__ == "__main__": + main() diff --git a/frontend/artiq b/frontend/artiq new file mode 100755 index 000000000..dd726ee06 --- /dev/null +++ b/frontend/artiq @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 + +import argparse +import socket +import json + + +def _get_args(): + parser = argparse.ArgumentParser(description="ARTIQ client") + parser.add_argument( + "-o", "--run-once", default=[], nargs=3, + action="append", + help="run experiment once. arguments: ") + parser.add_argument( + "-q", "--quit-master", default=False, + action="store_true", + help="causes the master to quit") + return parser.parse_args() + + +def _send_obj(sock, obj): + line = json.dumps(obj) + "\n" + sock.sendall(line.encode()) + + +def main(): + args = _get_args() + + with socket.create_connection(("::1", 8888)) as sock: + for path, name, timeout in args.run_once: + obj = { + "action": "run_once", + "run_params": { + "path": path, + "name": name + }, + "timeout": timeout + } + _send_obj(sock, obj) + if args.quit_master: + obj = {"action": "quit"} + _send_obj(sock, obj) + +if __name__ == "__main__": + main() diff --git a/frontend/artiqd b/frontend/artiqd new file mode 100755 index 000000000..62b889b32 --- /dev/null +++ b/frontend/artiqd @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 + +import asyncio +import json + +from artiq.management.scheduler import Scheduler + + +class Server: + def __init__(self, loop, scheduler): + self.scheduler = scheduler + self.terminate_notify = asyncio.Semaphore(0) + loop.run_until_complete( + asyncio.start_server(self.handle_connection, "::1", 8888)) + + @asyncio.coroutine + def handle_connection(self, reader, writer): + while True: + line = yield from reader.readline() + if not line: + writer.close() + return + obj = json.loads(line.decode()) + action = obj["action"] + if action == "run_once": + yield from self.scheduler.add_run_once(obj["run_params"], + float(obj["timeout"])) + elif action == "quit": + self.terminate_notify.release() + else: + print("warning: unknown action " + action) + + @asyncio.coroutine + def task(self, scheduler_task): + yield from self.terminate_notify.acquire() + scheduler_task.cancel() + + +def main(): + loop = asyncio.get_event_loop() + try: + with Scheduler(loop) as scheduler: + server = Server(loop, scheduler) + scheduler_task = asyncio.Task(scheduler.task()) + server_task = asyncio.Task(server.task(scheduler_task)) + loop.run_until_complete(asyncio.wait([ + scheduler_task, + server_task + ])) + finally: + loop.close() + +if __name__ == "__main__": + main()