import os
import subprocess
import sys
import time

import numpy as np
from sipyco import pyon


class ParentComm:
    """Parent-side endpoint of a two-pipe channel to a child process.

    Two OS pipes are created in ``__init__``. The parent keeps ``p_rfd``
    (wrapped as the unbuffered binary file ``rf``) and ``p_wfd`` (wrapped as
    ``wf``); ``c_rfd`` and ``c_wfd`` are the opposite ends, intended to be
    inherited by the child started with ``create_subprocess``.
    """

    def __init__(self):
        # Pipe 1: parent writes (p_wfd) -> child reads (c_rfd).
        self.c_rfd, self.p_wfd = os.pipe()
        # Pipe 2: child writes (c_wfd) -> parent reads (p_rfd).
        self.p_rfd, self.c_wfd = os.pipe()
        # Buffering is disabled (third argument 0) so writes reach the pipe
        # immediately.
        self.rf = open(int(self.p_rfd), "rb", 0)
        self.wf = open(int(self.p_wfd), "wb", 0)
        self.process = None

    def get_address(self):
        """Return the child-side descriptors as the string ``"<rfd>,<wfd>"``."""
        return "{},{}".format(self.c_rfd, self.c_wfd)

    def read(self, n):
        """Read up to ``n`` bytes from the child; returns ``bytes``."""
        return self.rf.read(n)

    def readline(self):
        """Read one line (as ``bytes``) from the child."""
        return self.rf.readline()

    def write(self, data):
        """Write ``data`` (bytes-like) to the child.

        Returns whatever the underlying raw file's ``write`` returns (the
        number of bytes written).

        NOTE(review): ``wf`` is an unbuffered raw file, and the ``io``
        documentation allows a raw ``write`` to accept fewer bytes than
        given. Short writes are not retried here.
        """
        return self.wf.write(data)

    def write_pyon(self, obj):
        """Send ``obj`` to the child as one PYON-encoded, newline-ended line."""
        self.write(pyon.encode(obj).encode() + b"\n")

    def close(self):
        """Close the parent's pipe ends and wait for the child, if any.

        Every step is attempted even when an earlier one raises, so a failure
        closing one pipe end neither leaks the other end nor skips reaping
        the child. The first exception raised still propagates.
        """
        try:
            self.rf.close()
        finally:
            try:
                self.wf.close()
            finally:
                if self.process is not None:
                    self.process.wait()

    def create_subprocess(self, *args):
        """Start the child process and hand it the child-side descriptors.

        ``*args`` are forwarded as positional arguments to
        ``subprocess.Popen``. The child receives a copy of the current
        environment with ``PYTHONUNBUFFERED=1`` and with ``NOPTICA2_IPC`` set
        to the value of ``get_address()``, and inherits ``c_rfd``/``c_wfd``
        through ``pass_fds``.

        The parent's copies of the child-side descriptors are closed before
        returning, whether or not the spawn succeeded, so a failed ``Popen``
        does not leak them. As a consequence this method can be called at
        most once per instance. Any exception from ``Popen`` propagates.
        """
        env = os.environ.copy()
        env["PYTHONUNBUFFERED"] = "1"
        env["NOPTICA2_IPC"] = self.get_address()
        try:
            self.process = subprocess.Popen(
                *args, pass_fds={self.c_rfd, self.c_wfd}, env=env)
        finally:
            # The parent never uses these ends; keeping them open would also
            # prevent either side from seeing EOF when the other exits.
            try:
                os.close(self.c_rfd)
            finally:
                os.close(self.c_wfd)


class GUI:
    """Handle on a ``gui_impl.py`` subprocess driven over a ``ParentComm``.

    Each ``update_*`` method sends one PYON-encoded dict whose ``"action"``
    key names the request; the remaining keys carry the arguments unchanged.
    """

    def __init__(self, freq_sample, freq_base, block_size):
        """Launch ``gui_impl.py`` (located next to this file).

        ``freq_sample``, ``freq_base`` and ``block_size`` are passed to the
        script as command-line arguments after conversion with ``str``.
        If the subprocess cannot be started, the communication channel is
        closed before the exception is re-raised.
        """
        self.impl = ParentComm()
        script = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                              "gui_impl.py")
        try:
            self.impl.create_subprocess([sys.executable,
                                         script,
                                         str(freq_sample),
                                         str(freq_base),
                                         str(block_size)])
        except Exception:
            # Do not leave the parent-side pipe files open until garbage
            # collection when the spawn fails.
            self.impl.close()
            raise

    def update_ref(self, block, peak_freq, locked):
        """Send an ``update_ref`` message with the given values."""
        obj = {"action": "update_ref",
               "block": block,
               "peak_freq": peak_freq,
               "locked": locked}
        self.impl.write_pyon(obj)

    def update_meas(self, block):
        """Send an ``update_meas`` message carrying ``block``."""
        obj = {"action": "update_meas", "block": block}
        self.impl.write_pyon(obj)

    def update_position(self, position):
        """Send an ``update_position`` message carrying ``position``."""
        obj = {"action": "update_position", "position": position}
        self.impl.write_pyon(obj)

    def close(self):
        """Ask the subprocess to terminate, then release the channel.

        The channel is closed (and the child waited for) even if the
        terminate message cannot be delivered, e.g. because the child has
        already exited and the pipe is broken; that error still propagates.
        """
        obj = {"action": "terminate"}
        try:
            self.impl.write_pyon(obj)
        finally:
            self.impl.close()