noptica2-sdr/gui_impl.py

131 lines
4.3 KiB
Python

import asyncio
import os
import sys
import logging
import numpy as np
from scipy.signal import blackmanharris
from quamash import QEventLoop, QtWidgets, QtCore
import pyqtgraph as pg
from sipyco.pipe_ipc import AsyncioChildComm
from sipyco import pyon
class SpectrogramItem(pg.ImageItem):
def __init__(self, freq_sample, freq_base, block_size):
pg.ImageItem.__init__(self)
depth = 100
self.setOpts(axisOrder="row-major")
self.img_array = np.zeros((depth, block_size))
self.setImage(self.img_array, autoLevels=True, autoDownsample=True)
self.setRect(QtCore.QRectF((freq_base-freq_sample/2)/1e6, -float(depth), freq_sample/1e6, float(depth)))
def add_block(self, block):
self.img_array = np.roll(self.img_array, -1, 0)
self.img_array[-1:] = np.fft.fftshift(block)
self.setImage(self.img_array, autoLevels=True, autoDownsample=True)
class MainWindow(pg.GraphicsLayoutWidget):
def __init__(self, freq_sample, freq_base, block_size):
pg.GraphicsLayoutWidget.__init__(self)
self.setWindowTitle("NOPTICA Wavemeter")
self.freq_sample = freq_sample
self.freq_base = freq_base
self.block_size = block_size
self.text_ref = pg.LabelItem(size="24pt")
self.addItem(self.text_ref, row=0, col=0)
self.text_locked = pg.LabelItem(size="24pt")
self.addItem(self.text_locked, row=0, col=1)
self.update_ref(None, None, False)
p1 = self.addPlot(row=1, col=0, colspan=2, title="REF spectrum")
self.ref_spectrum = SpectrogramItem(freq_sample, freq_base, block_size)
p1.addItem(self.ref_spectrum)
p2 = self.addPlot(row=2, col=0, colspan=2, title="MEAS spectrum")
self.meas_spectrum = SpectrogramItem(freq_sample, freq_base, block_size)
p2.addItem(self.meas_spectrum)
self.position = self.addPlot(row=3, col=0, colspan=2, title="Position (nm)")
self.position_history = np.zeros(300)
def update_ref(self, block, peak_freq, locked):
if block is not None:
self.ref_spectrum.add_block(block)
if peak_freq is None:
self.text_ref.setText("REF: NO SIGNAL")
else:
self.text_ref.setText("REF: {:.3f}MHz".format((self.freq_base + peak_freq)/1e6))
if locked:
self.text_locked.setText("REF LASER LOCKED", color="00FF00")
else:
self.text_locked.setText("REF LASER UNLOCKED", color="FF0000")
def update_meas(self, block):
assert len(block) == self.block_size
spectrum = np.abs(np.fft.fft(block*blackmanharris(self.block_size)))
self.meas_spectrum.add_block(spectrum)
def update_position(self, position):
self.position_history = np.roll(self.position_history, -1)
self.position_history[-1] = position*632.816/2
self.position.clear()
self.position.plot(self.position_history)
class IPCClient(AsyncioChildComm):
def set_close_cb(self, close_cb):
self.close_cb = close_cb
async def read_pyon(self):
line = await self.readline()
return pyon.decode(line.decode())
async def listen(self, main_window):
while True:
obj = await self.read_pyon()
try:
action = obj["action"]
del obj["action"]
if action == "terminate":
self.close_cb()
return
else:
getattr(main_window, action)(**obj)
except:
logging.error("error processing parent message",
exc_info=True)
self.close_cb()
def main():
freq_sample = float(sys.argv[1])
freq_base = float(sys.argv[2])
block_size = int(sys.argv[3])
app = QtWidgets.QApplication([])
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
try:
ipc = IPCClient(os.getenv("NOPTICA2_IPC"))
loop.run_until_complete(ipc.connect())
try:
main_window = MainWindow(freq_sample, freq_base, block_size)
main_window.showFullScreen()
ipc.set_close_cb(main_window.close)
asyncio.ensure_future(ipc.listen(main_window))
loop.run_forever()
finally:
ipc.close()
finally:
loop.close()
if __name__ == "__main__":
main()