diff --git a/artiq/protocols/pipe_ipc.py b/artiq/protocols/pipe_ipc.py index 910fe93c4..2955bcaf4 100644 --- a/artiq/protocols/pipe_ipc.py +++ b/artiq/protocols/pipe_ipc.py @@ -3,6 +3,9 @@ import asyncio from asyncio.streams import FlowControlMixin +__all__ = ["AsyncioParentComm", "AsyncioChildComm", "ChildComm"] + + class _BaseIO: def write(self, data): self.writer.write(data) @@ -92,10 +95,78 @@ if os.name != "nt": else: # windows - class AsyncioParentComm(_BaseIO): + import itertools + + + _pipe_count = itertools.count() + + + class AsyncioParentComm: + """Requires ProactorEventLoop""" + def __init__(self): + # We cannot use anonymous pipes on Windows, because we do not know + # in advance if the child process wants a handle open in overlapped + # mode or not. + self.address = "\\\\.\\pipe\\artiq-{}-{}".format(os.getpid(), + next(_pipe_count)) + self.server = None + self.ready = asyncio.Event() + self.write_buffer = b"" + + def get_address(self): + return self.address + async def _autoclose(self): await self.process.wait() - self.writer.close() + if self.server is not None: + self.server[0].close() + self.server = None + if self.ready.is_set(): + self.writer.close() + + async def create_subprocess(self, *args, **kwargs): + loop = asyncio.get_event_loop() + + def factory(): + reader = asyncio.StreamReader(loop=loop) + protocol = asyncio.StreamReaderProtocol(reader, + self._child_connected, + loop=loop) + return protocol + self.server = await loop.start_serving_pipe( + factory, self.address) + + self.process = await asyncio.create_subprocess_exec( + *args, **kwargs) + asyncio.ensure_future(self._autoclose()) + + def _child_connected(self, reader, writer): + self.server[0].close() + self.server = None + self.reader = reader + self.writer = writer + if self.write_buffer: + self.writer.write(self.write_buffer) + self.write_buffer = b"" + self.ready.set() + + def write(self, data): + if self.ready.is_set(): + self.writer.write(data) + else: + self.write_buffer += data + + async def drain(self): + await self.ready.wait() + await self.writer.drain() + + async def readline(self): + await self.ready.wait() + return await self.reader.readline() + + async def read(self, n): + await self.ready.wait() + return await self.reader.read(n) class AsyncioChildComm(_BaseIO): @@ -109,9 +180,26 @@ else: # windows reader_protocol = asyncio.StreamReaderProtocol( self.reader, loop=loop) transport, _ = await loop.create_pipe_connection( - self.address, lambda: reader_protocol) + lambda: reader_protocol, self.address) self.writer = asyncio.StreamWriter(transport, reader_protocol, self.reader, loop) + def close(self): + self.writer.close() + + class ChildComm: - pass + def __init__(self, address): + self.f = open(address, "a+b", 0) + + def read(self, n): + return self.f.read(n) + + def readline(self): + return self.f.readline() + + def write(self, data): + return self.f.write(data) + + def close(self): + self.f.close()