import os import sys import subprocess import time import numpy as np from sipyco import pyon # env QT_QPA_PLATFORM=wayland python gui_test.py class ParentComm: def __init__(self): self.c_rfd, self.p_wfd = os.pipe() self.p_rfd, self.c_wfd = os.pipe() self.rf = open(int(self.p_rfd), "rb", 0) self.wf = open(int(self.p_wfd), "wb", 0) self.process = None def get_address(self): return "{},{}".format(self.c_rfd, self.c_wfd) def read(self, n): return self.rf.read(n) def readline(self): return self.rf.readline() def write(self, data): return self.wf.write(data) def write_pyon(self, obj): self.write(pyon.encode(obj).encode() + b"\n") def close(self): self.rf.close() self.wf.close() if self.process is not None: self.process.wait() def create_subprocess(self, *args): env = os.environ.copy() env["PYTHONUNBUFFERED"] = "1" env["NOPTICA2_IPC"] = self.get_address() self.process = subprocess.Popen( *args, pass_fds={self.c_rfd, self.c_wfd}, env=env) os.close(self.c_rfd) os.close(self.c_wfd) def main(): gui = ParentComm() try: gui.create_subprocess([sys.executable, "gui.py"]) for i in range(600): obj = {"action": "update", "data": np.random.normal(size=4096)} gui.write_pyon(obj) time.sleep(1/60) obj = {"action": "terminate"} gui.write_pyon(obj) finally: gui.close() if __name__ == "__main__": main()