mc: use pc_rpc

This commit is contained in:
Sebastien Bourdeauducq 2014-10-23 18:48:03 +08:00
parent 2aac43c6e5
commit 7a10cb8c32
4 changed files with 52 additions and 66 deletions

View File

@ -4,23 +4,28 @@ from artiq.management.worker import Worker
class Scheduler: class Scheduler:
def __init__(self, loop): def __init__(self):
self.loop = loop self.worker = Worker()
self.queue = asyncio.Queue() self.queue = asyncio.Queue()
def __enter__(self): @asyncio.coroutine
self.worker = Worker(self.loop) def start(self):
return self self.task = asyncio.Task(self._schedule())
yield from self.worker.create_process()
def __exit__(self, type, value, traceback): @asyncio.coroutine
self.loop.run_until_complete(self.worker.end_process()) def stop(self):
del self.worker self.task.cancel()
yield from asyncio.wait([self.task])
del self.task
yield from self.worker.end_process()
def add_run_once(self, item, timeout): def run_once(self, run_params, timeout):
yield from self.queue.put((item, timeout)) self.queue.put_nowait((run_params, timeout))
def task(self): @asyncio.coroutine
def _schedule(self):
while True: while True:
item, timeout = yield from self.queue.get() run_params, timeout = yield from self.queue.get()
result = yield from self.worker.run(item, timeout) result = yield from self.worker.run(run_params, timeout)
print(result) print(result)

View File

@ -10,12 +10,11 @@ class WorkerFailed(Exception):
class Worker: class Worker:
def __init__(self, loop, send_timeout=0.5, start_reply_timeout=1.0, def __init__(self, send_timeout=0.5, start_reply_timeout=1.0,
term_timeout=1.0): term_timeout=1.0):
self.send_timeout = send_timeout self.send_timeout = send_timeout
self.start_reply_timeout = start_reply_timeout self.start_reply_timeout = start_reply_timeout
self.term_timeout = term_timeout self.term_timeout = term_timeout
loop.run_until_complete(self.create_process())
@asyncio.coroutine @asyncio.coroutine
def create_process(self): def create_process(self):
@ -30,7 +29,7 @@ class Worker:
self.process.stdin.write("\n".encode()) self.process.stdin.write("\n".encode())
try: try:
fut = self.process.stdin.drain() fut = self.process.stdin.drain()
if fut is not (): # FIXME: why does Python return this? if fut is not (): # FIXME: why does Python return this?
yield from asyncio.wait_for(fut, timeout=timeout) yield from asyncio.wait_for(fut, timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise WorkerFailed("Timeout sending data from worker") raise WorkerFailed("Timeout sending data from worker")

View File

@ -1,8 +1,8 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import socket
import json from artiq.management.pc_rpc import Client
def _get_args(): def _get_args():
@ -18,28 +18,20 @@ def _get_args():
return parser.parse_args() return parser.parse_args()
def _send_obj(sock, obj):
line = json.dumps(obj) + "\n"
sock.sendall(line.encode())
def main(): def main():
args = _get_args() args = _get_args()
remote = Client("::1", 8888)
with socket.create_connection(("::1", 8888)) as sock: try:
for path, name, timeout in args.run_once: for path, name, timeout in args.run_once:
obj = { remote.run_once(
"action": "run_once", {
"run_params": {
"path": path, "path": path,
"name": name "name": name
}, }, int(timeout))
"timeout": timeout
}
_send_obj(sock, obj)
if args.quit_master: if args.quit_master:
obj = {"action": "quit"} remote.quit()
_send_obj(sock, obj) finally:
remote.close()
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -1,52 +1,42 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio import asyncio
import json
from artiq.management.pc_rpc import Server
from artiq.management.scheduler import Scheduler from artiq.management.scheduler import Scheduler
class Server: class Master:
def __init__(self, loop, scheduler): def __init__(self, scheduler):
self.scheduler = scheduler self.scheduler = scheduler
self.terminate_notify = asyncio.Semaphore(0) self.terminate_notify = asyncio.Semaphore(0)
loop.run_until_complete(
asyncio.start_server(self.handle_connection, "::1", 8888))
@asyncio.coroutine @asyncio.coroutine
def handle_connection(self, reader, writer): def wait_quit(self):
while True:
line = yield from reader.readline()
if not line:
writer.close()
return
obj = json.loads(line.decode())
action = obj["action"]
if action == "run_once":
yield from self.scheduler.add_run_once(obj["run_params"],
float(obj["timeout"]))
elif action == "quit":
self.terminate_notify.release()
else:
print("warning: unknown action " + action)
@asyncio.coroutine
def task(self, scheduler_task):
yield from self.terminate_notify.acquire() yield from self.terminate_notify.acquire()
scheduler_task.cancel()
def quit(self):
self.terminate_notify.release()
def run_once(self, run_params, timeout):
self.scheduler.run_once(run_params, timeout)
def main(): def main():
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
try: try:
with Scheduler(loop) as scheduler: scheduler = Scheduler()
server = Server(loop, scheduler) loop.run_until_complete(scheduler.start())
scheduler_task = asyncio.Task(scheduler.task()) try:
server_task = asyncio.Task(server.task(scheduler_task)) master = Master(scheduler)
loop.run_until_complete(asyncio.wait([ server = Server(master)
scheduler_task, loop.run_until_complete(server.start("::1", 8888))
server_task try:
])) loop.run_until_complete(master.wait_quit())
finally:
loop.run_until_complete(server.stop())
finally:
loop.run_until_complete(scheduler.stop())
finally: finally:
loop.close() loop.close()