Make Ui_MainWindow a superclass of our main window

Gets rid of the global ui.
This commit is contained in:
atse 2023-07-01 23:46:40 +08:00
parent 299ef7dcc3
commit c6ca2b3490
1 changed files with 101 additions and 107 deletions

View File

@ -1,4 +1,4 @@
from PyQt6 import QtWidgets, uic from PyQt6 import QtWidgets
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
from pyqtgraph import PlotWidget from pyqtgraph import PlotWidget
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
@ -16,11 +16,7 @@ from ui_tec_qt import Ui_MainWindow
tec_client: Client = None tec_client: Client = None
# ui = None
ui: Ui_MainWindow = None
client_watcher = None client_watcher = None
client_watcher_task = None
def get_argparser(): def get_argparser():
@ -62,105 +58,118 @@ class ClientWatcher(QObject):
self.running = False self.running = False
@pyqtSlot() @pyqtSlot()
def set_update_s(self): def set_update_s(self, update_s):
self.update_s = ui.report_refresh_spin.value() self.update_s = update_s
def on_connection_changed(result): class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
global client_watcher, client_watcher_task def __init__(self, args):
ui.graph_group.setEnabled(result) super().__init__()
ui.hw_rev_lbl.setEnabled(result)
ui.fan_group.setEnabled(result)
ui.report_group.setEnabled(result)
ui.ip_set_line.setEnabled(not result) self.setupUi(self)
ui.port_set_spin.setEnabled(not result)
ui.status_lbl.setText("Connected" if result else "Disconnected")
ui.connect_btn.setText("Disconnect" if result else "Connect")
if not result:
ui.hw_rev_lbl.setText("Thermostat vX.Y")
ui.fan_group.setStyleSheet("")
if client_watcher:
client_watcher.stop_watching()
client_watcher = None
client_watcher_task = None
self.connect_btn.clicked.connect(self.connect)
self.fan_power_slider.valueChanged.connect(self.fan_set)
self.fan_auto_box.stateChanged.connect(self.fan_auto_set)
def hw_rev(hw_rev_d: dict): self.client_watcher_task = None
logging.debug(hw_rev_d)
ui.hw_rev_lbl.setText(f"Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['major']}")
ui.fan_group.setEnabled(hw_rev_d["settings"]["fan_available"])
if hw_rev_d["settings"]["fan_pwm_recommended"]:
ui.fan_group.setStyleSheet("")
ui.fan_group.setToolTip("")
else:
ui.fan_group.setStyleSheet("background-color: yellow")
ui.fan_group.setToolTip("Changing the fan settings of not recommended")
if args.connect:
if args.IP:
self.ip_set_line.setText(args.IP)
if args.PORT:
self.port_set_spin.setValue(int(args.PORT))
self.connect_btn.click()
def fan_update(fan_settings): def _on_connection_changed(self, result):
logging.debug(fan_settings) global client_watcher
if fan_settings is None: self.graph_group.setEnabled(result)
return self.hw_rev_lbl.setEnabled(result)
with QSignalBlocker(ui.fan_power_slider) as _: self.fan_group.setEnabled(result)
ui.fan_power_slider.setValue(fan_settings["fan_pwm"]) self.report_group.setEnabled(result)
ui.fan_power_slider.setEnabled(not fan_settings["auto_mode"])
with QSignalBlocker(ui.fan_auto_box) as _:
ui.fan_auto_box.setChecked(fan_settings["auto_mode"])
self.ip_set_line.setEnabled(not result)
self.port_set_spin.setEnabled(not result)
self.status_lbl.setText("Connected" if result else "Disconnected")
self.connect_btn.setText("Disconnect" if result else "Connect")
if not result:
self.hw_rev_lbl.setText("Thermostat vX.Y")
self.fan_group.setStyleSheet("")
if client_watcher:
client_watcher.stop_watching()
client_watcher = None
self.client_watcher_task = None
@asyncSlot() def _hw_rev(self, hw_rev_d: dict):
async def fan_set(_): logging.debug(hw_rev_d)
global tec_client self.hw_rev_lbl.setText(f"Thermostat v{hw_rev_d['rev']['major']}.{hw_rev_d['rev']['major']}")
if tec_client is None or ui.fan_auto_box.isChecked(): self.fan_group.setEnabled(hw_rev_d["settings"]["fan_available"])
return if hw_rev_d["settings"]["fan_pwm_recommended"]:
await tec_client.set_param("fan", ui.fan_power_slider.value()) self.fan_group.setStyleSheet("")
self.fan_group.setToolTip("")
@asyncSlot()
async def fan_auto_set(enabled):
global tec_client
if tec_client is None:
return
ui.fan_power_slider.setEnabled(not enabled)
if enabled:
await tec_client.set_param("fan", "auto")
else:
await tec_client.set_param("fan", ui.fan_power_slider.value())
@asyncSlot()
async def connect(_):
global tec_client, client_watcher, client_watcher_task
ip, port = ui.ip_set_line.text(), ui.port_set_spin.value()
try:
if tec_client:
await tec_client.disconnect()
tec_client = None
on_connection_changed(False)
else: else:
ui.status_lbl.setText("Connecting...") self.fan_group.setStyleSheet("background-color: yellow")
tec_client = Client() self.fan_group.setToolTip("Changing the fan settings of not recommended")
await tec_client.connect(host=ip, port=port, timeout=30)
on_connection_changed(True) def fan_update(self, fan_settings):
hw_rev(await tec_client.hw_rev()) logging.debug(fan_settings)
# fan_update(await tec_client.fan()) if fan_settings is None:
if client_watcher is None: return
client_watcher = ClientWatcher(ui.main_widget, ui.report_refresh_spin.value()) with QSignalBlocker(self.fan_power_slider) as _:
client_watcher.fan_update.connect(fan_update) self.fan_power_slider.setValue(fan_settings["fan_pwm"])
ui.report_apply_btn.clicked.connect( self.fan_power_slider.setEnabled(not fan_settings["auto_mode"])
lambda: client_watcher.set_update_s(ui.report_refresh_spin.value()) with QSignalBlocker(self.fan_auto_box) as _:
) self.fan_auto_box.setChecked(fan_settings["auto_mode"])
QtWidgets.QApplication.instance().aboutToQuit.connect(client_watcher.stop_watching)
client_watcher_task = asyncio.create_task(client_watcher.run()) @asyncSlot()
except Exception as e: async def fan_set(self):
logging.error(f"Failed communicating to the {ip}:{port}: {e}") global tec_client
on_connection_changed(False) if tec_client is None or self.fan_auto_box.isChecked():
return
await tec_client.set_param("fan", self.fan_power_slider.value())
@asyncSlot(int)
async def fan_auto_set(self, enabled):
global tec_client
if tec_client is None:
return
self.fan_power_slider.setEnabled(not enabled)
if enabled:
await tec_client.set_param("fan", "auto")
else:
await tec_client.set_param("fan", self.fan_power_slider.value())
@asyncSlot()
async def connect(self):
global tec_client, client_watcher
ip, port = self.ip_set_line.text(), self.port_set_spin.value()
try:
if tec_client:
await tec_client.disconnect()
tec_client = None
self._on_connection_changed(False)
else:
self.status_lbl.setText("Connecting...")
tec_client = Client()
await tec_client.connect(host=ip, port=port, timeout=30)
self._on_connection_changed(True)
self._hw_rev(await tec_client.hw_rev())
# self.fan_update(await tec_client.fan())
if client_watcher is None:
client_watcher = ClientWatcher(self.main_widget, self.report_refresh_spin.value())
client_watcher.fan_update.connect(self.fan_update)
self.report_apply_btn.clicked.connect(
lambda: client_watcher.set_update_s(self.report_refresh_spin.value())
)
QtWidgets.QApplication.instance().aboutToQuit.connect(client_watcher.stop_watching)
self.client_watcher_task = asyncio.create_task(client_watcher.run())
except Exception as e:
logging.error(f"Failed communicating to the {ip}:{port}: {e}")
self._on_connection_changed(False)
async def coro_main(): async def coro_main():
global ui
args = get_argparser().parse_args() args = get_argparser().parse_args()
if args.logLevel: if args.logLevel:
logging.basicConfig(level=getattr(logging, args.logLevel)) logging.basicConfig(level=getattr(logging, args.logLevel))
@ -170,22 +179,7 @@ async def coro_main():
app = QtWidgets.QApplication.instance() app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(app_quit_event.set) app.aboutToQuit.connect(app_quit_event.set)
main_window = QtWidgets.QMainWindow() main_window = MainWindow(args)
ui = Ui_MainWindow()
ui.setupUi(main_window)
# ui = uic.loadUi('tec_qt.ui', main_window)
ui.connect_btn.clicked.connect(connect)
ui.fan_power_slider.valueChanged.connect(fan_set)
ui.fan_auto_box.stateChanged.connect(fan_auto_set)
if args.connect:
if args.IP:
ui.ip_set_line.setText(args.IP)
if args.PORT:
ui.port_set_spin.setValue(int(args.PORT))
ui.connect_btn.click()
main_window.show() main_window.show()
await app_quit_event.wait() await app_quit_event.wait()