diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 16b782a54..194e24ed7 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -7,7 +7,7 @@ import traceback import time from functools import partial -from artiq.protocols import pyon +from artiq.protocols import pipe_ipc, pyon from artiq.tools import asyncio_wait_or_cancel @@ -47,7 +47,7 @@ class Worker: self.rid = None self.filename = None - self.process = None + self.ipc = None self.watchdogs = dict() # wid -> expiration (using time.monotonic) self.io_lock = asyncio.Lock() @@ -77,10 +77,10 @@ class Worker: try: if self.closed.is_set(): raise WorkerError("Attempting to create process after close") - self.process = await asyncio.create_subprocess_exec( + self.ipc = pipe_ipc.AsyncioParentComm() + await self.ipc.create_subprocess( sys.executable, "-m", "artiq.master.worker_impl", - str(log_level), - stdout=subprocess.PIPE, stdin=subprocess.PIPE) + self.ipc.get_address(), str(log_level)) finally: self.io_lock.release() @@ -93,15 +93,15 @@ class Worker: self.closed.set() await self.io_lock.acquire() try: - if self.process is None: + if self.ipc is None: # Note the %s - self.rid can be None logger.debug("worker was not created (RID %s)", self.rid) return - if self.process.returncode is not None: + if self.ipc.process.returncode is not None: logger.debug("worker already terminated (RID %s)", self.rid) - if self.process.returncode != 0: + if self.ipc.process.returncode != 0: logger.warning("worker finished with status code %d" - " (RID %s)", self.process.returncode, + " (RID %s)", self.ipc.process.returncode, self.rid) return obj = {"action": "terminate"} @@ -111,21 +111,21 @@ class Worker: logger.debug("failed to send terminate command to worker" " (RID %s), killing", self.rid, exc_info=True) try: - self.process.kill() + self.ipc.process.kill() except ProcessLookupError: pass - await self.process.wait() + await self.ipc.process.wait() return try: - await asyncio.wait_for(self.process.wait(), term_timeout) + await asyncio.wait_for(self.ipc.process.wait(), term_timeout) except asyncio.TimeoutError: logger.debug("worker did not exit by itself (RID %s), killing", self.rid) try: - self.process.kill() + self.ipc.process.kill() except ProcessLookupError: pass - await self.process.wait() + await self.ipc.process.wait() else: logger.debug("worker exited by itself (RID %s)", self.rid) finally: @@ -134,9 +134,8 @@ class Worker: async def _send(self, obj, cancellable=True): assert self.io_lock.locked() line = pyon.encode(obj) - self.process.stdin.write(line.encode()) - self.process.stdin.write("\n".encode()) - ifs = [self.process.stdin.drain()] + self.ipc.write((line + "\n").encode()) + ifs = [self.ipc.drain()] if cancellable: ifs.append(self.closed.wait()) fs = await asyncio_wait_or_cancel( @@ -153,7 +152,7 @@ class Worker: async def _recv(self, timeout): assert self.io_lock.locked() fs = await asyncio_wait_or_cancel( - [self.process.stdout.readline(), self.closed.wait()], + [self.ipc.readline(), self.closed.wait()], timeout=timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): raise WorkerTimeout("Timeout receiving data from worker") diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index d8bf5cb82..3a9664ae6 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -6,7 +6,7 @@ import traceback from collections import OrderedDict import artiq -from artiq.protocols import pyon +from artiq.protocols import pipe_ipc, pyon from artiq.tools import file_import from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.language.environment import is_experiment @@ -15,16 +15,16 @@ from artiq.coredevice.core import CompileError, host_only, _render_diagnostic from artiq import __version__ as artiq_version +ipc = None + def get_object(): - line = sys.__stdin__.readline() + line = ipc.readline().decode() return pyon.decode(line) def put_object(obj): ds = pyon.encode(obj) - sys.__stdout__.write(ds) - sys.__stdout__.write("\n") - sys.__stdout__.flush() + ipc.write((ds + "\n").encode()) class ParentActionError(Exception): @@ -53,23 +53,6 @@ def make_parent_action(action, exception=None): return parent_action -class LogForwarder: - def __init__(self): - self.buffer = "" - - to_parent = staticmethod(make_parent_action("log")) - - def write(self, data): - self.buffer += data - while "\n" in self.buffer: - i = self.buffer.index("\n") - self.to_parent(self.buffer[:i]) - self.buffer = self.buffer[i+1:] - - def flush(self): - pass - - class ParentDeviceDB: get_device_db = make_parent_action("get_device_db") get = make_parent_action("get_device", KeyError) @@ -202,9 +185,10 @@ def setup_diagnostics(experiment_file, repository_path): def main(): - sys.stdout = LogForwarder() - sys.stderr = LogForwarder() - logging.basicConfig(level=int(sys.argv[1])) + global ipc + + logging.basicConfig(level=int(sys.argv[2])) + ipc = pipe_ipc.ChildComm(sys.argv[1]) start_time = None rid = None @@ -277,6 +261,7 @@ def main(): put_object({"action": "exception"}) finally: device_mgr.close_devices() + ipc.close() if __name__ == "__main__":