forked from M-Labs/thermostat
252 lines
8.7 KiB
Python
Executable File
252 lines
8.7 KiB
Python
Executable File
from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
|
|
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
|
|
from pytec.gui.view.conn_menu import ConnMenu
|
|
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
|
|
from pytec.gui.view.live_plot_view import LiveDataPlotter
|
|
from pytec.gui.view.ctrl_panel import CtrlPanel
|
|
from pytec.gui.view.info_box import InfoBox
|
|
from pytec.gui.model.pid_autotuner import PIDAutoTuner
|
|
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
|
|
import json
|
|
from autotune import PIDAutotuneState
|
|
from qasync import asyncSlot, asyncClose
|
|
import qasync
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from PyQt6 import QtWidgets, QtGui, uic
|
|
from PyQt6.QtCore import pyqtSlot
|
|
import importlib.resources
|
|
|
|
|
|
def get_argparser():
    """Build the command-line parser for the Thermostat Control Panel.

    Recognised arguments:

    * ``--connect``: flag (``store_true``, default ``None``).
    * ``host`` / ``port``: optional positionals stored as ``HOST`` / ``PORT``.
    * ``-l`` / ``--log``: logging level name, stored as ``logLevel``.
    * ``-p`` / ``--param_tree``: parameter tree description JSON file;
      defaults to the ``param_tree.json`` resource of ``pytec.gui.view``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    # Resolve the packaged default up front so the option table below stays
    # compact.
    default_param_tree = importlib.resources.files("pytec.gui.view").joinpath(
        "param_tree.json"
    )

    argparser = argparse.ArgumentParser(description="Thermostat Control Panel")

    argparser.add_argument(
        "--connect",
        default=None,
        action="store_true",
        help="Automatically connect to the specified Thermostat in host:port format",
    )

    # Both positionals are optional and share the same shape.
    for dest_name, shown_name in (("HOST", "host"), ("PORT", "port")):
        argparser.add_argument(dest_name, metavar=shown_name, default=None, nargs="?")

    argparser.add_argument(
        "-l",
        "--log",
        dest="logLevel",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level",
    )
    argparser.add_argument(
        "-p",
        "--param_tree",
        default=default_param_tree,
        help="Param Tree Description JSON File",
    )

    return argparser
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
NUM_CHANNELS = 2
|
|
|
|
def __init__(self, args):
|
|
super().__init__()
|
|
|
|
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self._info_box = InfoBox()
|
|
|
|
# Models
|
|
self._thermostat = Thermostat(self, self.report_refresh_spin.value())
|
|
self._connecting_task = None
|
|
self._thermostat.connection_state_update.connect(self._on_connection_changed)
|
|
|
|
self._autotuners = PIDAutoTuner(self, self._thermostat, 2)
|
|
self._autotuners.autotune_state_changed.connect(self._pid_autotune_handler)
|
|
|
|
# Handlers for disconnections
|
|
async def autotune_disconnect():
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self._autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
|
await self._autotuners.stop_pid_from_running(ch)
|
|
self._thermostat.disconnect_cb = autotune_disconnect
|
|
|
|
@pyqtSlot()
|
|
def handle_connection_error():
|
|
self._info_box.display_info_box(
|
|
"Connection Error", "Thermostat connection lost. Is it unplugged?"
|
|
)
|
|
self._thermostat.connection_error.connect(handle_connection_error)
|
|
|
|
# Control Panel
|
|
def get_ctrl_panel_config(args):
|
|
with open(args.param_tree, "r", encoding="utf-8") as f:
|
|
return json.load(f)["ctrl_panel"]
|
|
|
|
self._ctrl_panel_view = CtrlPanel(
|
|
self._thermostat,
|
|
self._autotuners,
|
|
self._info_box,
|
|
[self.ch0_tree, self.ch1_tree],
|
|
get_ctrl_panel_config(args),
|
|
)
|
|
|
|
# Graphs
|
|
self._channel_graphs = LiveDataPlotter(
|
|
self._thermostat,
|
|
[
|
|
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
)
|
|
|
|
# Bottom bar menus
|
|
self._conn_menu = ConnMenu(self._thermostat, self.connect_btn)
|
|
self.connect_btn.setMenu(self._conn_menu)
|
|
|
|
self._thermostat_ctrl_menu = ThermostatCtrlMenu(
|
|
self._thermostat, self._info_box, self.style()
|
|
)
|
|
self.thermostat_settings.setMenu(self._thermostat_ctrl_menu)
|
|
|
|
self._plot_options_menu = PlotOptionsMenu(self._channel_graphs)
|
|
self.plot_settings.setMenu(self._plot_options_menu)
|
|
|
|
# Status line
|
|
self._zero_limits_warning = ZeroLimitsWarningView(
|
|
self._thermostat, self.style(), self.limits_warning
|
|
)
|
|
self.loading_spinner.hide()
|
|
|
|
self.report_apply_btn.clicked.connect(
|
|
lambda: self._thermostat.set_update_s(self.report_refresh_spin.value())
|
|
)
|
|
|
|
@asyncClose
|
|
async def closeEvent(self, _event):
|
|
try:
|
|
await self._thermostat.end_session()
|
|
self._thermostat.connection_state = ThermostatConnectionState.DISCONNECTED
|
|
except:
|
|
pass
|
|
|
|
@pyqtSlot(ThermostatConnectionState)
|
|
def _on_connection_changed(self, state):
|
|
self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
|
|
self.thermostat_settings.setEnabled(
|
|
state == ThermostatConnectionState.CONNECTED
|
|
)
|
|
self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
|
|
|
|
match state:
|
|
case ThermostatConnectionState.CONNECTED:
|
|
self.connect_btn.setText("Disconnect")
|
|
self.status_lbl.setText(
|
|
"Connected to Thermostat v"
|
|
f"{self._thermostat.hw_rev['rev']['major']}."
|
|
f"{self._thermostat.hw_rev['rev']['minor']}"
|
|
)
|
|
|
|
case ThermostatConnectionState.CONNECTING:
|
|
self.connect_btn.setText("Stop")
|
|
self.status_lbl.setText("Connecting...")
|
|
|
|
case ThermostatConnectionState.DISCONNECTED:
|
|
self.connect_btn.setText("Connect")
|
|
self.status_lbl.setText("Disconnected")
|
|
self.report_box.setChecked(False)
|
|
|
|
@pyqtSlot(int, PIDAutotuneState)
|
|
def _pid_autotune_handler(self, _ch, _state):
|
|
ch_tuning = []
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self._autotuners.get_state(ch) in {
|
|
PIDAutotuneState.STATE_READY,
|
|
PIDAutotuneState.STATE_RELAY_STEP_UP,
|
|
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
|
|
}:
|
|
ch_tuning.append(ch)
|
|
|
|
if len(ch_tuning) == 0:
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
else:
|
|
self.background_task_lbl.setText(
|
|
"Autotuning channel {ch}...".format(ch=ch_tuning)
|
|
)
|
|
self.loading_spinner.start()
|
|
self.loading_spinner.show()
|
|
|
|
@asyncSlot()
|
|
async def on_connect_btn_clicked(self):
|
|
match self._thermostat.connection_state:
|
|
case ThermostatConnectionState.DISCONNECTED:
|
|
self._connecting_task = asyncio.create_task(
|
|
self._thermostat.start_session(
|
|
host=self._conn_menu.host_set_line.text(),
|
|
port=self._conn_menu.port_set_spin.value(),
|
|
)
|
|
)
|
|
self._thermostat.connection_state = ThermostatConnectionState.CONNECTING
|
|
await self._connecting_task
|
|
self._connecting_task = None
|
|
self._thermostat.connection_state = ThermostatConnectionState.CONNECTED
|
|
self._thermostat.start_watching()
|
|
|
|
case ThermostatConnectionState.CONNECTING:
|
|
self._connecting_task.cancel()
|
|
await self._thermostat.end_session()
|
|
self._thermostat.connection_state = (
|
|
ThermostatConnectionState.DISCONNECTED
|
|
)
|
|
self._connecting_task = None
|
|
|
|
case ThermostatConnectionState.CONNECTED:
|
|
await self._thermostat.end_session()
|
|
self._thermostat.connection_state = (
|
|
ThermostatConnectionState.DISCONNECTED
|
|
)
|
|
|
|
@asyncSlot(int)
|
|
async def on_report_box_stateChanged(self, enabled):
|
|
await self._thermostat.set_report_mode(enabled)
|
|
|
|
|
|
async def coro_main():
    """Run the GUI until the Qt application is about to quit.

    Parses the command line, configures logging when a level was given,
    sets the application icon and shows the main window. With ``--connect``
    the optional host/port are written into the connection menu and the
    connect button is clicked programmatically.
    """
    cli = get_argparser().parse_args()
    if cli.logLevel:
        level = getattr(logging, cli.logLevel)
        logging.basicConfig(level=level)

    # Set by Qt's aboutToQuit signal; awaited at the end of this coroutine.
    quit_requested = asyncio.Event()

    qt_app = QtWidgets.QApplication.instance()
    qt_app.aboutToQuit.connect(quit_requested.set)

    icon_path = importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico")
    qt_app.setWindowIcon(QtGui.QIcon(str(icon_path)))

    window = MainWindow(cli)
    window.show()

    if cli.connect:
        conn_menu = window._conn_menu
        if cli.HOST:
            conn_menu.host_set_line.setText(cli.HOST)
        if cli.PORT:
            conn_menu.port_set_spin.setValue(int(cli.PORT))
        window.connect_btn.click()

    await quit_requested.wait()
|
|
|
|
|
|
def main():
    """Script entry point: hand the ``coro_main`` coroutine to ``qasync.run``."""
    entry_coro = coro_main()
    qasync.run(entry_coro)


if __name__ == "__main__":
    main()
|