drive: use aenum for cmd

This commit is contained in:
linuswck 2024-06-21 11:01:20 +08:00
parent 80d94270a2
commit 6f87cba9a6
3 changed files with 208 additions and 208 deletions

View File

@ -73,7 +73,7 @@
src = "${self}/pykirdy";
nativeBuildInputs = [ pkgs.qt6.wrapQtAppsHook ];
propagatedBuildInputs = [ pkgs.qt6.qtbase ] ++ (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync pglive ]);
propagatedBuildInputs = [ pkgs.qt6.qtbase ] ++ (with pkgs.python3Packages; [ pyqtgraph pyqt6 qasync pglive aenum]);
dontWrapQtApps = true;
postFixup = ''
@ -94,7 +94,7 @@
buildInputs = with pkgs; [
rust openocd dfu-util glibc
] ++ (with python3Packages; [
numpy matplotlib pyqtgraph setuptools pyqt6 qasync pglive
numpy matplotlib pyqtgraph setuptools pyqt6 qasync pglive aenum
]);
shellHook=
''

View File

@ -2,19 +2,148 @@ import socket
import asyncio
import json
import logging
from aenum import StrEnum, NoAlias
# Data Type Enums
IP_SETTINGS = "ip_settings"
TEMP_ADC_FILTER = "temp_adc_filter"
DATA_F32 = "data_f32"
DATA_BOOL = "data_bool"
class _dt(StrEnum):
ip_settings = "ip_settings"
temp_adc_filter = "temp_adc_filter"
f32 = "data_f32"
bool = "data_bool"
none = "None"
TARGET_DEVICE = "device_cmd"
TARGET_LD = "laser_diode_cmd"
TARGET_THERMOSTAT = "thermostat_cmd"
class CmdList:
class device(StrEnum, settings=NoAlias):
_target = "device_cmd"
SetIPSettings = _dt.ip_settings
SetPdFinGain = _dt.f32
SetPdTransconductance = _dt.f32
SetActiveReportMode = _dt.bool
GetStatusReport = _dt.none
GetSettingsSummary = _dt.none
Dfu = _dt.none
SaveFlashSettings = _dt.none
LoadFlashSettings = _dt.none
HardReset = _dt.none
class CmdDoesNotExist(Exception):
pass
class ld(StrEnum, settings=NoAlias):
_target = "laser_diode_cmd"
SetDefaultPowerOn = _dt.bool
PowerUp = _dt.none
PowerDown = _dt.none
LdTermsShort = _dt.none
LdTermsOpen = _dt.none
SetI = _dt.f32
SetISoftLimit = _dt.f32
SetPdResponsitivity = _dt.f32
SetPdDarkCurrent = _dt.f32
SetLdPwrLimit = _dt.f32
ClearAlarm = _dt.none
class thermostat(StrEnum, settings=NoAlias):
_target = "thermostat_cmd"
SetDefaultPowerOn = _dt.bool,
PowerUp = _dt.f32,
PowerDown = _dt.f32,
SetTecMaxV = _dt.f32,
SetTecMaxIPos = _dt.f32,
SetTecMaxINeg = _dt.f32,
SetTecIOut = _dt.f32,
SetTemperatureSetpoint = _dt.f32,
SetPidEngage = _dt.none,
SetPidDisEngage = _dt.none,
SetPidKp = _dt.f32,
SetPidKi = _dt.f32,
SetPidKd = _dt.f32,
SetPidOutMin = _dt.f32,
SetPidOutMax = _dt.f32,
ConfigTempAdcFilter = _dt.temp_adc_filter,
SetTempMonUpperLimit = _dt.f32,
SetTempMonLowerLimit = _dt.f32,
ClearAlarm = _dt.none,
SetShT0 = _dt.f32,
SetShR0 = _dt.f32,
SetShBeta = _dt.f32,
class FilterConfig:
class Sinc5Sinc1With50hz60HzRejection(StrEnum):
f27sps = "F27SPS"
f21sps = "F21SPS"
f20sps = "F20SPS"
f16sps = "F16SPS"
def _odr_type(self):
return "sinc5sinc1postfilter"
def _filter_type(self):
return "Sinc5Sinc1With50hz60HzRejection"
class Sinc5Sinc1(StrEnum):
f31250_0sps = "F31250_0SPS"
f15625_0sps = "F15625_0SPS"
f10417_0sps = "F10417_0SPS"
f5208_0sps = "F5208_0SPS"
f2597_0sps = "F2597_0SPS"
f1007_0sps = "F1007_0SPS"
f503_8sps = "F503_8SPS"
f381_0sps = "F381_0SPS"
f200_3sps = "F200_3SPS"
f100_2sps = "F100_2SPS"
f59_52sps = "F59_52SPS"
f49_68sps = "F49_68SPS"
f20_01sps = "F20_01SPS"
f16_63sps = "F16_63SPS"
f10_0sps = "F10_0SPS"
f5_0sps = "F5_0SPS"
f2_5sps = "F2_5SPS"
f1_25sps = "F1_25SPS"
def _odr_type(self):
return "sinc5sinc1odr"
def _filter_type(self):
return "Sinc5Sinc1"
class Sinc3(StrEnum):
f31250_0sps = "F31250_0SPS"
f15625_0sps = "F15625_0SPS"
f10417_0sps = "F10417_0SPS"
f5208_0sps = "F5208_0SPS"
f2597_0sps = "F2597_0SPS"
f1007_0sps = "F1007_0SPS"
f503_8sps = "F503_8SPS"
f381_0sps = "F381_0SPS"
f200_3sps = "F200_3SPS"
f100_2sps = "F100_2SPS"
f59_52sps = "F59_52SPS"
f49_68sps = "F49_68SPS"
f20_01sps = "F20_01SPS"
f16_63sps = "F16_63SPS"
f10_0sps = "F10_0SPS"
f5_0sps = "F5_0SPS"
f2_5sps = "F2_5SPS"
f1_25sps = "F1_25SPS"
def _odr_type(self):
return "sinc3odr"
def _filter_type(self):
return "Sinc3"
############## Rewrite
class Sinc3WithFineODR():
def __init__(self, rate):
assert rate >= 1.907465 and rate <= 31250
self.rate = float(rate)
# def rate(self, rate):
# assert rate >= 1.907465 and rate <= 31250
# return float(rate)
def _odr_type(self):
return "sinc3fineodr"
def _filter_type(self):
return "Sinc3WithFineODR"
class InvalidDataType(Exception):
pass
@ -28,70 +157,9 @@ class NoAckRecv(Exception):
class StoppedConnecting(Exception):
pass
Filter_Config = {
"Sinc5Sinc1With50hz60HzRejection": [
"sinc5sinc1postfilter",
[
"F27SPS",
"F21SPS",
"F20SPS",
"F16SPS",
]
],
"Sinc5Sinc1": [
"sinc5sinc1odr",
[
"F31250_0SPS",
"F15625_0SPS",
"F10417_0SPS",
"F5208_0SPS" ,
"F2597_0SPS" ,
"F1007_0SPS" ,
"F503_8SPS" ,
"F381_0SPS" ,
"F200_3SPS" ,
"F100_2SPS" ,
"F59_52SPS" ,
"F49_68SPS" ,
"F20_01SPS" ,
"F16_63SPS" ,
"F10_0SPS" ,
"F5_0SPS" ,
"F2_5SPS" ,
"F1_25SPS" ,
]
],
"Sinc3": [
"sinc3odr",
[
"F31250_0SPS",
"F15625_0SPS",
"F10417_0SPS",
"F5208_0SPS" ,
"F2597_0SPS" ,
"F1007_0SPS" ,
"F503_8SPS" ,
"F381_0SPS" ,
"F200_3SPS" ,
"F100_2SPS" ,
"F59_52SPS" ,
"F49_68SPS" ,
"F20_01SPS" ,
"F16_63SPS" ,
"F10_0SPS" ,
"F5_0SPS" ,
"F2_5SPS" ,
"F1_25SPS" ,
]
],
"Sinc3WithFineODR": [
"sinc3fineodr",
DATA_F32
],
}
class Device:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_response, cmd_lock):
self._cmd = CmdList.device
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
self._read_response = read_response
@ -130,23 +198,21 @@ class Device:
Set active report to be on. If it is on, Kirdy will send status report
to ALL client socket connections according to the temperature polling rate set.
"""
return await self._send_cmd(TARGET_DEVICE, "SetActiveReportMode", on)
return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on)
async def set_pd_mon_fin_gain(self, gain):
"""
Configure the photodiode monitor transconductance
Configure the photodiode monitor final analog front-end stage gain
- gain: unitless
"""
print("set_pd_mon_fin_gain")
return await self._send_cmd(TARGET_DEVICE, "SetPdFinGain", gain)
return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain)
async def set_pd_mon_transconductance(self, transconductance):
"""
Configure the photodiode monitor transconductance
- transconductance: 1/Ohm
"""
print("set_pd_mon_transconductance")
return await self._send_cmd(TARGET_DEVICE, "SetPdTransconductance", transconductance)
return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance)
async def get_status_report(self):
"""
@ -178,7 +244,7 @@ class Device:
}
}
"""
return await self._send_cmd(TARGET_DEVICE, "GetStatusReport", msg_type="Report")
return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report")
async def get_settings_summary(self):
"""
@ -252,26 +318,26 @@ class Device:
}
}
"""
return await self._send_cmd(TARGET_DEVICE, "GetSettingsSummary", msg_type="Settings")
return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings")
async def dfu(self):
"""
Issuing this cmd will HARD RESET the device and
put Kirdy into Dfu mode for flashing firmware.
"""
return await self._send_cmd(TARGET_DEVICE, "Dfu")
return await self._send_cmd(self._cmd._target, self._cmd.Dfu)
async def save_current_settings_to_flash(self):
"""
Save the current laser diode and thermostat configurations into flash.
"""
return await self._send_cmd(TARGET_DEVICE, "SaveFlashSettings")
return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings)
async def load_current_settings_from_flash(self):
async def restore_settings_from_flash(self):
"""
Restore the laser diode and thermostat settings from flash
"""
return await self._send_cmd(TARGET_DEVICE, "LoadFlashSettings")
return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings)
async def hard_reset(self):
"""
@ -279,7 +345,7 @@ class Device:
Laser diode power and Tec power will be turned off.
Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset.
"""
response = await self._send_cmd(TARGET_DEVICE, "HardReset")
response = await self._send_cmd(self._cmd._target, self._cmd.HardReset)
if response is not None:
if response["msg_type"] == "Acknowledge":
# Delay for a second to wait for the hard reset message being sent out on Kirdy
@ -288,6 +354,7 @@ class Device:
class Laser:
def __init__(self, send_cmd_handler, cmd_lock):
self._cmd = CmdList.ld
self._send_cmd = send_cmd_handler
self._cmd_lock = cmd_lock
@ -297,16 +364,16 @@ class Laser:
- on (True/False)
"""
if on:
return await self._send_cmd(TARGET_LD, "PowerUp", None)
return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else:
return await self._send_cmd(TARGET_LD, "PowerDown", None)
return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
async def set_default_pwr_on(self, on):
"""
Set whether laser diode is powered up at Startup
- on (True/False)
"""
return await self._send_cmd(TARGET_LD, "SetDefaultPowerOn", on)
return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
async def set_ld_terms_short(self, short):
"""
@ -314,37 +381,37 @@ class Laser:
- on (True/False)
"""
if short:
return await self._send_cmd(TARGET_LD, "LdTermsShort", None)
return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None)
else:
return await self._send_cmd(TARGET_LD, "LdTermsOpen", None)
return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None)
async def set_i(self, i):
"""
Set laser diode output current: Max(0, Min(i_set, i_soft_limit))
- i: A
"""
return await self._send_cmd(TARGET_LD, "SetI", i)
return await self._send_cmd(self._cmd._target, self._cmd.SetI, i)
async def set_i_soft_limit(self, i_limit):
"""
Set laser diode software output current limit
- i_limit: A
"""
return await self._send_cmd(TARGET_LD, "SetISoftLimit", i_limit)
return await self._send_cmd(self._cmd._target, self._cmd.SetISoftLimit, i_limit)
async def set_pd_mon_responsitivity(self, responsitivity):
"""
Configure the photodiode monitor responsitivity parameter
- responsitivity: A/W
"""
return await self._send_cmd(TARGET_LD, "SetPdResponsitivity", responsitivity)
return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity)
async def set_pd_mon_dark_current(self, dark_current):
"""
Configure the photodiode monitor responsitivity parameter
- dark_current: A/W
"""
return await self._send_cmd(TARGET_LD, "SetPdDarkCurrent", dark_current)
return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current)
async def set_ld_pwr_limit(self, pwr_limit):
"""
@ -353,16 +420,17 @@ class Laser:
overpower protection is triggered.
- pwr_limit: W
"""
return await self._send_cmd(TARGET_LD, "SetLdPwrLimit", pwr_limit)
return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit)
async def clear_alarm(self):
"""
Clear the power excursion monitor alarm
"""
return await self._send_cmd(TARGET_LD, "ClearAlarm")
return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
class Thermostat:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, cmd_lock):
self._cmd = CmdList.thermostat
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
self._cmd_lock = cmd_lock
@ -372,36 +440,36 @@ class Thermostat:
- Powering up the thermostat resets the pwr_excursion status
"""
if on:
return await self._send_cmd(TARGET_THERMOSTAT, "PowerUp", None)
return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else:
return await self._send_cmd(TARGET_THERMOSTAT, "PowerDown", None)
return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
async def set_default_pwr_on(self, on):
"""
Set whether thermostat is powered up at Startup
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetDefaultPowerOn", on)
return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
async def set_tec_max_v(self, max_v):
"""
Set Tec Maximum Voltage Across the TEC Terminals
- max_v: V
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxV", max_v)
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v)
async def set_tec_max_cooling_i(self, max_i_pos):
"""
Set Tec maximum cooling current (Settable Range: 0.0 - 1.0)
- max_i_pos: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxIPos", max_i_pos)
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos)
async def set_tec_max_heating_i(self, max_i_neg):
"""
Set Tec maximum heating current (Settable Range: 0.0 - 1.0)
- max_i_neg: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxINeg", max_i_neg)
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg)
async def set_tec_i_out(self, i_out):
"""
@ -421,7 +489,7 @@ class Thermostat:
"""
Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidDisEngage", None)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None)
async def set_temperature_setpoint(self, temperature):
"""
@ -429,49 +497,49 @@ class Thermostat:
- temperature: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTemperatureSetpoint", temperature)
return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature)
async def set_pid_control_mode(self):
"""
Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
Please refer to config_temp_adc_filter for the possible polling rate options
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidEngage", None)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None)
async def set_pid_kp(self, kp):
"""
Set Kp parameter for PID Controller
kp: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidKp", kp)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp)
async def set_pid_ki(self, ki):
"""
Set Ki parameter for PID Controller
ki: (unitless)
"""
await self._send_cmd(TARGET_THERMOSTAT, "SetPidKi", ki)
await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki)
async def set_pid_kd(self, kd):
"""
Set Kd parameter for PID Controller
kd: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidKd", kd)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd)
async def set_pid_output_max(self, out_max):
"""
Set max output limit at the PID Output
- out_max: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidOutMax", out_max)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max)
async def set_pid_output_min(self, out_min):
"""
Set min output limit at the PID Output
- out_min: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidOutMin", out_min)
return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min)
async def set_temp_mon_upper_limit(self, upper_limit):
"""
@ -479,7 +547,7 @@ class Thermostat:
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- upper_limit: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTempMonUpperLimit", upper_limit)
return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit)
async def set_temp_mon_lower_limit(self, lower_limit):
"""
@ -487,36 +555,36 @@ class Thermostat:
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- lower_limit: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTempMonLowerLimit", lower_limit)
return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit)
async def clear_alarm(self):
"""
Clear the temperature monitor alarm
"""
return await self._send_cmd(TARGET_THERMOSTAT, "ClearAlarm")
return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
async def set_sh_t0(self, t0):
"""
Set t0 Steinhart-Hart parameter for the laser diode NTC
- t0: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShT0", t0)
return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0)
async def set_sh_r0(self, r0):
"""
Set r0 Steinhart-Hart parameter for the laser diode NTC
- r0: Ohm
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShR0", r0)
return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0)
async def set_sh_beta(self, beta):
"""
Set beta Steinhart-Hart parameter for the laser diode NTC
- beta: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShBeta", beta)
return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta)
async def config_temp_adc_filter(self, filter_type, sampling_rate):
async def config_temp_adc_filter(self, filter_config):
"""
Configure the temperature adc filter type and sampling rate.
Please refer to AD7172 datasheet for the usage of various types of filter.
@ -524,20 +592,17 @@ class Thermostat:
performs differently under different kinds of workload. Please verify the polling rate with
the timestamp.
"""
if not(filter_type in Filter_Config.keys()):
raise InvalidDataType
if Filter_Config[filter_type][1] != DATA_F32:
if not(sampling_rate in Filter_Config[filter_type][1]):
raise InvalidDataType
else:
if not(isinstance(sampling_rate, float)):
raise InvalidDataType
cmd = {}
cmd["thermostat_cmd"] = "ConfigTempAdcFilter"
cmd["temp_adc_filter"] = {
"filter_type": filter_type,
Filter_Config[filter_type][0]: sampling_rate,
cmd[self._cmd._target] = self._cmd.ConfigTempAdcFilter.name
if hasattr(filter_config, 'rate'):
cmd[self._cmd.ConfigTempAdcFilter] = {
"filter_type": filter_config._filter_type(),
filter_config._odr_type(): filter_config.rate,
}
else:
cmd[self._cmd.ConfigTempAdcFilter] = {
"filter_type": filter_config._filter_type(),
filter_config._odr_type(): filter_config,
}
return await self._send_raw_cmd(cmd)
@ -554,65 +619,6 @@ class Kirdy:
self.laser = Laser(self._send_cmd_handler, self._cmd_lock)
self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler, self._cmd_lock)
self._cmd_list = {
TARGET_DEVICE: {
"SetIPSettings": IP_SETTINGS,
"SetPdFinGain": DATA_F32,
"SetPdTransconductance": DATA_F32,
"SetActiveReportMode": DATA_BOOL,
"GetStatusReport": None,
"GetSettingsSummary": None,
"Dfu": None,
"SaveFlashSettings": None,
"LoadFlashSettings": None,
"HardReset": None,
},
TARGET_LD: {
# LD Drive Related
"SetDefaultPowerOn": DATA_BOOL,
"PowerUp": None,
"PowerDown": None,
"LdTermsShort": None,
"LdTermsOpen": None,
"SetI": DATA_F32,
"SetISoftLimit": DATA_F32,
# PD Mon Related
"SetPdResponsitivity": DATA_F32,
"SetPdDarkCurrent": DATA_F32,
"SetLdPwrLimit": DATA_F32,
"ClearAlarm": None,
},
TARGET_THERMOSTAT: {
"SetDefaultPowerOn": DATA_BOOL,
"PowerUp": DATA_F32,
"PowerDown": DATA_F32,
# TEC Controller Settings
"SetTecMaxV": DATA_F32,
"SetTecMaxIPos": DATA_F32,
"SetTecMaxINeg": DATA_F32,
"SetTecIOut": DATA_F32,
"SetTemperatureSetpoint": DATA_F32,
# PID Controller Settings
"SetPidEngage": None,
"SetPidDisEngage": None,
"SetPidKp": DATA_F32,
"SetPidKi": DATA_F32,
"SetPidKd": DATA_F32,
"SetPidOutMin": DATA_F32,
"SetPidOutMax": DATA_F32,
# Temperature Adc Internal Filters Settings
"ConfigTempAdcFilter": TEMP_ADC_FILTER,
# Temperature Monitor Settings
"SetTempMonUpperLimit": DATA_F32,
"SetTempMonLowerLimit": DATA_F32,
"ClearAlarm": None,
# Thermistor Parameter Settings
"SetShT0": DATA_F32,
"SetShR0": DATA_F32,
"SetShBeta": DATA_F32,
}
}
async def start_session(self, host='192.168.1.128', port=1337, timeout=None):
self._connecting_task = asyncio.create_task(
asyncio.wait_for(asyncio.open_connection(host, port), timeout)
@ -699,7 +705,7 @@ class Kirdy:
if response["msg_type"] == msg_type:
return response
if response["msg_type"] == "InvalidCmd":
return InvalidCmd
raise InvalidCmd
except asyncio.exceptions.CancelledError:
return None
except Exception as e:
@ -717,26 +723,20 @@ class Kirdy:
async def _send_cmd(self, target, cmd, data, msg_type):
cmd_dict = {}
cmd_dict[target] = cmd.name
if not(target in self._cmd_list.keys()) or not(cmd in self._cmd_list[target].keys()):
raise CmdDoesNotExist
cmd_dict[target] = cmd
if self._cmd_list[target][cmd] == DATA_F32:
if cmd == _dt.f32:
if isinstance(data, float):
cmd_dict[DATA_F32] = data
cmd_dict[cmd] = data
elif isinstance(data, int):
cmd_dict[DATA_F32] = float(data)
elif self._cmd_list[target][cmd] == DATA_BOOL:
cmd_dict[cmd] = float(data)
elif cmd == _dt.bool:
if isinstance(data, bool):
cmd_dict[DATA_BOOL] = data
cmd_dict[cmd] = data
else:
raise InvalidDataType
elif self._cmd_list[target][cmd] == None:
elif cmd == "None":
pass
else:
# Undefined Data Type
raise CmdDoesNotExist
retry = 0
while retry < 10:

View File

@ -523,7 +523,7 @@ class MainWindow(QtWidgets.QMainWindow):
@asyncSlot(bool)
async def load_settings(_):
await self.kirdy.device.load_current_settings_from_flash()
await self.kirdy.device.restore_settings_from_flash()
loaded = QtWidgets.QMessageBox(self)
loaded.setWindowTitle("Config loaded")
loaded.setText(f"Laser Diode and Thermostat configs have been loaded from flash.")