From d5fb8b8317c953eab87437587561a5a43ee943d3 Mon Sep 17 00:00:00 2001 From: Egor Savkin Date: Wed, 28 Jun 2023 14:59:54 +0800 Subject: [PATCH] Revert "Try move from Qthreads to qasync" This reverts commit 5b1f2df2615bc089ae4f04a22b99597bddc7c471. --- flake.nix | 40 +-------------- pytec/pytec/client.py | 1 - pytec/tec_qt.py | 112 ++++++++++++++++-------------------------- 3 files changed, 44 insertions(+), 109 deletions(-) diff --git a/flake.nix b/flake.nix index 5b7320a..a9150ef 100644 --- a/flake.nix +++ b/flake.nix @@ -55,45 +55,9 @@ dontFixup = true; }; - - qasync = pkgs.python3Packages.buildPythonPackage rec { - pname = "qasync"; - version = "0.24.0"; - src = pkgs.fetchFromGitHub { - owner = "CabbageDevelopment"; - repo = "qasync"; - rev = "v${version}"; - sha256 = "sha256-ls5F+VntXXa3n+dULaYWK9sAmwly1nk/5+RGWLrcf2Y="; - }; - propagatedBuildInputs = [ pkgs.python3Packages.pyqt6 ]; - checkInputs = [ pkgs.python3Packages.pytest ]; - checkPhase = '' - pytest -k 'test_qthreadexec.py' # the others cause the test execution to be aborted, I think because of asyncio - ''; - }; - thermostat_gui = pkgs.python3Packages.buildPythonPackage rec { - pname = "thermostat_gui"; - version = "0.0.0"; - src = self; - - preBuild = - '' - export VERSIONEER_OVERRIDE=${version} - export VERSIONEER_REV=v0.0.0 - ''; - - nativeBuildInputs = [ pkgs.qt6.wrapQtAppsHook ]; - propagatedBuildInputs = (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync]); - - dontWrapQtApps = true; - postFixup = '' - ls -al $out/ - wrapQtApp "$out/pytec/tec_qt" - ''; - }; in { packages.x86_64-linux = { - inherit thermostat qasync thermostat_gui; + inherit thermostat; }; hydraJobs = { @@ -107,7 +71,7 @@ rustPlatform.rust.cargo openocd dfu-util ] ++ (with python3Packages; [ - numpy matplotlib pyqtgraph setuptools pyqt6 qasync + numpy matplotlib pyqtgraph setuptools pyqt6 ]); shellHook= '' diff --git a/pytec/pytec/client.py b/pytec/pytec/client.py index c9da63a..3a78b46 100644 --- a/pytec/pytec/client.py +++ b/pytec/pytec/client.py @@ -40,7 +40,6 @@ class Client: line = self._read_line() response = json.loads(line) - logging.debug(f"{command}: {response}") if "error" in response: raise CommandError(response["error"]) return response diff --git a/pytec/tec_qt.py b/pytec/tec_qt.py index 38d8215..93d639f 100644 --- a/pytec/tec_qt.py +++ b/pytec/tec_qt.py @@ -6,10 +6,7 @@ import pyqtgraph as pg import sys import argparse import logging -import asyncio -import atexit from pytec.client import Client -from qasync import QEventLoop # pyuic6 -x tec_qt.ui -o ui_tec_qt.py from ui_tec_qt import Ui_MainWindow @@ -19,7 +16,7 @@ tec_client: Client = None # ui = None ui: Ui_MainWindow = None -queue = None +thread_pool = QThreadPool.globalInstance() connection_watcher = None client_watcher = None app: QtWidgets.QApplication = None @@ -38,43 +35,7 @@ def get_argparser(): return parser -def wrap_client_task(func, *args, **kwargs): - loop = asyncio.get_event_loop() - task = ClientTask(func, *args, **kwargs) - asyncio.ensure_future(queue.put(task), loop=loop) - - -async def process_client_tasks(): - global queue - if queue is None: - queue = asyncio.Queue() - loop = asyncio.get_event_loop() - while True: - task = await queue.get() - await task.run() - queue.task_done() - - -class ClientTask: - def __init__(self, func, *args, **kwargs): - self.func = func - self.args = args - self.kwargs = kwargs - super().__init__() - - async def run(self): - try: - lock = asyncio.Lock() - async with lock: - self.func(*self.args, **self.kwargs) - except (TimeoutError, OSError): - logging.warning("Client connection error, disconnecting", exc_info=True) - if connection_watcher: - #thread_pool.clear() # clearing all next requests - connection_watcher.client_disconnected() - - -class WatchConnectTask(QObject): +class WatchConnectTask(QThread): connected = pyqtSignal(bool) hw_rev = pyqtSignal(dict) connecting = pyqtSignal() @@ -96,8 +57,8 @@ class WatchConnectTask(QObject): self.connecting.emit() tec_client = Client(host=self.ip, port=self.port, timeout=30) self.connected.emit(True) - wrap_client_task(lambda: self.hw_rev.emit(tec_client.hw_rev())) - # wrap_client_task(lambda: self.fan_update.emit(tec_client.fan())) + thread_pool.start(ClientTask(lambda: self.hw_rev.emit(tec_client.hw_rev()))) + #thread_pool.start(ClientTask(lambda: self.fan_update.emit(tec_client.fan()))) except Exception as e: logging.error(f"Failed communicating to the {self.ip}:{self.port}: {e}") self.connected.emit(False) @@ -111,8 +72,7 @@ class WatchConnectTask(QObject): self.connected.emit(False) - -class ClientWatcher(QObject): +class ClientWatcher(QThread): fan_update = pyqtSignal(object) pwm_update = pyqtSignal(object) report_update = pyqtSignal(object) @@ -123,10 +83,10 @@ class ClientWatcher(QObject): self.running = True super().__init__(parent) - async def run(self): + def run(self): while self.running: - wrap_client_task(lambda: self.update_params()) - await asyncio.sleep(int(self.update_s * 1000)) + thread_pool.start(ClientTask(lambda: self.update_params())) + self.msleep(int(self.update_s * 1000)) def update_params(self): self.fan_update.emit(tec_client.fan()) @@ -134,17 +94,34 @@ class ClientWatcher(QObject): @pyqtSlot() def stop_watching(self): self.running = False - #deadline = QDeadlineTimer() - #deadline.setDeadline(100) - #self.wait(deadline) - #self.terminate() + deadline = QDeadlineTimer() + deadline.setDeadline(100) + self.wait(deadline) + self.terminate() @pyqtSlot() def set_update_s(self): self.update_s = ui.report_refresh_spin.value() -def on_connection_changed(result): +class ClientTask(QRunnable): + def __init__(self, func, *args, **kwargs): + self.func = func + self.args = args + self.kwargs = kwargs + super().__init__() + + def run(self): + try: + self.func(*self.args, **self.kwargs) + except (TimeoutError, OSError): + logging.warning("Client connection error, disconnecting", exc_info=True) + if connection_watcher: + thread_pool.clear() # clearing all next requests + connection_watcher.client_disconnected() + + +def connected(result): global client_watcher, connection_watcher ui.graph_group.setEnabled(result) ui.hw_rev_lbl.setEnabled(result) @@ -166,7 +143,7 @@ def on_connection_changed(result): client_watcher.fan_update.connect(fan_update) ui.report_apply_btn.clicked.connect(client_watcher.set_update_s) app.aboutToQuit.connect(client_watcher.stop_watching) - wrap_client_task(client_watcher.run) + client_watcher.start() def hw_rev(hw_rev_d: dict): @@ -196,7 +173,7 @@ def fan_set(): global tec_client if tec_client is None or ui.fan_auto_box.isChecked(): return - wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value())) + thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) def fan_auto_set(enabled): @@ -205,41 +182,37 @@ def fan_auto_set(enabled): return ui.fan_power_slider.setEnabled(not enabled) if enabled: - wrap_client_task(lambda: tec_client.set_param("fan", "auto")) + thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", "auto"))) else: - wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value())) + thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) def connect(): global connection_watcher connection_watcher = WatchConnectTask(ui.main_widget, ui.ip_set_line.text(), ui.port_set_spin.value()) - connection_watcher.connected.connect(on_connection_changed) + connection_watcher.connected.connect(connected) connection_watcher.connecting.connect(lambda: ui.status_lbl.setText("Connecting...")) connection_watcher.hw_rev.connect(hw_rev) connection_watcher.fan_update.connect(fan_update) - wrap_client_task(connection_watcher.run) - #app.aboutToQuit.connect(connection_watcher.terminate) + connection_watcher.start() + app.aboutToQuit.connect(connection_watcher.terminate) def main(): - global ui, app, queue + global ui, thread_pool, app args = get_argparser().parse_args() if args.logLevel: logging.basicConfig(level=getattr(logging, args.logLevel)) app = QtWidgets.QApplication(sys.argv) - - loop = QEventLoop(app) - asyncio.set_event_loop(loop) - atexit.register(loop.close) - - loop.create_task(process_client_tasks()) - main_window = QtWidgets.QMainWindow() ui = Ui_MainWindow() ui.setupUi(main_window) # ui = uic.loadUi('tec_qt.ui', main_window) + thread_pool = QThreadPool(parent=ui.main_widget) + thread_pool.setMaxThreadCount(1) # avoid concurrent requests + ui.connect_btn.clicked.connect(lambda _checked: connect()) ui.fan_power_slider.valueChanged.connect(fan_set) ui.fan_auto_box.stateChanged.connect(fan_auto_set) @@ -252,8 +225,7 @@ def main(): ui.connect_btn.click() main_window.show() - - loop.run_until_complete(app.exec()) + sys.exit(app.exec()) if __name__ == '__main__':