noptica2-sdr/gui_impl.py

122 lines
3.9 KiB
Python
Raw Normal View History

2020-08-10 18:56:08 +08:00
import asyncio
import os
2020-08-10 19:28:45 +08:00
import sys
2020-08-10 18:56:08 +08:00
import logging
import numpy as np
2020-08-11 16:09:24 +08:00
from scipy.signal import blackmanharris
2020-08-10 19:55:34 +08:00
from quamash import QEventLoop, QtWidgets, QtCore
2020-08-10 18:56:08 +08:00
import pyqtgraph as pg
from sipyco.pipe_ipc import AsyncioChildComm
from sipyco import pyon
2020-08-10 20:25:01 +08:00
class SpectrogramItem(pg.ImageItem):
2020-08-10 19:55:34 +08:00
def __init__(self, freq_sample, freq_base, block_size):
2020-08-10 20:25:01 +08:00
pg.ImageItem.__init__(self)
2020-08-10 19:55:34 +08:00
depth = 100
2020-08-11 16:20:15 +08:00
self.setOpts(axisOrder="row-major")
2020-08-10 19:55:34 +08:00
self.img_array = np.zeros((depth, block_size))
2020-08-11 16:09:24 +08:00
self.setImage(self.img_array, autoLevels=True, autoDownsample=True)
2020-08-11 16:20:15 +08:00
self.setRect(QtCore.QRectF((freq_base-freq_sample/2)/1e6, -float(depth), freq_sample/1e6, float(depth)))
2020-08-10 18:56:08 +08:00
2020-08-10 20:25:01 +08:00
def add_block(self, block):
2020-08-10 18:56:08 +08:00
self.img_array = np.roll(self.img_array, -1, 0)
2020-08-10 19:22:58 +08:00
self.img_array[-1:] = np.fft.fftshift(block)
2020-08-11 16:09:24 +08:00
self.setImage(self.img_array, autoLevels=True, autoDownsample=True)
2020-08-10 20:25:01 +08:00
class MainWindow(pg.GraphicsLayoutWidget):
2020-08-10 20:25:01 +08:00
def __init__(self, freq_sample, freq_base, block_size):
pg.GraphicsLayoutWidget.__init__(self)
2020-08-10 20:25:01 +08:00
self.setWindowTitle("NOPTICA Wavemeter")
self.freq_sample = freq_sample
self.freq_base = freq_base
2020-08-11 16:09:24 +08:00
self.block_size = block_size
2020-08-10 20:25:01 +08:00
self.text_ref = pg.LabelItem(size="24pt")
self.addItem(self.text_ref, row=0, col=0)
self.text_locked = pg.LabelItem(size="24pt")
self.addItem(self.text_locked, row=0, col=1)
2020-08-11 16:09:24 +08:00
self.update_ref(None, None, False)
2020-08-10 20:25:01 +08:00
p1 = self.addPlot(row=1, col=0, colspan=2)
2020-08-11 16:09:24 +08:00
self.ref_spectrum = SpectrogramItem(freq_sample, freq_base, block_size)
p1.addItem(self.ref_spectrum)
p2 = self.addPlot(row=2, col=0, colspan=2)
self.meas_spectrum = SpectrogramItem(freq_sample, freq_base, block_size)
p2.addItem(self.meas_spectrum)
2020-08-10 20:25:01 +08:00
2020-08-11 16:09:24 +08:00
def update_ref(self, block, peak_freq, locked):
if block is not None:
self.ref_spectrum.add_block(block)
2020-08-10 20:25:01 +08:00
if peak_freq is None:
self.text_ref.setText("REF: NO SIGNAL")
2020-08-10 20:25:01 +08:00
else:
2020-08-11 14:59:52 +08:00
self.text_ref.setText("REF: {:.3f}MHz".format((self.freq_base + peak_freq)/1e6))
if locked:
self.text_locked.setText("REF LASER LOCKED", color="00FF00")
else:
self.text_locked.setText("REF LASER UNLOCKED", color="FF0000")
2020-08-11 16:09:24 +08:00
def update_meas(self, block):
assert len(block) == self.block_size
spectrum = np.abs(np.fft.fft(block*blackmanharris(self.block_size)))
self.meas_spectrum.add_block(spectrum)
2020-08-10 18:56:08 +08:00
class IPCClient(AsyncioChildComm):
def set_close_cb(self, close_cb):
self.close_cb = close_cb
async def read_pyon(self):
line = await self.readline()
return pyon.decode(line.decode())
2020-08-10 20:25:01 +08:00
async def listen(self, main_window):
2020-08-10 18:56:08 +08:00
while True:
obj = await self.read_pyon()
try:
action = obj["action"]
2020-08-11 16:09:24 +08:00
del obj["action"]
2020-08-10 18:56:08 +08:00
if action == "terminate":
self.close_cb()
return
2020-08-11 16:09:24 +08:00
else:
getattr(main_window, action)(**obj)
2020-08-10 18:56:08 +08:00
except:
logging.error("error processing parent message",
exc_info=True)
self.close_cb()
def main():
2020-08-10 19:55:34 +08:00
freq_sample = float(sys.argv[1])
freq_base = float(sys.argv[2])
block_size = int(sys.argv[3])
2020-08-10 19:28:45 +08:00
2020-08-10 18:56:08 +08:00
app = QtWidgets.QApplication([])
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
try:
ipc = IPCClient(os.getenv("NOPTICA2_IPC"))
loop.run_until_complete(ipc.connect())
try:
2020-08-10 20:25:01 +08:00
main_window = MainWindow(freq_sample, freq_base, block_size)
2020-08-11 15:13:08 +08:00
main_window.showFullScreen()
2020-08-10 20:25:01 +08:00
ipc.set_close_cb(main_window.close)
asyncio.ensure_future(ipc.listen(main_window))
2020-08-10 18:56:08 +08:00
loop.run_forever()
finally:
ipc.close()
finally:
loop.close()
if __name__ == "__main__":
main()