forked from M-Labs/artiq
1
0
Fork 0

master: use a new worker process for each experiment

This commit is contained in:
Sebastien Bourdeauducq 2015-03-09 16:22:41 +01:00
parent ec1d082730
commit 4c280d5fcc
4 changed files with 47 additions and 60 deletions

View File

@ -62,7 +62,7 @@ def main():
"scheduler_run_timed": scheduler.run_timed, "scheduler_run_timed": scheduler.run_timed,
"scheduler_cancel_timed": scheduler.cancel_timed, "scheduler_cancel_timed": scheduler.cancel_timed,
} }
loop.run_until_complete(scheduler.start()) scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop())) atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
server_control = Server({ server_control = Server({

View File

@ -25,17 +25,14 @@ class Scheduler:
trids -= set(self.timed.read.keys()) trids -= set(self.timed.read.keys())
return next(iter(trids)) return next(iter(trids))
@asyncio.coroutine
def start(self): def start(self):
self.task = asyncio.Task(self._schedule()) self.task = asyncio.Task(self._schedule())
yield from self.worker.create_process()
@asyncio.coroutine @asyncio.coroutine
def stop(self): def stop(self):
self.task.cancel() self.task.cancel()
yield from asyncio.wait([self.task]) yield from asyncio.wait([self.task])
del self.task del self.task
yield from self.worker.end_process()
def run_queued(self, run_params): def run_queued(self, run_params):
rid = self.new_rid() rid = self.new_rid()

View File

@ -7,11 +7,7 @@ import traceback
from artiq.protocols import pyon from artiq.protocols import pyon
class WorkerFailed(Exception): class WorkerError(Exception):
pass
class RunFailed(Exception):
pass pass
@ -24,11 +20,22 @@ class Worker:
self.term_timeout = term_timeout self.term_timeout = term_timeout
@asyncio.coroutine @asyncio.coroutine
def create_process(self): def _create_process(self):
self.process = yield from asyncio.create_subprocess_exec( self.process = yield from asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.master.worker_impl", sys.executable, "-m", "artiq.master.worker_impl",
stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stdin=subprocess.PIPE)
@asyncio.coroutine
def _end_process(self):
if self.process.returncode is not None:
return
self.process.send_signal(signal.SIGTERM)
try:
yield from asyncio.wait_for(
self.process.wait(), timeout=self.term_timeout)
except asyncio.TimeoutError:
self.process.send_signal(signal.SIGKILL)
@asyncio.coroutine @asyncio.coroutine
def _send(self, obj, timeout): def _send(self, obj, timeout):
line = pyon.encode(obj) line = pyon.encode(obj)
@ -39,9 +46,9 @@ class Worker:
if fut is not (): # FIXME: why does Python return this? if fut is not (): # FIXME: why does Python return this?
yield from asyncio.wait_for(fut, timeout=timeout) yield from asyncio.wait_for(fut, timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise WorkerFailed("Timeout sending data from worker") raise WorkerError("Timeout sending data from worker")
except: except:
raise WorkerFailed("Failed to send data to worker") raise WorkerError("Failed to send data to worker")
@asyncio.coroutine @asyncio.coroutine
def _recv(self, timeout): def _recv(self, timeout):
@ -49,32 +56,33 @@ class Worker:
line = yield from asyncio.wait_for( line = yield from asyncio.wait_for(
self.process.stdout.readline(), timeout=timeout) self.process.stdout.readline(), timeout=timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise WorkerFailed("Timeout receiving data from worker") raise WorkerError("Timeout receiving data from worker")
if not line: if not line:
raise WorkerFailed( return None
"Worker ended unexpectedly while trying to receive data")
try: try:
obj = pyon.decode(line.decode()) obj = pyon.decode(line.decode())
except: except:
raise WorkerFailed("Worker sent invalid PYON data") raise WorkerError("Worker sent invalid PYON data")
return obj return obj
@asyncio.coroutine @asyncio.coroutine
def run(self, rid, run_params): def run(self, rid, run_params):
yield from self._create_process()
try:
obj = {"rid": rid, "run_params": run_params} obj = {"rid": rid, "run_params": run_params}
yield from self._send(obj, self.send_timeout) yield from self._send(obj, self.send_timeout)
obj = yield from self._recv(self.start_reply_timeout) obj = yield from self._recv(self.start_reply_timeout)
if obj != "ack": if obj != "ack":
raise WorkerFailed("Incorrect acknowledgement") raise WorkerError("Incorrect acknowledgement")
while True: while True:
obj = yield from self._recv(None) obj = yield from self._recv(None)
if obj is None:
if self.process.returncode != 0:
raise WorkerError("Worker finished with status code {}"
.format(self.process.returncode))
break
action = obj["action"] action = obj["action"]
if action == "report_completed":
if obj["status"] != "ok":
raise RunFailed(obj["message"])
else:
return
else:
del obj["action"] del obj["action"]
try: try:
data = self.handlers[action](**obj) data = self.handlers[action](**obj)
@ -83,14 +91,5 @@ class Worker:
reply = {"status": "failed", reply = {"status": "failed",
"message": traceback.format_exc()} "message": traceback.format_exc()}
yield from self._send(reply, self.send_timeout) yield from self._send(reply, self.send_timeout)
finally:
@asyncio.coroutine yield from self._end_process()
def end_process(self):
if self.process.returncode is not None:
return
self.process.send_signal(signal.SIGTERM)
try:
yield from asyncio.wait_for(
self.process.wait(), timeout=self.term_timeout)
except asyncio.TimeoutError:
self.process.send_signal(signal.SIGKILL)

View File

@ -1,6 +1,5 @@
import sys import sys
import time import time
import traceback
from artiq.protocols import pyon from artiq.protocols import pyon
from artiq.tools import file_import from artiq.tools import file_import
@ -79,7 +78,6 @@ def run(rid, run_params):
rdb = ResultDB(init_rt_results, update_rt_results) rdb = ResultDB(init_rt_results, update_rt_results)
dbh = DBHub(ParentDDB, ParentPDB, rdb) dbh = DBHub(ParentDDB, ParentPDB, rdb)
try:
try: try:
exp_inst = exp(dbh, exp_inst = exp(dbh,
scheduler=Scheduler, scheduler=Scheduler,
@ -87,13 +85,6 @@ def run(rid, run_params):
**run_params["arguments"]) **run_params["arguments"])
exp_inst.run() exp_inst.run()
exp_inst.analyze() exp_inst.analyze()
except Exception:
put_object({"action": "report_completed",
"status": "failed",
"message": traceback.format_exc()})
else:
put_object({"action": "report_completed",
"status": "ok"})
finally: finally:
dbh.close() dbh.close()
@ -107,10 +98,10 @@ def run(rid, run_params):
def main(): def main():
sys.stdout = sys.stderr sys.stdout = sys.stderr
while True:
obj = get_object() obj = get_object()
put_object("ack") put_object("ack")
run(obj["rid"], obj["run_params"]) run(obj["rid"], obj["run_params"])
put_object({"action": "report_completed"})
if __name__ == "__main__": if __name__ == "__main__":
main() main()