Try move from Qthreads to qasync

Signed-off-by: Egor Savkin <es@m-labs.hk>
This commit is contained in:
Egor Savkin 2023-06-26 10:20:48 +08:00 committed by atse
parent 8ba2969bbc
commit 549dacd2c6
3 changed files with 109 additions and 44 deletions

View File

@ -55,9 +55,45 @@
dontFixup = true; dontFixup = true;
}; };
qasync = pkgs.python3Packages.buildPythonPackage rec {
pname = "qasync";
version = "0.24.0";
src = pkgs.fetchFromGitHub {
owner = "CabbageDevelopment";
repo = "qasync";
rev = "v${version}";
sha256 = "sha256-ls5F+VntXXa3n+dULaYWK9sAmwly1nk/5+RGWLrcf2Y=";
};
propagatedBuildInputs = [ pkgs.python3Packages.pyqt6 ];
nativeCheckInputs = [ pkgs.python3Packages.pytest ];
checkPhase = ''
pytest -k 'test_qthreadexec.py' # the others cause the test execution to be aborted, I think because of asyncio
'';
};
thermostat_gui = pkgs.python3Packages.buildPythonPackage rec {
pname = "thermostat_gui";
version = "0.0.0";
src = self;
preBuild =
''
export VERSIONEER_OVERRIDE=${version}
export VERSIONEER_REV=v0.0.0
'';
nativeBuildInputs = [ pkgs.qt6.wrapQtAppsHook ];
propagatedBuildInputs = (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync]);
dontWrapQtApps = true;
postFixup = ''
ls -al $out/
wrapQtApp "$out/pytec/tec_qt"
'';
};
in { in {
packages.x86_64-linux = { packages.x86_64-linux = {
inherit thermostat; inherit thermostat qasync thermostat_gui;
}; };
hydraJobs = { hydraJobs = {
@ -69,7 +105,7 @@
buildInputs = with pkgs; [ buildInputs = with pkgs; [
rust openocd dfu-util rust openocd dfu-util
] ++ (with python3Packages; [ ] ++ (with python3Packages; [
numpy matplotlib pyqtgraph setuptools pyqt6 numpy matplotlib pyqtgraph setuptools pyqt6 qasync
]); ]);
}; };
defaultPackage.x86_64-linux = thermostat; defaultPackage.x86_64-linux = thermostat;

View File

@ -40,6 +40,7 @@ class Client:
line = self._read_line() line = self._read_line()
response = json.loads(line) response = json.loads(line)
logging.debug(f"{command}: {response}")
if "error" in response: if "error" in response:
raise CommandError(response["error"]) raise CommandError(response["error"])
return response return response

View File

@ -6,7 +6,10 @@ import pyqtgraph as pg
import sys import sys
import argparse import argparse
import logging import logging
import asyncio
import atexit
from pytec.client import Client from pytec.client import Client
from qasync import QEventLoop
# pyuic6 -x tec_qt.ui -o ui_tec_qt.py # pyuic6 -x tec_qt.ui -o ui_tec_qt.py
from ui_tec_qt import Ui_MainWindow from ui_tec_qt import Ui_MainWindow
@ -16,7 +19,7 @@ tec_client: Client = None
# ui = None # ui = None
ui: Ui_MainWindow = None ui: Ui_MainWindow = None
thread_pool = QThreadPool.globalInstance() queue = None
connection_watcher = None connection_watcher = None
client_watcher = None client_watcher = None
app: QtWidgets.QApplication = None app: QtWidgets.QApplication = None
@ -35,7 +38,43 @@ def get_argparser():
return parser return parser
class WatchConnectTask(QThread): def wrap_client_task(func, *args, **kwargs):
loop = asyncio.get_event_loop()
task = ClientTask(func, *args, **kwargs)
asyncio.ensure_future(queue.put(task), loop=loop)
async def process_client_tasks():
global queue
if queue is None:
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
while True:
task = await queue.get()
await task.run()
queue.task_done()
class ClientTask:
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
super().__init__()
async def run(self):
try:
lock = asyncio.Lock()
async with lock:
self.func(*self.args, **self.kwargs)
except (TimeoutError, OSError):
logging.warning("Client connection error, disconnecting", exc_info=True)
if connection_watcher:
#thread_pool.clear() # clearing all next requests
connection_watcher.client_disconnected()
class WatchConnectTask(QObject):
connected = pyqtSignal(bool) connected = pyqtSignal(bool)
hw_rev = pyqtSignal(dict) hw_rev = pyqtSignal(dict)
connecting = pyqtSignal() connecting = pyqtSignal()
@ -57,8 +96,8 @@ class WatchConnectTask(QThread):
self.connecting.emit() self.connecting.emit()
tec_client = Client(host=self.ip, port=self.port, timeout=30) tec_client = Client(host=self.ip, port=self.port, timeout=30)
self.connected.emit(True) self.connected.emit(True)
thread_pool.start(ClientTask(lambda: self.hw_rev.emit(tec_client.hw_rev()))) wrap_client_task(lambda: self.hw_rev.emit(tec_client.hw_rev()))
#thread_pool.start(ClientTask(lambda: self.fan_update.emit(tec_client.fan()))) # wrap_client_task(lambda: self.fan_update.emit(tec_client.fan()))
except Exception as e: except Exception as e:
logging.error(f"Failed communicating to the {self.ip}:{self.port}: {e}") logging.error(f"Failed communicating to the {self.ip}:{self.port}: {e}")
self.connected.emit(False) self.connected.emit(False)
@ -72,7 +111,8 @@ class WatchConnectTask(QThread):
self.connected.emit(False) self.connected.emit(False)
class ClientWatcher(QThread):
class ClientWatcher(QObject):
fan_update = pyqtSignal(object) fan_update = pyqtSignal(object)
pwm_update = pyqtSignal(object) pwm_update = pyqtSignal(object)
report_update = pyqtSignal(object) report_update = pyqtSignal(object)
@ -83,10 +123,10 @@ class ClientWatcher(QThread):
self.running = True self.running = True
super().__init__(parent) super().__init__(parent)
def run(self): async def run(self):
while self.running: while self.running:
thread_pool.start(ClientTask(lambda: self.update_params())) wrap_client_task(lambda: self.update_params())
self.msleep(int(self.update_s * 1000)) await asyncio.sleep(int(self.update_s * 1000))
def update_params(self): def update_params(self):
self.fan_update.emit(tec_client.fan()) self.fan_update.emit(tec_client.fan())
@ -94,34 +134,17 @@ class ClientWatcher(QThread):
@pyqtSlot() @pyqtSlot()
def stop_watching(self): def stop_watching(self):
self.running = False self.running = False
deadline = QDeadlineTimer() #deadline = QDeadlineTimer()
deadline.setDeadline(100) #deadline.setDeadline(100)
self.wait(deadline) #self.wait(deadline)
self.terminate() #self.terminate()
@pyqtSlot() @pyqtSlot()
def set_update_s(self): def set_update_s(self):
self.update_s = ui.report_refresh_spin.value() self.update_s = ui.report_refresh_spin.value()
class ClientTask(QRunnable): def on_connection_changed(result):
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
super().__init__()
def run(self):
try:
self.func(*self.args, **self.kwargs)
except (TimeoutError, OSError):
logging.warning("Client connection error, disconnecting", exc_info=True)
if connection_watcher:
thread_pool.clear() # clearing all next requests
connection_watcher.client_disconnected()
def connected(result):
global client_watcher, connection_watcher global client_watcher, connection_watcher
ui.graph_group.setEnabled(result) ui.graph_group.setEnabled(result)
ui.hw_rev_lbl.setEnabled(result) ui.hw_rev_lbl.setEnabled(result)
@ -143,7 +166,7 @@ def connected(result):
client_watcher.fan_update.connect(fan_update) client_watcher.fan_update.connect(fan_update)
ui.report_apply_btn.clicked.connect(client_watcher.set_update_s) ui.report_apply_btn.clicked.connect(client_watcher.set_update_s)
app.aboutToQuit.connect(client_watcher.stop_watching) app.aboutToQuit.connect(client_watcher.stop_watching)
client_watcher.start() wrap_client_task(client_watcher.run)
def hw_rev(hw_rev_d: dict): def hw_rev(hw_rev_d: dict):
@ -173,7 +196,7 @@ def fan_set():
global tec_client global tec_client
if tec_client is None or ui.fan_auto_box.isChecked(): if tec_client is None or ui.fan_auto_box.isChecked():
return return
thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))
def fan_auto_set(enabled): def fan_auto_set(enabled):
@ -182,37 +205,41 @@ def fan_auto_set(enabled):
return return
ui.fan_power_slider.setEnabled(not enabled) ui.fan_power_slider.setEnabled(not enabled)
if enabled: if enabled:
thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", "auto"))) wrap_client_task(lambda: tec_client.set_param("fan", "auto"))
else: else:
thread_pool.start(ClientTask(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))) wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))
def connect(): def connect():
global connection_watcher global connection_watcher
connection_watcher = WatchConnectTask(ui.main_widget, ui.ip_set_line.text(), ui.port_set_spin.value()) connection_watcher = WatchConnectTask(ui.main_widget, ui.ip_set_line.text(), ui.port_set_spin.value())
connection_watcher.connected.connect(connected) connection_watcher.connected.connect(on_connection_changed)
connection_watcher.connecting.connect(lambda: ui.status_lbl.setText("Connecting...")) connection_watcher.connecting.connect(lambda: ui.status_lbl.setText("Connecting..."))
connection_watcher.hw_rev.connect(hw_rev) connection_watcher.hw_rev.connect(hw_rev)
connection_watcher.fan_update.connect(fan_update) connection_watcher.fan_update.connect(fan_update)
connection_watcher.start() wrap_client_task(connection_watcher.run)
app.aboutToQuit.connect(connection_watcher.terminate) #app.aboutToQuit.connect(connection_watcher.terminate)
def main(): def main():
global ui, thread_pool, app global ui, app, queue
args = get_argparser().parse_args() args = get_argparser().parse_args()
if args.logLevel: if args.logLevel:
logging.basicConfig(level=getattr(logging, args.logLevel)) logging.basicConfig(level=getattr(logging, args.logLevel))
app = QtWidgets.QApplication(sys.argv) app = QtWidgets.QApplication(sys.argv)
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
atexit.register(loop.close)
loop.create_task(process_client_tasks())
main_window = QtWidgets.QMainWindow() main_window = QtWidgets.QMainWindow()
ui = Ui_MainWindow() ui = Ui_MainWindow()
ui.setupUi(main_window) ui.setupUi(main_window)
# ui = uic.loadUi('tec_qt.ui', main_window) # ui = uic.loadUi('tec_qt.ui', main_window)
thread_pool = QThreadPool(parent=ui.main_widget)
thread_pool.setMaxThreadCount(1) # avoid concurrent requests
ui.connect_btn.clicked.connect(lambda _checked: connect()) ui.connect_btn.clicked.connect(lambda _checked: connect())
ui.fan_power_slider.valueChanged.connect(fan_set) ui.fan_power_slider.valueChanged.connect(fan_set)
ui.fan_auto_box.stateChanged.connect(fan_auto_set) ui.fan_auto_box.stateChanged.connect(fan_auto_set)
@ -225,7 +252,8 @@ def main():
ui.connect_btn.click() ui.connect_btn.click()
main_window.show() main_window.show()
sys.exit(app.exec())
loop.run_until_complete(app.exec())
if __name__ == '__main__': if __name__ == '__main__':