artiq/artiq/master/worker.py

249 lines
8.5 KiB
Python
Raw Normal View History

import sys
import asyncio
2015-05-17 16:11:00 +08:00
import logging
import subprocess
import traceback
import time
2014-10-25 16:31:34 +08:00
from artiq.protocols import pyon
from artiq.tools import (asyncio_process_wait_timeout, asyncio_process_wait,
asyncio_wait_or_cancel)
2015-05-17 16:11:00 +08:00
# Module-level logger, named after this module so its records can be
# filtered/configured per module.
logger = logging.getLogger(name=__name__)


class WorkerTimeout(Exception):
    """Raised when a single exchange with the worker process (sending a
    message to it, or reading a message from it) does not complete within
    the allotted time."""


class WorkerWatchdogTimeout(Exception):
    """Raised when a watchdog (a user-created one, or the per-action
    timeout installed for an action) expires while waiting on the worker
    process."""


class WorkerError(Exception):
    """Raised when communication with the worker process fails for a reason
    other than a timeout: the worker was closed, it ended while data was
    expected, or it sent undecodable data."""


class Worker:
    """Controls one experiment worker subprocess.

    The subprocess runs ``python -m artiq.master.worker_impl``. Messages are
    exchanged as one PYON-encoded line each: commands and replies are written
    to the worker's stdin, and requests are read from its stdout. All pipe
    I/O is serialized through ``io_lock`` and can be interrupted by
    :meth:`close`, which sets the ``closed`` event.

    :param handlers: mapping from request action names to callables. A
        request ``{"action": name, **kwargs}`` from the worker is serviced by
        ``handlers[name](**kwargs)``, and the result is sent back.
    :param send_timeout: seconds allowed for flushing one message to the
        worker.
    :param term_timeout: seconds to wait for the worker to exit after a
        terminate command before killing it.
    :param prepare_timeout: timeout (seconds) for the ``prepare`` action.
    :param results_timeout: timeout (seconds) for the ``write_results``
        action.
    """

    def __init__(self, handlers,
                 send_timeout=0.5, term_timeout=1.0,
                 prepare_timeout=15.0, results_timeout=15.0):
        self.handlers = handlers
        self.send_timeout = send_timeout
        self.term_timeout = term_timeout
        self.prepare_timeout = prepare_timeout
        self.results_timeout = results_timeout

        self.rid = None
        self.process = None
        # wid -> expiration (using time.monotonic). wid -1 is reserved for
        # the per-action timeout installed by _worker_action; watchdogs
        # created on behalf of the worker get non-negative ids.
        self.watchdogs = dict()
        # time.monotonic() at which the worker last paused; written by run()
        # and resume(), read by resume().
        self.yield_time = None

        self.io_lock = asyncio.Lock()
        self.closed = asyncio.Event()

    def create_watchdog(self, t):
        """Register a watchdog expiring ``t`` seconds from now.

        Returns the smallest free non-negative watchdog id.
        """
        n_user_watchdogs = len(self.watchdogs)
        if -1 in self.watchdogs:
            n_user_watchdogs -= 1
        # At most n_user_watchdogs of these n_user_watchdogs + 1 candidate
        # ids are taken, so the set of free ids is never empty.
        avail = set(range(n_user_watchdogs + 1)) - set(self.watchdogs.keys())
        wid = min(avail)
        self.watchdogs[wid] = time.monotonic() + t
        return wid

    def delete_watchdog(self, wid):
        """Remove watchdog ``wid`` (raises KeyError if it does not exist)."""
        del self.watchdogs[wid]

    def watchdog_time(self):
        """Return the number of seconds until the earliest watchdog expires.

        The value is negative if that watchdog has already expired. Returns
        None when no watchdog is active, meaning there is no time limit.
        """
        if self.watchdogs:
            return min(self.watchdogs.values()) - time.monotonic()
        else:
            return None

    @asyncio.coroutine
    def _create_process(self):
        """Spawn the worker subprocess with piped stdin and stdout.

        Raises WorkerError if close() has already been called.
        """
        yield from self.io_lock.acquire()
        try:
            if self.closed.is_set():
                raise WorkerError("Attempting to create process after close")
            self.process = yield from asyncio.create_subprocess_exec(
                sys.executable, "-m", "artiq.master.worker_impl",
                stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        finally:
            self.io_lock.release()

    @asyncio.coroutine
    def close(self):
        """Interrupts any I/O with the worker process and terminates the
        worker process.

        This method should always be called by the user to clean up, even if
        prepare() raises an exception."""
        # Setting the event first makes any in-progress cancellable
        # _send/_recv return early, so that they release io_lock.
        self.closed.set()
        yield from self.io_lock.acquire()
        try:
            if self.process is None:
                # Note the %s - self.rid can be None
                logger.debug("worker was not created (RID %s)", self.rid)
                return
            if self.process.returncode is not None:
                logger.debug("worker already terminated (RID %d)", self.rid)
                if self.process.returncode != 0:
                    logger.warning("worker finished with status code %d"
                                   " (RID %d)", self.process.returncode,
                                   self.rid)
                return
            obj = {"action": "terminate"}
            try:
                # Not cancellable: `closed` is already set.
                yield from self._send(obj, self.send_timeout,
                                      cancellable=False)
            except:
                # Deliberately broad: this is best-effort cleanup, and the
                # process must be killed whatever went wrong.
                logger.warning("failed to send terminate command to worker"
                               " (RID %d), killing", self.rid, exc_info=True)
                self.process.kill()
                yield from asyncio_process_wait(self.process)
                return
            try:
                yield from asyncio_process_wait_timeout(self.process,
                                                        self.term_timeout)
            except asyncio.TimeoutError:
                logger.warning("worker did not exit (RID %d), killing",
                               self.rid)
                self.process.kill()
                yield from asyncio_process_wait(self.process)
            else:
                logger.debug("worker exited gracefully (RID %d)", self.rid)
        finally:
            self.io_lock.release()

    @asyncio.coroutine
    def _send(self, obj, timeout, cancellable=True):
        """Write ``obj`` as one PYON line to the worker and flush it.

        The caller must hold io_lock. Raises WorkerTimeout if the flush does
        not finish within ``timeout`` seconds. If ``cancellable`` is true,
        raises WorkerError when close() interrupts the transmission.
        """
        assert self.io_lock.locked()
        line = pyon.encode(obj)
        self.process.stdin.write(line.encode())
        self.process.stdin.write("\n".encode())
        ifs = [self.process.stdin.drain()]
        if cancellable:
            ifs.append(self.closed.wait())
        fs = yield from asyncio_wait_or_cancel(
            ifs, timeout=timeout,
            return_when=asyncio.FIRST_COMPLETED)
        if all(f.cancelled() for f in fs):
            raise WorkerTimeout("Timeout sending data to worker")
        for f in fs:
            if not f.cancelled() and f.done():
                f.result()  # raise any exceptions
        if cancellable and self.closed.is_set():
            raise WorkerError("Data transmission to worker cancelled")

    @asyncio.coroutine
    def _recv(self, timeout):
        """Read and decode one PYON line from the worker.

        The caller must hold io_lock. ``timeout`` is in seconds, or None for
        no limit. Raises WorkerTimeout on timeout. Raises WorkerError if
        close() interrupts the read, if the worker's output ends, or if the
        line cannot be decoded.
        """
        assert self.io_lock.locked()
        fs = yield from asyncio_wait_or_cancel(
            [self.process.stdout.readline(), self.closed.wait()],
            timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
        if all(f.cancelled() for f in fs):
            raise WorkerTimeout("Timeout receiving data from worker")
        if self.closed.is_set():
            raise WorkerError("Receiving data from worker cancelled")
        line = fs[0].result()
        if not line:
            # readline() returns an empty result at end of stream.
            raise WorkerError("Worker ended while attempting to receive data")
        try:
            obj = pyon.decode(line.decode())
        except Exception as exc:
            raise WorkerError("Worker sent invalid PYON data") from exc
        return obj

    @asyncio.coroutine
    def _handle_worker_requests(self):
        """Service requests from the worker until it completes or pauses.

        Returns True when the worker reports "completed", and False when it
        requests "pause". In the pause case no reply is sent here; resume()
        sends it later. Raises WorkerWatchdogTimeout if the earliest watchdog
        expires while waiting for a request.
        """
        while True:
            try:
                yield from self.io_lock.acquire()
                try:
                    obj = yield from self._recv(self.watchdog_time())
                finally:
                    self.io_lock.release()
            except WorkerTimeout:
                raise WorkerWatchdogTimeout
            action = obj["action"]
            if action == "completed":
                return True
            elif action == "pause":
                return False
            del obj["action"]
            if action == "create_watchdog":
                func = self.create_watchdog
            elif action == "delete_watchdog":
                func = self.delete_watchdog
            else:
                func = self.handlers[action]
            try:
                data = func(**obj)
                reply = {"status": "ok", "data": data}
            except Exception:
                # Handler failures are reported back to the worker rather
                # than aborting the master side.
                reply = {"status": "failed",
                         "message": traceback.format_exc()}
            yield from self.io_lock.acquire()
            try:
                yield from self._send(reply, self.send_timeout)
            finally:
                self.io_lock.release()

    @asyncio.coroutine
    def _worker_action(self, obj, timeout=None):
        """Send ``obj`` to the worker, then service its requests.

        If ``timeout`` (seconds) is given, it is installed as watchdog -1 for
        the duration of the action. Returns the value of
        _handle_worker_requests() (True if completed, False if paused).
        """
        if timeout is not None:
            self.watchdogs[-1] = time.monotonic() + timeout
        try:
            yield from self.io_lock.acquire()
            try:
                yield from self._send(obj, self.send_timeout)
            finally:
                self.io_lock.release()
            try:
                completed = yield from self._handle_worker_requests()
            except WorkerTimeout:
                raise WorkerWatchdogTimeout
        finally:
            if timeout is not None:
                del self.watchdogs[-1]
        return completed

    @asyncio.coroutine
    def prepare(self, rid, pipeline_name, expid, priority):
        """Start the worker process and send it the "prepare" action.

        The action is bounded by ``prepare_timeout``.
        """
        self.rid = rid
        yield from self._create_process()
        yield from self._worker_action(
            {"action": "prepare",
             "rid": rid,
             "pipeline_name": pipeline_name,
             "expid": expid,
             "priority": priority},
            self.prepare_timeout)

    @asyncio.coroutine
    def run(self):
        """Send the "run" action.

        Returns True if the worker completed, or False if it paused. In the
        pause case, the pause time is recorded for resume().
        """
        completed = yield from self._worker_action({"action": "run"})
        if not completed:
            self.yield_time = time.monotonic()
        return completed

    @asyncio.coroutine
    def resume(self):
        """Resume a paused worker; returns like run().

        Every watchdog expiration is pushed back by the time spent paused,
        so the pause does not count against it. The resume message is the
        reply to the worker's pending "pause" request.
        """
        stop_duration = time.monotonic() - self.yield_time
        # Iterating a dict yields its keys (the watchdog ids). Updating
        # values in place does not resize the dict, so this is safe.
        for wid in self.watchdogs:
            self.watchdogs[wid] += stop_duration
        completed = yield from self._worker_action({"status": "ok",
                                                     "data": None})
        if not completed:
            self.yield_time = time.monotonic()
        return completed

    @asyncio.coroutine
    def analyze(self):
        """Send the "analyze" action (no timeout)."""
        yield from self._worker_action({"action": "analyze"})

    @asyncio.coroutine
    def write_results(self):
        """Send the "write_results" action, bounded by results_timeout."""
        yield from self._worker_action({"action": "write_results"},
                                       self.results_timeout)