driver: rm data validation

- Data validation is done on Kirdy Server side
- Thus, remove duplicated logic
- Merge send_cmd and send_raw_cmd
This commit is contained in:
linuswck 2025-01-20 15:34:27 +08:00
parent cd3347798c
commit 76a19cb65a

View File

@ -20,6 +20,9 @@ class State(StrEnum):
connected = "connected"
class CmdList:
class tec_set_i(StrEnum, settings=NoAlias):
_target = "tec_set_i"
tec_set_i = _dt.f32
class device(StrEnum, settings=NoAlias):
_target = "device_cmd"
SetIPSettings = _dt.ip_settings
@ -168,17 +171,13 @@ class FilterConfig:
def _filter_type(self):
return "Sinc3WithFineODR"
class InvalidDataType(Exception):
pass
class InvalidCmd(Exception):
pass
class Device:
def __init__(self, send_cmd_handler, send_raw_cmd_handler):
def __init__(self, send_cmd_handler):
self._cmd = CmdList.device
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
async def set_ip_settings(self, addr="192.168.1.128", port=1550, prefix_len=24, gateway="192.168.1.1"):
"""
@ -196,15 +195,14 @@ class Device:
if not(isinstance(port, int) and isinstance(prefix_len, int)):
raise InvalidDataType
return await self._send_raw_cmd(
return await self._send_cmd(
self._cmd._target,
self._cmd.SetIPSettings,
{
"device_cmd": "SetIPSettings",
"ip_settings": {
"addr": [int(x) for x in addr],
"port": port,
"prefix_len": prefix_len,
"gateway": [int(x) for x in gateway],
}
"addr": [int(x) for x in addr],
"port": port,
"prefix_len": prefix_len,
"gateway": [int(x) for x in gateway],
}
)
@ -454,10 +452,10 @@ class Laser:
return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
class Thermostat:
def __init__(self, send_cmd_handler, send_raw_cmd_handler):
def __init__(self, send_cmd_handler):
self._cmd = CmdList.thermostat
self.tec_set_i_cmd = CmdList.tec_set_i
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
async def set_power_on(self, on):
"""
@ -504,12 +502,7 @@ class Thermostat:
or your newly set value will be overwritten by PID Controller Output
- i_out: A
"""
if isinstance(i_out, float):
return await self._send_raw_cmd({"tec_set_i": i_out})
elif isinstance(i_out, int):
return await self._send_raw_cmd({"tec_set_i": float(i_out)})
else:
raise InvalidDataType
await self._send_cmd(self.tec_set_i_cmd._target, self.tec_set_i_cmd.tec_set_i, i_out)
async def set_constant_current_control_mode(self):
"""
@ -618,29 +611,33 @@ class Thermostat:
performs differently under different kinds of workload. Please verify the polling rate with
the timestamp.
"""
cmd = {}
cmd[self._cmd._target] = self._cmd.ConfigTempAdcFilter.name
if hasattr(filter_config, 'rate'):
cmd[self._cmd.ConfigTempAdcFilter] = {
"filter_type": filter_config._filter_type(),
filter_config._odr_type: filter_config.rate,
}
return await self._send_cmd(
self._cmd._target,
self._cmd.ConfigTempAdcFilter,
{
"filter_type": filter_config._filter_type(),
filter_config._odr_type: filter_config.rate,
}
)
else:
cmd[self._cmd.ConfigTempAdcFilter] = {
"filter_type": filter_config._filter_type(),
filter_config._odr_type: filter_config,
}
return await self._send_raw_cmd(cmd)
return await self._send_cmd(
self._cmd._target,
self._cmd.ConfigTempAdcFilter,
{
"filter_type": filter_config._filter_type(),
filter_config._odr_type: filter_config,
}
)
async def get_poll_interval(self):
return await self._send_cmd(self._cmd._target, self._cmd.GetPollInterval, msg_type="Interval")
class Kirdy:
def __init__(self):
self.device = Device(self._send_cmd, self._send_raw_cmd)
self.device = Device(self._send_cmd)
self.laser = Laser(self._send_cmd)
self.thermostat = Thermostat(self._send_cmd, self._send_raw_cmd)
self.thermostat = Thermostat(self._send_cmd)
self.hw_rev = None
self._task_queue, self._int_msg_queue, self._report_queue = None, None, None
@ -902,32 +899,19 @@ class Kirdy:
self._err_msg_sig.emit(msg['msg'])
return msg
async def _send_raw_cmd(self, cmd, msg_type="Acknowledge", sig=None):
if self.connected():
async with self._lock:
self._writer.write(bytes(json.dumps(cmd), "UTF-8"))
await self._writer.drain()
msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout)
return self._response_handling(msg, msg_type, sig)
else:
raise ConnectionError
async def _send_cmd(self, target, cmd, data=None, msg_type="Acknowledge", sig=None):
cmd_dict = {}
cmd_dict[target] = cmd.name
if cmd == _dt.f32:
if isinstance(data, float):
if target == "tec_set_i":
cmd_dict[target] = float(data)
else:
cmd_dict[target] = cmd.name
if cmd != 'None':
if cmd == _dt.bool:
data = bool(data)
elif isinstance(data, int):
data = float(data)
cmd_dict[cmd] = data
elif isinstance(data, int):
cmd_dict[cmd] = float(data)
elif cmd == _dt.bool:
if isinstance(data, bool):
cmd_dict[cmd] = data
else:
raise InvalidDataType
elif cmd == "None":
pass
if msg_type == 'Report':
self._msg_queue_get_report = True