master,worker: split prepare/run/analyze

This commit is contained in:
Sebastien Bourdeauducq 2015-03-09 23:34:09 +01:00
parent 4c280d5fcc
commit f2134fa4b2
3 changed files with 98 additions and 59 deletions

View File

@ -64,7 +64,12 @@ class Scheduler:
def _run(self, rid, run_params): def _run(self, rid, run_params):
self.run_cb(rid, run_params) self.run_cb(rid, run_params)
try: try:
yield from self.worker.run(rid, run_params) yield from self.worker.prepare(rid, run_params)
try:
yield from self.worker.run()
yield from self.worker.analyze()
finally:
yield from self.worker.close()
except Exception as e: except Exception as e:
print("RID {} failed:".format(rid)) print("RID {} failed:".format(rid))
print(e) print(e)

View File

@ -1,8 +1,8 @@
import sys import sys
import asyncio import asyncio
import subprocess import subprocess
import signal
import traceback import traceback
import time
from artiq.protocols import pyon from artiq.protocols import pyon
@ -13,10 +13,10 @@ class WorkerError(Exception):
class Worker: class Worker:
def __init__(self,
             send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
    """Store the timeouts and create an empty handler table.

    No worker process is started here.

    :param send_timeout: timeout, in seconds, passed to ``_send`` for
        every message written to the worker.
    :param prepare_timeout: overall timeout, in seconds, allowed for the
        ``prepare`` phase.
    :param term_timeout: time, in seconds, that ``close`` waits for the
        worker to exit before killing it.
    """
    # Maps the "action" name of a worker request to the callable that
    # serves it; filled in by the owner of this object.
    self.handlers = {}
    self.send_timeout = send_timeout
    self.prepare_timeout = prepare_timeout
    self.term_timeout = term_timeout
@asyncio.coroutine @asyncio.coroutine
@ -26,15 +26,23 @@ class Worker:
stdout=subprocess.PIPE, stdin=subprocess.PIPE) stdout=subprocess.PIPE, stdin=subprocess.PIPE)
@asyncio.coroutine @asyncio.coroutine
def close(self):
    """Shut the worker process down and make sure it does not linger.

    If the process has already exited, only its exit status is checked.
    Otherwise it is sent a ``terminate`` request and is killed if the
    request cannot be delivered or if it has not exited after
    ``self.term_timeout`` seconds.

    :raises WorkerError: the process had already exited with a non-zero
        status code.
    """
    returncode = self.process.returncode
    if returncode is not None:
        # Already gone: nothing to terminate, only report abnormal exits.
        if returncode != 0:
            raise WorkerError("Worker finished with status code {}"
                              .format(returncode))
        return

    obj = {"action": "terminate"}
    try:
        yield from self._send(obj, self.send_timeout)
    except BaseException as exc:
        # The request did not go through, so the worker will never exit
        # on its own: kill it whatever the reason was.
        self.process.kill()
        if not isinstance(exc, Exception):
            # Do not swallow KeyboardInterrupt, GeneratorExit and the
            # like; only ordinary delivery errors are best-effort.
            raise
        return

    try:
        yield from asyncio.wait_for(
            self.process.wait(), timeout=self.term_timeout)
    except asyncio.TimeoutError:
        # The worker ignored the request for too long.
        self.process.kill()
@asyncio.coroutine @asyncio.coroutine
def _send(self, obj, timeout): def _send(self, obj, timeout):
@ -58,7 +66,7 @@ class Worker:
except asyncio.TimeoutError: except asyncio.TimeoutError:
raise WorkerError("Timeout receiving data from worker") raise WorkerError("Timeout receiving data from worker")
if not line: if not line:
return None raise WorkerError("Worker ended while attempting to receive data")
try: try:
obj = pyon.decode(line.decode()) obj = pyon.decode(line.decode())
except: except:
@ -66,30 +74,45 @@ class Worker:
return obj return obj
@asyncio.coroutine @asyncio.coroutine
def _handle_worker_requests(self, timeout):
    """Serve requests from the worker until it reports completion.

    Every object received is indexed by ``"action"``. The action
    ``"completed"`` ends the loop. Any other action is looked up in
    ``self.handlers`` and called with the remaining keys as keyword
    arguments; the result, or the formatted traceback if the handler
    failed, is sent back to the worker.

    :param timeout: overall budget, in seconds, for the whole exchange,
        or ``None`` to wait indefinitely. The time left is recomputed
        before each receive.
    """
    if timeout is None:
        end_time = None
    else:
        end_time = time.monotonic() + timeout

    while True:
        if end_time is None:
            remaining = None
        else:
            remaining = end_time - time.monotonic()
        obj = yield from self._recv(remaining)

        action = obj["action"]
        if action == "completed":
            return
        del obj["action"]

        try:
            data = self.handlers[action](**obj)
            reply = {"status": "ok", "data": data}
        except Exception:
            # Handler errors (including an unknown action, which shows up
            # as KeyError) are reported to the worker. Interpreter-level
            # exits such as KeyboardInterrupt are left to propagate.
            reply = {"status": "failed",
                     "message": traceback.format_exc()}
        yield from self._send(reply, self.send_timeout)
