From 0244dec5be4ce96327ce2cc39b10b00fb8f1b39f Mon Sep 17 00:00:00 2001 From: Egor Savkin Date: Mon, 26 Jun 2023 10:20:48 +0800 Subject: [PATCH] Try move from Qthreads to qasync Signed-off-by: Egor Savkin --- flake.nix | 40 ++++++++++++++- pytec/pytec/client.py | 1 + pytec/tec_qt.py | 112 ++++++++++++++++++++++++++---------------- 3 files changed, 109 insertions(+), 44 deletions(-) diff --git a/flake.nix b/flake.nix index 77dbec9..a18fe86 100644 --- a/flake.nix +++ b/flake.nix @@ -48,9 +48,45 @@ dontFixup = true; }; + + qasync = pkgs.python3Packages.buildPythonPackage rec { + pname = "qasync"; + version = "0.24.0"; + src = pkgs.fetchFromGitHub { + owner = "CabbageDevelopment"; + repo = "qasync"; + rev = "v${version}"; + sha256 = "sha256-ls5F+VntXXa3n+dULaYWK9sAmwly1nk/5+RGWLrcf2Y="; + }; + propagatedBuildInputs = [ pkgs.python3Packages.pyqt6 ]; + nativeCheckInputs = [ pkgs.python3Packages.pytest ]; + checkPhase = '' + pytest -k 'test_qthreadexec.py' # the others cause the test execution to be aborted, I think because of asyncio + ''; + }; + thermostat_gui = pkgs.python3Packages.buildPythonPackage rec { + pname = "thermostat_gui"; + version = "0.0.0"; + src = self; + + preBuild = + '' + export VERSIONEER_OVERRIDE=${version} + export VERSIONEER_REV=v0.0.0 + ''; + + nativeBuildInputs = [ pkgs.qt6.wrapQtAppsHook ]; + propagatedBuildInputs = (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync]); + + dontWrapQtApps = true; + postFixup = '' + ls -al $out/ + wrapQtApp "$out/pytec/tec_qt" + ''; + }; in { packages.x86_64-linux = { - inherit thermostat; + inherit thermostat qasync thermostat_gui; }; hydraJobs = { @@ -62,7 +98,7 @@ buildInputs = with pkgs; [ rust openocd dfu-util ] ++ (with python3Packages; [ - numpy matplotlib pyqtgraph setuptools pyqt6 + numpy matplotlib pyqtgraph setuptools pyqt6 qasync ]); }; defaultPackage.x86_64-linux = thermostat; diff --git a/pytec/pytec/client.py b/pytec/pytec/client.py index 32b05f5..0d93151 100644 --- a/pytec/pytec/client.py +++ b/pytec/pytec/client.py @@ -40,6 +40,7 @@ class Client: line = self._read_line() response = json.loads(line) + logging.debug(f"{command}: {response}") if "error" in response: raise CommandError(response["error"]) return response diff --git a/pytec/tec_qt.py b/pytec/tec_qt.py index 93d639f..38d8215 100644 --- a/pytec/tec_qt.py +++ b/pytec/tec_qt.py @@ -6,7 +6,10 @@ import pyqtgraph as pg import sys import argparse import logging +import asyncio +import atexit from pytec.client import Client +from qasync import QEventLoop # pyuic6 -x tec_qt.ui -o ui_tec_qt.py from ui_tec_qt import Ui_MainWindow @@ -16,7 +19,7 @@ tec_client: Client = None # ui = None ui: Ui_MainWindow = None -thread_pool = QThreadPool.globalInstance() +queue = None connection_watcher = None client_watcher = None app: QtWidgets.QApplication = None @@ -35,7 +38,43 @@ def get_argparser(): return parser -class WatchConnectTask(QThread): +def wrap_client_task(func, *args, **kwargs): + loop = asyncio.get_event_loop() + task = ClientTask(func, *args, **kwargs) + asyncio.ensure_future(queue.put(task), loop=loop) + + +async def process_client_tasks(): + global queue + if queue is None: + queue = asyncio.Queue() + loop = asyncio.get_event_loop() + while True: + task = await queue.get() + await task.run() + queue.task_done() + + +class ClientTask: + def __init__(self, func, *args, **kwargs): + self.func = func + self.args = args + self.kwargs = kwargs + super().__init__() + + async def run(self): + try: + lock = asyncio.Lock() + async with lock: + self.func(*self.args, **self.kwargs) + except (TimeoutError, OSError): + logging.warning("Client connection error, disconnecting", exc_info=True) + if connection_watcher: + #thread_pool.clear() # clearing all next requests + connection_watcher.client_disconnected() + + +class WatchConnectTask(QObject): connected = pyqtSignal(bool) hw_rev = pyqtSignal(dict) connecting = pyqtSignal() @@ -57,8 +96,8 @@ class WatchConnectTask(QThread): self.connecting.emit() tec_client = Client(host=self.ip, port=self.port, timeout=30) self.connected.emit(True) - thread_pool.start(ClientTask(lambda: self.hw_rev.emit(tec_client.hw_rev()))) - #thread_pool.start(ClientTask(lambda: self.fan_update.emit(tec_client.fan()))) + wrap_client_task(lambda: self.hw_rev.emit(tec_client.hw_rev())) + # wrap_client_task(lambda: self.fan_update.emit(tec_client.fan())) except Exception as e: logging.error(f"Failed communicating to the {self.ip}:{self.port}: {e}") self.connected.emit(False) @@ -72,7 +111,8 @@ class WatchConnectTask(QThread): self.connected.emit(False) -class ClientWatcher(QThread): + +class ClientWatcher(QObject): fan_update = pyqtSignal(object) pwm_update = pyqtSignal(object) report_update = pyqtSignal(object) @@ -83,10 +123,10 @@ class ClientWatcher(QThread): self.running = True super().__init__(parent) - def run(self): + async def run(self): while self.running: - thread_pool.start(ClientTask(lambda: self.update_params())) - self.msleep(int(self.update_s * 1000)) + wrap_client_task(lambda: self.update_params()) + await asyncio.sleep(int(self.update_s * 1000)) def update_params(self): self.fan_update.emit(tec_client.fan()) @@ -94,34 +134,17 @@ class ClientWatcher(QThread): @pyqtSlot() def stop_watching(self): self.running = False - deadline = QDeadlineTimer() - deadline.setDeadline(100) - self.wait(deadline) - self.terminate() + #deadline = QDeadlineTimer() + #deadline.setDeadline(100) + #self.wait(deadline) + #self.terminate() @pyqtSlot() def set_update_s(self): self.update_s = ui.report_refresh_spin.value() -class ClientTask(QRunnable): - def __init__(self, func, *args, **kwargs): - self.func = func - self.args = args - self.kwargs = kwargs - super().__init__() - - def run(self): - try: - self.func(*self.args, **self.kwargs) - except (TimeoutError, OSError): - logging.warning("Client connection error, disconnecting", exc_info=True) - if connection_watcher: - thread_pool.clear() # clearing all next requests - connection_watcher.client_disconnected() - - -def connected(result): +def on_connection_changed(result): global client_watcher, connection_watcher ui.graph_group.setEnabled(result) ui.hw_rev_lbl.setEnabled(result) @@ -143,7 +166,7 @@ def connected(result): client_watcher.fan_update.connect(fan_update) ui.report_apply_btn.clicked.connect(client_watcher.set_update_s) app.aboutToQuit.connect(client_watcher.stop_watching) - client_watcher.start() + wrap_client_task(client_watcher.run) def hw_rev(hw_rev_d: dict): @@ -173,7 +196,7 @@ def fan_set(): global tec_client if tec_client is None or ui.fan_auto_box.isChecked(): return - thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) + wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value())) def fan_auto_set(enabled): @@ -182,37 +205,41 @@ def fan_auto_set(enabled): return ui.fan_power_slider.setEnabled(not enabled) if enabled: - thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", "auto"))) + wrap_client_task(lambda: tec_client.set_param("fan", "auto")) else: - thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) + wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value())) def connect(): global connection_watcher connection_watcher = WatchConnectTask(ui.main_widget, ui.ip_set_line.text(), ui.port_set_spin.value()) - connection_watcher.connected.connect(connected) + connection_watcher.connected.connect(on_connection_changed) connection_watcher.connecting.connect(lambda: ui.status_lbl.setText("Connecting...")) connection_watcher.hw_rev.connect(hw_rev) connection_watcher.fan_update.connect(fan_update) - connection_watcher.start() - app.aboutToQuit.connect(connection_watcher.terminate) + wrap_client_task(connection_watcher.run) + #app.aboutToQuit.connect(connection_watcher.terminate) def main(): - global ui, thread_pool, app + global ui, app, queue args = get_argparser().parse_args() if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app = QtWidgets.QApplication(sys.argv) + + loop = QEventLoop(app) + asyncio.set_event_loop(loop) + atexit.register(loop.close) + + loop.create_task(process_client_tasks()) + main_window = QtWidgets.QMainWindow() ui = Ui_MainWindow() ui.setupUi(main_window) # ui = uic.loadUi('tec_qt.ui', main_window) - thread_pool = QThreadPool(parent=ui.main_widget) - thread_pool.setMaxThreadCount(1) # avoid concurrent requests - ui.connect_btn.clicked.connect(lambda _checked: connect()) ui.fan_power_slider.valueChanged.connect(fan_set) ui.fan_auto_box.stateChanged.connect(fan_auto_set) @@ -225,7 +252,8 @@ def main(): ui.connect_btn.click() main_window.show() - sys.exit(app.exec()) + + loop.run_until_complete(app.exec()) if __name__ == '__main__':