Finish moving over to qasync

Also:

* Add aioclient

The old client is synchronous and blocking, and the only way to achieve
true asynchronous IO is to create a new client that interfaces with
asyncio.

* Finish Nix Flake description and make the GUI available for `nix run`
This commit is contained in:
atse 2023-06-27 17:34:39 +08:00
parent d8ec1b4ea3
commit b39e7dc015
5 changed files with 268 additions and 132 deletions

View File

@ -53,21 +53,14 @@
thermostat_gui = pkgs.python3Packages.buildPythonPackage rec {
pname = "thermostat_gui";
version = "0.0.0";
src = self;
preBuild =
''
export VERSIONEER_OVERRIDE=${version}
export VERSIONEER_REV=v0.0.0
'';
src = "${self}/pytec";
nativeBuildInputs = [ pkgs.qt6.wrapQtAppsHook ];
propagatedBuildInputs = (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync]);
propagatedBuildInputs = [ pkgs.qt6.qtbase ] ++ (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync ]);
dontWrapQtApps = true;
postFixup = ''
ls -al $out/
wrapQtApp "$out/pytec/tec_qt"
wrapQtApp "$out/bin/tec_qt"
'';
};
in {

16
pytec/aioexample.py Normal file
View File

@ -0,0 +1,16 @@
import asyncio
from pytec.aioclient import Client
async def main():
tec = Client()
await tec.connect() #(host="192.168.1.26", port=23)
await tec.set_param("s-h", 1, "t0", 20)
print(await tec.get_output())
print(await tec.get_pid())
print(await tec.get_output())
print(await tec.get_postfilter())
print(await tec.get_b_parameter())
async for data in tec.report_mode():
print(data)
asyncio.run(main())

183
pytec/pytec/aioclient.py Normal file
View File

@ -0,0 +1,183 @@
import asyncio
import json
import logging
class CommandError(Exception):
pass
class Client:
def __init__(self):
self._reader = None
self._writer = None
self._command_lock = asyncio.Lock()
async def connect(self, host='192.168.1.26', port=23, timeout=None):
self._reader, self._writer = await asyncio.open_connection(host, port)
await self._check_zero_limits()
async def disconnect(self):
self._writer.close()
await self._writer.wait_closed()
async def _check_zero_limits(self):
output_report = await self.get_output()
for output_channel in output_report:
for limit in ["max_i_neg", "max_i_pos", "max_v"]:
if output_channel[limit] == 0.0:
logging.warning("`{}` limit is set to zero on channel {}".format(limit, output_channel["channel"]))
async def _read_line(self):
# read 1 line
chunk = await self._reader.readline()
return chunk.decode('utf-8', errors='ignore')
async def _command(self, *command):
async with self._command_lock:
self._writer.write(((" ".join(command)).strip() + "\n").encode('utf-8'))
await self._writer.drain()
line = await self._read_line()
response = json.loads(line)
logging.debug(f"{command}: {response}")
if "error" in response:
raise CommandError(response["error"])
return response
async def _get_conf(self, topic):
result = [None, None]
for item in await self._command(topic):
result[int(item["channel"])] = item
return result
async def get_output(self):
"""Retrieve output limits for the TEC
Example::
[{'channel': 0,
'center': 'vref',
'i_set': -0.02002179650216762,
'max_i_neg': 2.0,
'max_v': : 3.988,
'max_i_pos': 2.0,
'polarity': 'normal'},
{'channel': 1,
'center': 'vref',
'i_set': -0.02002179650216762,
'max_i_neg': 2.0,
'max_v': : 3.988,
'max_i_pos': 2.0,
'polarity': 'normal'},
]
"""
return await self._get_conf("output")
async def get_pid(self):
"""Retrieve PID control state
Example::
[{'channel': 0,
'parameters': {
'kp': 10.0,
'ki': 0.02,
'kd': 0.0,
'output_min': 0.0,
'output_max': 3.0},
'target': 37.0},
{'channel': 1,
'parameters': {
'kp': 10.0,
'ki': 0.02,
'kd': 0.0,
'output_min': 0.0,
'output_max': 3.0},
'target': 36.5}]
"""
return await self._get_conf("pid")
async def get_b_parameter(self):
"""Retrieve B-Parameter equation parameters for resistance to temperature conversion
Example::
[{'params': {'b': 3800.0, 'r0': 10000.0, 't0': 298.15}, 'channel': 0},
{'params': {'b': 3800.0, 'r0': 10000.0, 't0': 298.15}, 'channel': 1}]
"""
return await self._get_conf("b-p")
async def get_postfilter(self):
"""Retrieve DAC postfilter configuration
Example::
[{'rate': None, 'channel': 0},
{'rate': 21.25, 'channel': 1}]
"""
return await self._get_conf("postfilter")
async def report_mode(self):
"""Start reporting measurement values
Example of yielded data::
{'channel': 0,
'time': 2302524,
'adc': 0.6199188965423515,
'sens': 6138.519310282602,
'temperature': 36.87032392655527,
'pid_engaged': True,
'i_set': 2.0635816680889123,
'vref': 1.494,
'dac_value': 2.527790834044456,
'dac_feedback': 2.523,
'i_tec': 2.331,
'tec_i': 2.0925,
'tec_u_meas': 2.5340000000000003,
'pid_output': 2.067581958092247}
"""
await self._command("report mode", "on")
while True:
line = await self._read_line()
if not line:
break
try:
yield json.loads(line)
except json.decoder.JSONDecodeError:
pass
async def set_param(self, topic, channel, field="", value=""):
"""Set configuration parameters
Examples::
tec.set_param("output", 0, "max_v", 2.0)
tec.set_param("pid", 1, "output_max", 2.5)
tec.set_param("s-h", 0, "t0", 20.0)
tec.set_param("center", 0, "vref")
tec.set_param("postfilter", 1, 21)
See the firmware's README.md for a full list.
"""
if type(value) is float:
value = "{:f}".format(value)
if type(value) is not str:
value = str(value)
await self._command(topic, str(channel), field, value)
async def power_up(self, channel, target):
"""Start closed-loop mode"""
await self.set_param("pid", channel, "target", value=target)
await self.set_param("output", channel, "pid")
async def save_config(self):
"""Save current configuration to EEPROM"""
await self._command("save")
async def load_config(self):
"""Load current configuration from EEPROM"""
await self._command("load")
async def hw_rev(self):
"""Get Thermostat hardware revision"""
return await self._command("hwrev")
async def fan(self):
"""Get Thermostat current fan settings"""
return await self._command("fan")

View File

@ -9,4 +9,10 @@ setup(
license="GPLv3",
install_requires=["setuptools"],
packages=find_packages(),
entry_points={
"gui_scripts": [
"tec_qt = tec_qt:main",
]
},
py_modules=['tec_qt', 'ui_tec_qt'],
)

View File

@ -1,5 +1,5 @@
from PyQt6 import QtWidgets, uic
from PyQt6.QtCore import QThread, QThreadPool, pyqtSignal, QRunnable, QObject, QSignalBlocker, pyqtSlot, QDeadlineTimer
from PyQt6.QtCore import pyqtSignal, QObject, QSignalBlocker, pyqtSlot
from pyqtgraph import PlotWidget
from pyqtgraph.parametertree import Parameter, ParameterTree, ParameterItem, registerParameterType
import pyqtgraph as pg
@ -7,9 +7,9 @@ import sys
import argparse
import logging
import asyncio
import atexit
from pytec.client import Client
from qasync import QEventLoop
from pytec.aioclient import Client
import qasync
from qasync import asyncSlot, asyncClose
# pyuic6 -x tec_qt.ui -o ui_tec_qt.py
from ui_tec_qt import Ui_MainWindow
@ -19,9 +19,8 @@ tec_client: Client = None
# ui = None
ui: Ui_MainWindow = None
queue = None
connection_watcher = None
client_watcher = None
client_watcher_task = None
app: QtWidgets.QApplication = None
@ -38,85 +37,8 @@ def get_argparser():
return parser
def wrap_client_task(func, *args, **kwargs):
loop = asyncio.get_event_loop()
task = ClientTask(func, *args, **kwargs)
asyncio.ensure_future(queue.put(task), loop=loop)
async def process_client_tasks():
global queue
if queue is None:
queue = asyncio.Queue()
loop = asyncio.get_event_loop()
while True:
task = await queue.get()
await task.run()
queue.task_done()
class ClientTask:
def __init__(self, func, *args, **kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
super().__init__()
async def run(self):
try:
lock = asyncio.Lock()
async with lock:
self.func(*self.args, **self.kwargs)
except (TimeoutError, OSError):
logging.warning("Client connection error, disconnecting", exc_info=True)
if connection_watcher:
#thread_pool.clear() # clearing all next requests
connection_watcher.client_disconnected()
class WatchConnectTask(QObject):
connected = pyqtSignal(bool)
hw_rev = pyqtSignal(dict)
connecting = pyqtSignal()
fan_update = pyqtSignal(object)
def __init__(self, parent, ip, port):
self.ip = ip
self.port = port
super().__init__(parent)
def run(self):
global tec_client
try:
if tec_client:
tec_client.disconnect()
tec_client = None
self.connected.emit(False)
else:
self.connecting.emit()
tec_client = Client(host=self.ip, port=self.port, timeout=30)
self.connected.emit(True)
wrap_client_task(lambda: self.hw_rev.emit(tec_client.hw_rev()))
# wrap_client_task(lambda: self.fan_update.emit(tec_client.fan()))
except Exception as e:
logging.error(f"Failed communicating to the {self.ip}:{self.port}: {e}")
self.connected.emit(False)
@pyqtSlot()
def client_disconnected(self):
global tec_client
if tec_client:
tec_client.disconnect()
tec_client = None
self.connected.emit(False)
class ClientWatcher(QObject):
fan_update = pyqtSignal(object)
pwm_update = pyqtSignal(object)
report_update = pyqtSignal(object)
pid_update = pyqtSignal(object)
def __init__(self, parent, update_s):
self.update_s = update_s
@ -125,19 +47,18 @@ class ClientWatcher(QObject):
async def run(self):
while self.running:
wrap_client_task(lambda: self.update_params())
await asyncio.sleep(int(self.update_s * 1000))
await self.update_params()
await asyncio.sleep(self.update_s)
def update_params(self):
self.fan_update.emit(tec_client.fan())
async def update_params(self):
self.fan_update.emit(await self._client.get_fan())
def start_watching(self):
self._watch_task = asyncio.create_task(self.run())
@pyqtSlot()
def stop_watching(self):
self.running = False
#deadline = QDeadlineTimer()
#deadline.setDeadline(100)
#self.wait(deadline)
#self.terminate()
@pyqtSlot()
def set_update_s(self):
@ -145,7 +66,7 @@ class ClientWatcher(QObject):
def on_connection_changed(result):
global client_watcher, connection_watcher
global client_watcher, client_watcher_task
ui.graph_group.setEnabled(result)
ui.hw_rev_lbl.setEnabled(result)
ui.fan_group.setEnabled(result)
@ -161,12 +82,7 @@ def on_connection_changed(result):
if client_watcher:
client_watcher.stop_watching()
client_watcher = None
elif client_watcher is None:
client_watcher = ClientWatcher(ui.main_widget, ui.report_refresh_spin.value())
client_watcher.fan_update.connect(fan_update)
ui.report_apply_btn.clicked.connect(client_watcher.set_update_s)
app.aboutToQuit.connect(client_watcher.stop_watching)
wrap_client_task(client_watcher.run)
client_watcher_task = None
def hw_rev(hw_rev_d: dict):
@ -192,55 +108,73 @@ def fan_update(fan_settings):
ui.fan_auto_box.setChecked(fan_settings["auto_mode"])
def fan_set():
@asyncSlot()
async def fan_set(_):
global tec_client
if tec_client is None or ui.fan_auto_box.isChecked():
return
wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))
await tec_client.set_param("fan", ui.fan_power_slider.value())
def fan_auto_set(enabled):
@asyncSlot()
async def fan_auto_set(enabled):
global tec_client
if tec_client is None:
return
ui.fan_power_slider.setEnabled(not enabled)
if enabled:
wrap_client_task(lambda: tec_client.set_param("fan", "auto"))
await tec_client.set_param("fan", "auto")
else:
wrap_client_task(lambda: tec_client.set_param("fan", ui.fan_power_slider.value()))
await tec_client.set_param("fan", ui.fan_power_slider.value())
def connect():
global connection_watcher
connection_watcher = WatchConnectTask(ui.main_widget, ui.ip_set_line.text(), ui.port_set_spin.value())
connection_watcher.connected.connect(on_connection_changed)
connection_watcher.connecting.connect(lambda: ui.status_lbl.setText("Connecting..."))
connection_watcher.hw_rev.connect(hw_rev)
connection_watcher.fan_update.connect(fan_update)
wrap_client_task(connection_watcher.run)
#app.aboutToQuit.connect(connection_watcher.terminate)
@asyncSlot()
async def connect(_):
global tec_client, client_watcher, client_watcher_task
ip, port = ui.ip_set_line.text(), ui.port_set_spin.value()
try:
if tec_client:
await tec_client.disconnect()
tec_client = None
on_connection_changed(False)
else:
ui.status_lbl.setText("Connecting...")
tec_client = Client()
await tec_client.connect(host=ip, port=port, timeout=30)
on_connection_changed(True)
hw_rev(await tec_client.hw_rev())
# fan_update(await tec_client.fan())
if client_watcher is None:
client_watcher = ClientWatcher(ui.main_widget, ui.report_refresh_spin.value())
client_watcher.fan_update.connect(fan_update)
ui.report_apply_btn.clicked.connect(
lambda: client_watcher.set_update_s(ui.report_refresh_spin.value())
)
app.aboutToQuit.connect(client_watcher.stop_watching)
client_watcher_task = asyncio.create_task(client_watcher.run())
except Exception as e:
logging.error(f"Failed communicating to the {ip}:{port}: {e}")
on_connection_changed(False)
def main():
global ui, app, queue
async def coro_main():
global ui, app
args = get_argparser().parse_args()
if args.logLevel:
logging.basicConfig(level=getattr(logging, args.logLevel))
app = QtWidgets.QApplication(sys.argv)
app_quit_event = asyncio.Event()
loop = QEventLoop(app)
asyncio.set_event_loop(loop)
atexit.register(loop.close)
loop.create_task(process_client_tasks())
app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(app_quit_event.set)
main_window = QtWidgets.QMainWindow()
ui = Ui_MainWindow()
ui.setupUi(main_window)
# ui = uic.loadUi('tec_qt.ui', main_window)
ui.connect_btn.clicked.connect(lambda _checked: connect())
ui.connect_btn.clicked.connect(connect)
ui.fan_power_slider.valueChanged.connect(fan_set)
ui.fan_auto_box.stateChanged.connect(fan_auto_set)
@ -253,7 +187,11 @@ def main():
main_window.show()
loop.run_until_complete(app.exec())
await app_quit_event.wait()
def main():
qasync.run(coro_main())
if __name__ == '__main__':