@asyncio.coroutine
def prepare(self, rid, run_params):
    """Start the worker process and have it prepare the experiment.

    Sends a ``prepare`` request carrying ``rid`` and ``run_params``, then
    serves the worker's requests until it reports completion, within
    ``self.prepare_timeout`` seconds.

    If anything fails after the process has been created, the worker is
    closed before the error is re-raised, so a failed ``prepare`` does
    not leave a process behind.
    """
    yield from self._create_process()
    try:
        obj = {"action": "prepare", "rid": rid, "run_params": run_params}
        yield from self._send(obj, self.send_timeout)
        yield from self._handle_worker_requests(self.prepare_timeout)
    except BaseException:
        # Clean up on every kind of failure, then let it propagate.
        yield from self.close()
        raise
if obj is None: @asyncio.coroutine
def run(self):
    """Ask the worker to run the experiment and wait for completion.

    Sends a ``run`` request, then serves the worker's requests with no
    overall time limit until it reports completion.
    """
    obj = {"action": "run"}
    yield from self._send(obj, self.send_timeout)
    yield from self._handle_worker_requests(None)
del obj["action"] @asyncio.coroutine
def analyze(self):
    """Ask the worker to analyze the results and wait for completion.

    Sends an ``analyze`` request, then serves the worker's requests with
    no overall time limit until it reports completion.
    """
    obj = {"action": "analyze"}
    yield from self._send(obj, self.send_timeout)
    yield from self._handle_worker_requests(None)

View File

@ -72,36 +72,47 @@ def get_exp(file, exp):
return getattr(module, exp) return getattr(module, exp)
def run(rid, run_params):
    """Run and analyze one experiment, then write its results to HDF5.

    The experiment class is looked up from ``run_params["file"]`` and
    ``run_params["experiment"]`` and instantiated with
    ``run_params["arguments"]`` as extra keyword arguments.

    The database hub is always closed, even if the experiment fails. The
    HDF5 output is only written when the experiment completed, because a
    failure propagates before that point.
    """
    # Taken before the experiment starts; passed to get_hdf5_output.
    start_time = time.localtime()
    exp = get_exp(run_params["file"], run_params["experiment"])

    rdb = ResultDB(init_rt_results, update_rt_results)
    dbh = DBHub(ParentDDB, ParentPDB, rdb)
    try:
        exp_inst = exp(dbh,
                       scheduler=Scheduler,
                       run_params=run_params,
                       **run_params["arguments"])
        exp_inst.run()
        exp_inst.analyze()
    finally:
        dbh.close()

    f = get_hdf5_output(start_time, rid, exp.__name__)
    try:
        rdb.write_hdf5(f)
    finally:
        f.close()
def main():
    """Worker entry point: execute the master's requests one at a time.

    Objects are read with ``get_object`` and dispatched on ``"action"``:

    * ``prepare``: look up and instantiate the experiment;
    * ``run``: call the experiment's ``run()``;
    * ``analyze``: call ``analyze()`` and write the results to HDF5;
    * ``terminate``: leave the loop.

    Every request except ``terminate`` is acknowledged with
    ``{"action": "completed"}``. The database hub is closed on the way
    out, whether the loop ended normally or with an error.

    :raises ValueError: an unknown action was received.
    """
    # NOTE(review): presumably keeps print() output off the pipe the
    # master reads -- confirm against put_object.
    sys.stdout = sys.stderr

    # State set by "prepare" and used by the later phases.
    start_time = None
    rid = None
    run_params = None
    exp = None
    exp_inst = None

    rdb = ResultDB(init_rt_results, update_rt_results)
    dbh = DBHub(ParentDDB, ParentPDB, rdb)
    try:
        while True:
            obj = get_object()
            action = obj["action"]
            if action == "prepare":
                start_time = time.localtime()
                rid = obj["rid"]
                run_params = obj["run_params"]
                exp = get_exp(run_params["file"], run_params["experiment"])
                exp_inst = exp(dbh,
                               scheduler=Scheduler,
                               run_params=run_params,
                               **run_params["arguments"])
                put_object({"action": "completed"})
            elif action == "run":
                exp_inst.run()
                put_object({"action": "completed"})
            elif action == "analyze":
                exp_inst.analyze()
                f = get_hdf5_output(start_time, rid, exp.__name__)
                try:
                    rdb.write_hdf5(f)
                finally:
                    f.close()
                put_object({"action": "completed"})
            elif action == "terminate":
                break
            else:
                # Silently ignoring the request would leave the master
                # waiting forever for "completed"; fail loudly instead.
                raise ValueError("Unknown action: {!r}".format(action))
    finally:
        dbh.close()


if __name__ == "__main__":
    main()