worker: split write_results action

This commit is contained in:
Sebastien Bourdeauducq 2015-03-11 19:06:46 +01:00
parent 4ba54ac929
commit 43a05c783d
3 changed files with 40 additions and 21 deletions

View File

@ -69,6 +69,7 @@ class Scheduler:
try:
yield from worker.run()
yield from worker.analyze()
yield from worker.write_results()
finally:
yield from worker.close()
except Exception as e:

View File

@ -23,15 +23,20 @@ class WorkerError(Exception):
class Worker:
def __init__(self, handlers,
send_timeout=0.5, prepare_timeout=15.0, term_timeout=1.0):
send_timeout=0.5, term_timeout=1.0,
prepare_timeout=15.0, results_timeout=15.0):
self.handlers = handlers
self.send_timeout = send_timeout
self.prepare_timeout = prepare_timeout
self.term_timeout = term_timeout
self.prepare_timeout = prepare_timeout
self.results_timeout = results_timeout
self.watchdogs = dict() # wid -> expiration (using time.monotonic)
def create_watchdog(self, t):
avail = set(range(len(self.watchdogs) + 1)) \
n_user_watchdogs = len(self.watchdogs)
if -1 in self.watchdogs:
n_user_watchdogs -= 1
avail = set(range(n_user_watchdogs + 1)) \
- set(self.watchdogs.keys())
wid = next(iter(avail))
self.watchdogs[wid] = time.monotonic() + strip_unit(t, "s")
@ -101,9 +106,12 @@ class Worker:
return obj
@asyncio.coroutine
def _handle_worker_requests(self, timeout_func):
def _handle_worker_requests(self):
while True:
obj = yield from self._recv(timeout_func())
try:
obj = yield from self._recv(self.watchdog_time())
except WorkerTimeout:
raise WorkerWatchdogTimeout
action = obj["action"]
if action == "completed":
return
@ -122,32 +130,40 @@ class Worker:
"message": traceback.format_exc()}
yield from self._send(reply, self.send_timeout)
@asyncio.coroutine
def _worker_action(self, obj, timeout=None):
if timeout is not None:
self.watchdogs[-1] = time.monotonic() + timeout
try:
yield from self._send(obj, self.send_timeout)
try:
yield from self._handle_worker_requests()
except WorkerTimeout:
raise WorkerWatchdogTimeout
finally:
if timeout is not None:
del self.watchdogs[-1]
@asyncio.coroutine
def prepare(self, rid, run_params):
yield from self._create_process()
try:
obj = {"action": "prepare", "rid": rid, "run_params": run_params}
yield from self._send(obj, self.send_timeout)
end_time = time.monotonic() + self.prepare_timeout
yield from self._handle_worker_requests(
lambda: end_time - time.monotonic())
yield from self._worker_action(
{"action": "prepare", "rid": rid, "run_params": run_params},
self.prepare_timeout)
except:
yield from self.close()
raise
@asyncio.coroutine
def _run_analyze(self, action):
obj = {"action": action}
yield from self._send(obj, self.send_timeout)
try:
yield from self._handle_worker_requests(self.watchdog_time)
except WorkerTimeout:
raise WorkerWatchdogTimeout
@asyncio.coroutine
def run(self):
yield from self._run_analyze("run")
yield from self._worker_action({"action": "run"})
@asyncio.coroutine
def analyze(self):
yield from self._run_analyze("analyze")
yield from self._worker_action({"action": "analyze"})
@asyncio.coroutine
def write_results(self):
yield from self._worker_action({"action": "write_results"},
self.results_timeout)

View File

@ -118,6 +118,8 @@ def main():
put_object({"action": "completed"})
elif action == "analyze":
exp_inst.analyze()
put_object({"action": "completed"})
elif action == "write_results":
f = get_hdf5_output(start_time, rid, exp.__name__)
try:
rdb.write_hdf5(f)