noptica2-sdr/gui.py

73 lines
2.0 KiB
Python

import os
import sys
import subprocess
import time
import numpy as np
from sipyco import pyon
class ParentComm:
def __init__(self):
self.c_rfd, self.p_wfd = os.pipe()
self.p_rfd, self.c_wfd = os.pipe()
self.rf = open(int(self.p_rfd), "rb", 0)
self.wf = open(int(self.p_wfd), "wb", 0)
self.process = None
def get_address(self):
return "{},{}".format(self.c_rfd, self.c_wfd)
def read(self, n):
return self.rf.read(n)
def readline(self):
return self.rf.readline()
def write(self, data):
return self.wf.write(data)
def write_pyon(self, obj):
self.write(pyon.encode(obj).encode() + b"\n")
def close(self):
self.rf.close()
self.wf.close()
if self.process is not None:
self.process.wait()
def create_subprocess(self, *args):
env = os.environ.copy()
env["PYTHONUNBUFFERED"] = "1"
env["NOPTICA2_IPC"] = self.get_address()
self.process = subprocess.Popen(
*args, pass_fds={self.c_rfd, self.c_wfd},
env=env)
os.close(self.c_rfd)
os.close(self.c_wfd)
class GUI:
def __init__(self, freq_sample, freq_base, block_size):
self.impl = ParentComm()
self.impl.create_subprocess([sys.executable,
os.path.join(os.path.dirname(os.path.abspath(__file__)), "gui_impl.py"),
str(freq_sample), str(freq_base), str(block_size)])
def update_ref(self, block, peak_freq, locked):
obj = {"action": "update_ref", "block": block, "peak_freq": peak_freq, "locked": locked}
self.impl.write_pyon(obj)
def update_meas(self, block):
obj = {"action": "update_meas", "block": block}
self.impl.write_pyon(obj)
def update_position(self, position):
obj = {"action": "update_position", "position": position}
self.impl.write_pyon(obj)
def close(self):
obj = {"action": "terminate"}
self.impl.write_pyon(obj)
self.impl.close()