forked from M-Labs/thermostat
atse
c549d0344e
For communicating with the autotuner before the client fully disconnects Also then there's no need for explicitly resetting autotune elements
248 lines
8.2 KiB
Python
Executable File
248 lines
8.2 KiB
Python
Executable File
from pytec.gui.view.zero_limits_warning import ZeroLimitsWarningView
|
|
from pytec.gui.view.thermostat_ctrl_menu import ThermostatCtrlMenu
|
|
from pytec.gui.view.conn_menu import ConnMenu
|
|
from pytec.gui.view.plot_options_menu import PlotOptionsMenu
|
|
from pytec.gui.view.live_plot_view import LiveDataPlotter
|
|
from pytec.gui.view.ctrl_panel import CtrlPanel
|
|
from pytec.gui.view.info_box import InfoBox
|
|
from pytec.gui.model.pid_autotuner import PIDAutoTuner
|
|
from pytec.gui.model.thermostat import Thermostat, ThermostatConnectionState
|
|
import json
|
|
from autotune import PIDAutotuneState
|
|
from qasync import asyncSlot, asyncClose
|
|
import qasync
|
|
import asyncio
|
|
import logging
|
|
import argparse
|
|
from PyQt6 import QtWidgets, QtGui, uic
|
|
from PyQt6.QtCore import QSignalBlocker, pyqtSlot
|
|
import pyqtgraph as pg
|
|
from functools import partial
|
|
import importlib.resources
|
|
|
|
|
|
def get_argparser():
|
|
parser = argparse.ArgumentParser(description="Thermostat Control Panel")
|
|
|
|
parser.add_argument(
|
|
"--connect",
|
|
default=None,
|
|
action="store_true",
|
|
help="Automatically connect to the specified Thermostat in host:port format",
|
|
)
|
|
parser.add_argument("HOST", metavar="host", default=None, nargs="?")
|
|
parser.add_argument("PORT", metavar="port", default=None, nargs="?")
|
|
parser.add_argument(
|
|
"-l",
|
|
"--log",
|
|
dest="logLevel",
|
|
choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
|
|
help="Set the logging level",
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--param_tree",
|
|
default=importlib.resources.files("pytec.gui.view").joinpath("param_tree.json"),
|
|
help="Param Tree Description JSON File",
|
|
)
|
|
|
|
return parser
|
|
|
|
|
|
class MainWindow(QtWidgets.QMainWindow):
|
|
NUM_CHANNELS = 2
|
|
|
|
def __init__(self, args):
|
|
super().__init__()
|
|
|
|
ui_file_path = importlib.resources.files("pytec.gui.view").joinpath("tec_qt.ui")
|
|
uic.loadUi(ui_file_path, self)
|
|
|
|
self.info_box = InfoBox()
|
|
|
|
self.thermostat = Thermostat(self, self.report_refresh_spin.value())
|
|
self._connecting_task = None
|
|
|
|
async def autotune_disconnect():
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self.autotuners.get_state(ch) != PIDAutotuneState.STATE_OFF:
|
|
await self.autotuners.stop_pid_from_running(ch)
|
|
self.thermostat.disconnect_cb = autotune_disconnect
|
|
|
|
@asyncSlot()
|
|
async def handle_connection_error():
|
|
self.info_box.display_info_box(
|
|
"Connection Error", "Thermostat connection lost. Is it unplugged?"
|
|
)
|
|
await self.thermostat.end_session()
|
|
|
|
self.thermostat.connection_error.connect(handle_connection_error)
|
|
|
|
self.thermostat.connection_state_changed.connect(self._on_connection_changed)
|
|
|
|
self.autotuners = PIDAutoTuner(self, self.thermostat, 2)
|
|
self.autotuners.autotune_state_changed.connect(self.pid_autotune_handler)
|
|
|
|
def get_ctrl_panel_config(args):
|
|
with open(args.param_tree, "r", encoding="utf-8") as f:
|
|
return json.load(f)["ctrl_panel"]
|
|
|
|
self.ctrl_panel_view = CtrlPanel(
|
|
self.thermostat,
|
|
self.autotuners,
|
|
self.info_box,
|
|
[self.ch0_tree, self.ch1_tree],
|
|
get_ctrl_panel_config(args),
|
|
)
|
|
|
|
self.zero_limits_warning = ZeroLimitsWarningView(
|
|
self.style(), self.limits_warning
|
|
)
|
|
self.ctrl_panel_view.set_zero_limits_warning_sig.connect(
|
|
self.zero_limits_warning.set_limits_warning
|
|
)
|
|
|
|
self.report_apply_btn.clicked.connect(
|
|
lambda: self.thermostat.set_update_s(self.report_refresh_spin.value())
|
|
)
|
|
|
|
self.channel_graphs = LiveDataPlotter(
|
|
self.thermostat,
|
|
[
|
|
[getattr(self, f"ch{ch}_t_graph"), getattr(self, f"ch{ch}_i_graph")]
|
|
for ch in range(self.NUM_CHANNELS)
|
|
]
|
|
)
|
|
|
|
self.plot_options_menu = PlotOptionsMenu(self.channel_graphs)
|
|
self.plot_settings.setMenu(self.plot_options_menu)
|
|
|
|
self.conn_menu = ConnMenu(self.thermostat, self.connect_btn)
|
|
self.connect_btn.setMenu(self.conn_menu)
|
|
|
|
self.thermostat_ctrl_menu = ThermostatCtrlMenu(
|
|
self.thermostat, self.info_box, self.style()
|
|
)
|
|
self.thermostat_settings.setMenu(self.thermostat_ctrl_menu)
|
|
|
|
self.loading_spinner.hide()
|
|
|
|
@asyncSlot(ThermostatConnectionState)
|
|
async def _on_connection_changed(self, state):
|
|
self.graph_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
|
|
self.report_group.setEnabled(state == ThermostatConnectionState.CONNECTED)
|
|
self.thermostat_settings.setEnabled(
|
|
state == ThermostatConnectionState.CONNECTED
|
|
)
|
|
|
|
match state:
|
|
case ThermostatConnectionState.CONNECTED:
|
|
self.connect_btn.setText("Disconnect")
|
|
self.status_lbl.setText(
|
|
"Connected to Thermostat v"
|
|
f"{self.thermostat.hw_rev['rev']['major']}."
|
|
f"{self.thermostat.hw_rev['rev']['minor']}"
|
|
)
|
|
|
|
case ThermostatConnectionState.CONNECTING:
|
|
self.connect_btn.setText("Stop")
|
|
self.status_lbl.setText("Connecting...")
|
|
|
|
case ThermostatConnectionState.DISCONNECTED:
|
|
self.connect_btn.setText("Connect")
|
|
self.status_lbl.setText("Disconnected")
|
|
self.report_box.setChecked(False)
|
|
|
|
@asyncSlot(int)
|
|
async def on_report_box_stateChanged(self, enabled):
|
|
await self.thermostat.set_report_mode(enabled)
|
|
|
|
@asyncClose
|
|
async def closeEvent(self, _event):
|
|
try:
|
|
await self.thermostat.end_session()
|
|
except:
|
|
pass
|
|
|
|
@asyncSlot()
|
|
async def on_connect_btn_clicked(self):
|
|
if (self._connecting_task is None) and (not self.thermostat.connected()):
|
|
self._connecting_task = asyncio.create_task(
|
|
self.thermostat.start_session(
|
|
host=self.conn_menu.host_set_line.text(),
|
|
port=self.conn_menu.port_set_spin.value(),
|
|
)
|
|
)
|
|
try:
|
|
await self._connecting_task
|
|
except (OSError, asyncio.CancelledError) as exc:
|
|
await self.thermostat.end_session()
|
|
if isinstance(exc, asyncio.CancelledError):
|
|
return
|
|
raise
|
|
finally:
|
|
self._connecting_task = None
|
|
|
|
elif self._connecting_task is not None:
|
|
self._connecting_task.cancel()
|
|
else:
|
|
await self.thermostat.end_session()
|
|
|
|
@asyncSlot(int, PIDAutotuneState)
|
|
async def pid_autotune_handler(self, _ch, _state):
|
|
ch_tuning = []
|
|
for ch in range(self.NUM_CHANNELS):
|
|
if self.autotuners.get_state(ch) in {
|
|
PIDAutotuneState.STATE_READY,
|
|
PIDAutotuneState.STATE_RELAY_STEP_UP,
|
|
PIDAutotuneState.STATE_RELAY_STEP_DOWN,
|
|
}:
|
|
ch_tuning.append(ch)
|
|
|
|
if len(ch_tuning) == 0:
|
|
self.background_task_lbl.setText("Ready.")
|
|
self.loading_spinner.hide()
|
|
self.loading_spinner.stop()
|
|
else:
|
|
self.background_task_lbl.setText(
|
|
"Autotuning channel {ch}...".format(ch=ch_tuning)
|
|
)
|
|
self.loading_spinner.start()
|
|
self.loading_spinner.show()
|
|
|
|
|
|
async def coro_main():
|
|
args = get_argparser().parse_args()
|
|
if args.logLevel:
|
|
logging.basicConfig(level=getattr(logging, args.logLevel))
|
|
|
|
app_quit_event = asyncio.Event()
|
|
|
|
app = QtWidgets.QApplication.instance()
|
|
app.aboutToQuit.connect(app_quit_event.set)
|
|
app.setWindowIcon(
|
|
QtGui.QIcon(
|
|
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
|
|
)
|
|
)
|
|
|
|
main_window = MainWindow(args)
|
|
main_window.show()
|
|
|
|
if args.connect:
|
|
if args.HOST:
|
|
main_window.conn_menu.host_set_line.setText(args.HOST)
|
|
if args.PORT:
|
|
main_window.conn_menu.port_set_spin.setValue(int(args.PORT))
|
|
main_window.connect_btn.click()
|
|
|
|
await app_quit_event.wait()
|
|
|
|
|
|
def main():
|
|
qasync.run(coro_main())
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|