forked from M-Labs/artiq
add rudimentary master/controller/client
This commit is contained in:
parent
2f58cf6aff
commit
59194176a9
26
artiq/management/scheduler.py
Normal file
26
artiq/management/scheduler.py
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from artiq.management.worker import Worker
|
||||||
|
|
||||||
|
|
||||||
|
class Scheduler:
|
||||||
|
def __init__(self, loop):
|
||||||
|
self.loop = loop
|
||||||
|
self.queue = asyncio.Queue()
|
||||||
|
|
||||||
|
def __enter__(self):
|
||||||
|
self.worker = Worker(self.loop)
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, type, value, traceback):
|
||||||
|
self.loop.run_until_complete(self.worker.end_process())
|
||||||
|
del self.worker
|
||||||
|
|
||||||
|
def add_run_once(self, item, timeout):
|
||||||
|
yield from self.queue.put((item, timeout))
|
||||||
|
|
||||||
|
def task(self):
|
||||||
|
while True:
|
||||||
|
item, timeout = yield from self.queue.get()
|
||||||
|
result = yield from self.worker.run(item, timeout)
|
||||||
|
print(result)
|
74
artiq/management/worker.py
Normal file
74
artiq/management/worker.py
Normal file
@ -0,0 +1,74 @@
|
|||||||
|
import sys
|
||||||
|
import asyncio
|
||||||
|
import subprocess
|
||||||
|
import signal
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerFailed(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Worker:
|
||||||
|
def __init__(self, loop, send_timeout=0.5, start_reply_timeout=1.0,
|
||||||
|
term_timeout=1.0):
|
||||||
|
self.send_timeout = send_timeout
|
||||||
|
self.start_reply_timeout = start_reply_timeout
|
||||||
|
self.term_timeout = term_timeout
|
||||||
|
loop.run_until_complete(self.create_process())
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def create_process(self):
|
||||||
|
self.process = yield from asyncio.create_subprocess_exec(
|
||||||
|
sys.executable, "-m", "artiq.management.worker_impl",
|
||||||
|
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _send(self, obj, timeout):
|
||||||
|
line = json.dumps(obj)
|
||||||
|
self.process.stdin.write(line.encode())
|
||||||
|
self.process.stdin.write("\n".encode())
|
||||||
|
try:
|
||||||
|
fut = self.process.stdin.drain()
|
||||||
|
if fut is not (): # FIXME: why does Python return this?
|
||||||
|
yield from asyncio.wait_for(fut, timeout=timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise WorkerFailed("Timeout sending data from worker")
|
||||||
|
except:
|
||||||
|
raise WorkerFailed("Failed to send data to worker")
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def _recv(self, timeout):
|
||||||
|
try:
|
||||||
|
line = yield from asyncio.wait_for(
|
||||||
|
self.process.stdout.readline(), timeout=timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
raise WorkerFailed("Timeout receiving data from worker")
|
||||||
|
if not line:
|
||||||
|
raise WorkerFailed(
|
||||||
|
"Worker ended unexpectedly while trying to receive data")
|
||||||
|
try:
|
||||||
|
obj = json.loads(line.decode())
|
||||||
|
except:
|
||||||
|
raise WorkerFailed("Worker sent invalid JSON data")
|
||||||
|
return obj
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def run(self, run_params, result_timeout):
|
||||||
|
yield from self._send(run_params, self.send_timeout)
|
||||||
|
obj = yield from self._recv(self.start_reply_timeout)
|
||||||
|
if obj != "ack":
|
||||||
|
raise WorkerFailed("Incorrect acknowledgement")
|
||||||
|
result = yield from self._recv(result_timeout)
|
||||||
|
return result
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def end_process(self):
|
||||||
|
if self.process.returncode is not None:
|
||||||
|
return
|
||||||
|
self.process.send_signal(signal.SIGTERM)
|
||||||
|
try:
|
||||||
|
yield from asyncio.wait_for(
|
||||||
|
self.process.wait(), timeout=self.term_timeout)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.process.send_signal(signal.SIGKILL)
|
45
artiq/management/worker_impl.py
Normal file
45
artiq/management/worker_impl.py
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import importlib
|
||||||
|
|
||||||
|
|
||||||
|
def import_in_folder(path, name):
|
||||||
|
try:
|
||||||
|
del sys.modules[name] # force path search
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
loader = importlib.find_loader(name, [path])
|
||||||
|
if loader is None:
|
||||||
|
raise ImportError("Could not find loader")
|
||||||
|
return loader.load_module()
|
||||||
|
|
||||||
|
|
||||||
|
def run(path, name):
|
||||||
|
module = import_in_folder(path, name)
|
||||||
|
module.main()
|
||||||
|
|
||||||
|
|
||||||
|
def put_object(obj):
|
||||||
|
ds = json.dumps(obj)
|
||||||
|
sys.__stdout__.write(ds)
|
||||||
|
sys.__stdout__.write("\n")
|
||||||
|
sys.__stdout__.flush()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
sys.stdout = sys.stderr
|
||||||
|
|
||||||
|
while True:
|
||||||
|
line = sys.__stdin__.readline()
|
||||||
|
obj = json.loads(line)
|
||||||
|
put_object("ack")
|
||||||
|
|
||||||
|
try:
|
||||||
|
run(**obj)
|
||||||
|
except Exception as e:
|
||||||
|
put_object({"status": "failed", "message": str(e)})
|
||||||
|
else:
|
||||||
|
put_object({"status": "ok"})
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
45
frontend/artiq
Executable file
45
frontend/artiq
Executable file
@ -0,0 +1,45 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import socket
|
||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
def _get_args():
|
||||||
|
parser = argparse.ArgumentParser(description="ARTIQ client")
|
||||||
|
parser.add_argument(
|
||||||
|
"-o", "--run-once", default=[], nargs=3,
|
||||||
|
action="append",
|
||||||
|
help="run experiment once. arguments: <path> <name> <timeout>")
|
||||||
|
parser.add_argument(
|
||||||
|
"-q", "--quit-master", default=False,
|
||||||
|
action="store_true",
|
||||||
|
help="causes the master to quit")
|
||||||
|
return parser.parse_args()
|
||||||
|
|
||||||
|
|
||||||
|
def _send_obj(sock, obj):
|
||||||
|
line = json.dumps(obj) + "\n"
|
||||||
|
sock.sendall(line.encode())
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = _get_args()
|
||||||
|
|
||||||
|
with socket.create_connection(("::1", 8888)) as sock:
|
||||||
|
for path, name, timeout in args.run_once:
|
||||||
|
obj = {
|
||||||
|
"action": "run_once",
|
||||||
|
"run_params": {
|
||||||
|
"path": path,
|
||||||
|
"name": name
|
||||||
|
},
|
||||||
|
"timeout": timeout
|
||||||
|
}
|
||||||
|
_send_obj(sock, obj)
|
||||||
|
if args.quit_master:
|
||||||
|
obj = {"action": "quit"}
|
||||||
|
_send_obj(sock, obj)
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
54
frontend/artiqd
Executable file
54
frontend/artiqd
Executable file
@ -0,0 +1,54 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
from artiq.management.scheduler import Scheduler
|
||||||
|
|
||||||
|
|
||||||
|
class Server:
|
||||||
|
def __init__(self, loop, scheduler):
|
||||||
|
self.scheduler = scheduler
|
||||||
|
self.terminate_notify = asyncio.Semaphore(0)
|
||||||
|
loop.run_until_complete(
|
||||||
|
asyncio.start_server(self.handle_connection, "::1", 8888))
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def handle_connection(self, reader, writer):
|
||||||
|
while True:
|
||||||
|
line = yield from reader.readline()
|
||||||
|
if not line:
|
||||||
|
writer.close()
|
||||||
|
return
|
||||||
|
obj = json.loads(line.decode())
|
||||||
|
action = obj["action"]
|
||||||
|
if action == "run_once":
|
||||||
|
yield from self.scheduler.add_run_once(obj["run_params"],
|
||||||
|
float(obj["timeout"]))
|
||||||
|
elif action == "quit":
|
||||||
|
self.terminate_notify.release()
|
||||||
|
else:
|
||||||
|
print("warning: unknown action " + action)
|
||||||
|
|
||||||
|
@asyncio.coroutine
|
||||||
|
def task(self, scheduler_task):
|
||||||
|
yield from self.terminate_notify.acquire()
|
||||||
|
scheduler_task.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
try:
|
||||||
|
with Scheduler(loop) as scheduler:
|
||||||
|
server = Server(loop, scheduler)
|
||||||
|
scheduler_task = asyncio.Task(scheduler.task())
|
||||||
|
server_task = asyncio.Task(server.task(scheduler_task))
|
||||||
|
loop.run_until_complete(asyncio.wait([
|
||||||
|
scheduler_task,
|
||||||
|
server_task
|
||||||
|
]))
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
Loading…
Reference in New Issue
Block a user