forked from M-Labs/artiq
1
0
Fork 0

worker: use pipe_ipc (no log)

This commit is contained in:
Sebastien Bourdeauducq 2016-01-26 14:59:36 +01:00
parent 6383253e03
commit a583a923d8
2 changed files with 27 additions and 43 deletions

View File

@ -7,7 +7,7 @@ import traceback
import time import time
from functools import partial from functools import partial
from artiq.protocols import pyon from artiq.protocols import pipe_ipc, pyon
from artiq.tools import asyncio_wait_or_cancel from artiq.tools import asyncio_wait_or_cancel
@ -47,7 +47,7 @@ class Worker:
self.rid = None self.rid = None
self.filename = None self.filename = None
self.process = None self.ipc = None
self.watchdogs = dict() # wid -> expiration (using time.monotonic) self.watchdogs = dict() # wid -> expiration (using time.monotonic)
self.io_lock = asyncio.Lock() self.io_lock = asyncio.Lock()
@ -77,10 +77,10 @@ class Worker:
try: try:
if self.closed.is_set(): if self.closed.is_set():
raise WorkerError("Attempting to create process after close") raise WorkerError("Attempting to create process after close")
self.process = await asyncio.create_subprocess_exec( self.ipc = pipe_ipc.AsyncioParentComm()
await self.ipc.create_subprocess(
sys.executable, "-m", "artiq.master.worker_impl", sys.executable, "-m", "artiq.master.worker_impl",
str(log_level), self.ipc.get_address(), str(log_level))
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
finally: finally:
self.io_lock.release() self.io_lock.release()
@ -93,15 +93,15 @@ class Worker:
self.closed.set() self.closed.set()
await self.io_lock.acquire() await self.io_lock.acquire()
try: try:
if self.process is None: if self.ipc is None:
# Note the %s - self.rid can be None # Note the %s - self.rid can be None
logger.debug("worker was not created (RID %s)", self.rid) logger.debug("worker was not created (RID %s)", self.rid)
return return
if self.process.returncode is not None: if self.ipc.process.returncode is not None:
logger.debug("worker already terminated (RID %s)", self.rid) logger.debug("worker already terminated (RID %s)", self.rid)
if self.process.returncode != 0: if self.ipc.process.returncode != 0:
logger.warning("worker finished with status code %d" logger.warning("worker finished with status code %d"
" (RID %s)", self.process.returncode, " (RID %s)", self.ipc.process.returncode,
self.rid) self.rid)
return return
obj = {"action": "terminate"} obj = {"action": "terminate"}
@ -111,21 +111,21 @@ class Worker:
logger.debug("failed to send terminate command to worker" logger.debug("failed to send terminate command to worker"
" (RID %s), killing", self.rid, exc_info=True) " (RID %s), killing", self.rid, exc_info=True)
try: try:
self.process.kill() self.ipc.process.kill()
except ProcessLookupError: except ProcessLookupError:
pass pass
await self.process.wait() await self.ipc.process.wait()
return return
try: try:
await asyncio.wait_for(self.process.wait(), term_timeout) await asyncio.wait_for(self.ipc.process.wait(), term_timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logger.debug("worker did not exit by itself (RID %s), killing", logger.debug("worker did not exit by itself (RID %s), killing",
self.rid) self.rid)
try: try:
self.process.kill() self.ipc.process.kill()
except ProcessLookupError: except ProcessLookupError:
pass pass
await self.process.wait() await self.ipc.process.wait()
else: else:
logger.debug("worker exited by itself (RID %s)", self.rid) logger.debug("worker exited by itself (RID %s)", self.rid)
finally: finally:
@ -134,9 +134,8 @@ class Worker:
async def _send(self, obj, cancellable=True): async def _send(self, obj, cancellable=True):
assert self.io_lock.locked() assert self.io_lock.locked()
line = pyon.encode(obj) line = pyon.encode(obj)
self.process.stdin.write(line.encode()) self.ipc.write((line + "\n").encode())
self.process.stdin.write("\n".encode()) ifs = [self.ipc.drain()]
ifs = [self.process.stdin.drain()]
if cancellable: if cancellable:
ifs.append(self.closed.wait()) ifs.append(self.closed.wait())
fs = await asyncio_wait_or_cancel( fs = await asyncio_wait_or_cancel(
@ -153,7 +152,7 @@ class Worker:
async def _recv(self, timeout): async def _recv(self, timeout):
assert self.io_lock.locked() assert self.io_lock.locked()
fs = await asyncio_wait_or_cancel( fs = await asyncio_wait_or_cancel(
[self.process.stdout.readline(), self.closed.wait()], [self.ipc.readline(), self.closed.wait()],
timeout=timeout, return_when=asyncio.FIRST_COMPLETED) timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs): if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout receiving data from worker") raise WorkerTimeout("Timeout receiving data from worker")

View File

@ -6,7 +6,7 @@ import traceback
from collections import OrderedDict from collections import OrderedDict
import artiq import artiq
from artiq.protocols import pyon from artiq.protocols import pipe_ipc, pyon
from artiq.tools import file_import from artiq.tools import file_import
from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
from artiq.language.environment import is_experiment from artiq.language.environment import is_experiment
@ -15,16 +15,16 @@ from artiq.coredevice.core import CompileError, host_only, _render_diagnostic
from artiq import __version__ as artiq_version from artiq import __version__ as artiq_version
ipc = None
def get_object(): def get_object():
line = sys.__stdin__.readline() line = ipc.readline().decode()
return pyon.decode(line) return pyon.decode(line)
def put_object(obj): def put_object(obj):
ds = pyon.encode(obj) ds = pyon.encode(obj)
sys.__stdout__.write(ds) ipc.write((ds + "\n").encode())
sys.__stdout__.write("\n")
sys.__stdout__.flush()
class ParentActionError(Exception): class ParentActionError(Exception):
@ -53,23 +53,6 @@ def make_parent_action(action, exception=None):
return parent_action return parent_action
class LogForwarder:
def __init__(self):
self.buffer = ""
to_parent = staticmethod(make_parent_action("log"))
def write(self, data):
self.buffer += data
while "\n" in self.buffer:
i = self.buffer.index("\n")
self.to_parent(self.buffer[:i])
self.buffer = self.buffer[i+1:]
def flush(self):
pass
class ParentDeviceDB: class ParentDeviceDB:
get_device_db = make_parent_action("get_device_db") get_device_db = make_parent_action("get_device_db")
get = make_parent_action("get_device", KeyError) get = make_parent_action("get_device", KeyError)
@ -202,9 +185,10 @@ def setup_diagnostics(experiment_file, repository_path):
def main(): def main():
sys.stdout = LogForwarder() global ipc
sys.stderr = LogForwarder()
logging.basicConfig(level=int(sys.argv[1])) logging.basicConfig(level=int(sys.argv[2]))
ipc = pipe_ipc.ChildComm(sys.argv[1])
start_time = None start_time = None
rid = None rid = None
@ -277,6 +261,7 @@ def main():
put_object({"action": "exception"}) put_object({"action": "exception"})
finally: finally:
device_mgr.close_devices() device_mgr.close_devices()
ipc.close()
if __name__ == "__main__": if __name__ == "__main__":