759 lines
30 KiB
Python
759 lines
30 KiB
Python
import socket
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from aenum import StrEnum, NoAlias
|
|
|
|
class _dt(StrEnum):
|
|
ip_settings = "ip_settings"
|
|
temp_adc_filter = "temp_adc_filter"
|
|
f32 = "data_f32"
|
|
bool = "data_bool"
|
|
none = "None"
|
|
|
|
class CmdList:
    """Catalogue of the commands accepted by each Kirdy subsystem.

    Every nested enum describes one command target:

    - ``_target`` is the JSON key that carries the command name.
    - every other member's *name* is the command name put on the wire
      (``cmd.name``) and its *value* is the ``_dt`` payload kind it expects.

    ``settings=NoAlias`` is required because many members share the same
    value; without it they would collapse into aliases of the first one and
    ``cmd.name`` would be wrong.
    """

    class device(StrEnum, settings=NoAlias):
        _target = "device_cmd"
        SetIPSettings = _dt.ip_settings
        SetPdFinGain = _dt.f32
        SetPdTransconductance = _dt.f32
        SetActiveReportMode = _dt.bool
        GetStatusReport = _dt.none
        GetSettingsSummary = _dt.none
        Dfu = _dt.none
        SaveFlashSettings = _dt.none
        LoadFlashSettings = _dt.none
        HardReset = _dt.none

    class ld(StrEnum, settings=NoAlias):
        _target = "laser_diode_cmd"
        SetDefaultPowerOn = _dt.bool
        PowerUp = _dt.none
        PowerDown = _dt.none
        LdTermsShort = _dt.none
        LdTermsOpen = _dt.none
        SetI = _dt.f32
        SetPdResponsitivity = _dt.f32
        SetPdDarkCurrent = _dt.f32
        SetLdPwrLimit = _dt.f32
        ClearAlarm = _dt.none

    class thermostat(StrEnum, settings=NoAlias):
        # NOTE: the members below used to end with a trailing comma, which
        # turned every value into a one-element tuple. They are written as
        # plain values now, like the two enums above.
        _target = "thermostat_cmd"
        SetDefaultPowerOn = _dt.bool
        # PowerUp/PowerDown are sent without a payload by
        # Thermostat.set_power_on, exactly like their laser counterparts.
        PowerUp = _dt.none
        PowerDown = _dt.none
        SetTecMaxV = _dt.f32
        SetTecMaxIPos = _dt.f32
        SetTecMaxINeg = _dt.f32
        SetTecIOut = _dt.f32
        SetTemperatureSetpoint = _dt.f32
        SetPidEngage = _dt.none
        SetPidDisEngage = _dt.none
        SetPidKp = _dt.f32
        SetPidKi = _dt.f32
        SetPidKd = _dt.f32
        SetPidOutMin = _dt.f32
        SetPidOutMax = _dt.f32
        ConfigTempAdcFilter = _dt.temp_adc_filter
        SetTempMonUpperLimit = _dt.f32
        SetTempMonLowerLimit = _dt.f32
        ClearAlarm = _dt.none
        SetShT0 = _dt.f32
        SetShR0 = _dt.f32
        SetShBeta = _dt.f32
|
|
|
|
class FilterConfig:
|
|
class Sinc5Sinc1With50hz60HzRejection(StrEnum):
|
|
f27sps = "F27SPS"
|
|
f21sps = "F21SPS"
|
|
f20sps = "F20SPS"
|
|
f16sps = "F16SPS"
|
|
|
|
def _odr_type(self):
|
|
return "sinc5sinc1postfilter"
|
|
|
|
def _filter_type(self):
|
|
return "Sinc5Sinc1With50hz60HzRejection"
|
|
|
|
class Sinc5Sinc1(StrEnum):
|
|
f31250_0sps = "F31250_0SPS"
|
|
f15625_0sps = "F15625_0SPS"
|
|
f10417_0sps = "F10417_0SPS"
|
|
f5208_0sps = "F5208_0SPS"
|
|
f2597_0sps = "F2597_0SPS"
|
|
f1007_0sps = "F1007_0SPS"
|
|
f503_8sps = "F503_8SPS"
|
|
f381_0sps = "F381_0SPS"
|
|
f200_3sps = "F200_3SPS"
|
|
f100_2sps = "F100_2SPS"
|
|
f59_52sps = "F59_52SPS"
|
|
f49_68sps = "F49_68SPS"
|
|
f20_01sps = "F20_01SPS"
|
|
f16_63sps = "F16_63SPS"
|
|
f10_0sps = "F10_0SPS"
|
|
f5_0sps = "F5_0SPS"
|
|
f2_5sps = "F2_5SPS"
|
|
f1_25sps = "F1_25SPS"
|
|
|
|
def _odr_type(self):
|
|
return "sinc5sinc1odr"
|
|
|
|
def _filter_type(self):
|
|
return "Sinc5Sinc1"
|
|
|
|
class Sinc3(StrEnum):
|
|
f31250_0sps = "F31250_0SPS"
|
|
f15625_0sps = "F15625_0SPS"
|
|
f10417_0sps = "F10417_0SPS"
|
|
f5208_0sps = "F5208_0SPS"
|
|
f2597_0sps = "F2597_0SPS"
|
|
f1007_0sps = "F1007_0SPS"
|
|
f503_8sps = "F503_8SPS"
|
|
f381_0sps = "F381_0SPS"
|
|
f200_3sps = "F200_3SPS"
|
|
f100_2sps = "F100_2SPS"
|
|
f59_52sps = "F59_52SPS"
|
|
f49_68sps = "F49_68SPS"
|
|
f20_01sps = "F20_01SPS"
|
|
f16_63sps = "F16_63SPS"
|
|
f10_0sps = "F10_0SPS"
|
|
f5_0sps = "F5_0SPS"
|
|
f2_5sps = "F2_5SPS"
|
|
f1_25sps = "F1_25SPS"
|
|
|
|
def _odr_type(self):
|
|
return "sinc3odr"
|
|
|
|
def _filter_type(self):
|
|
return "Sinc3"
|
|
|
|
class Sinc3WithFineODR():
|
|
def __init__(self, rate):
|
|
assert rate >= 1.907465 and rate <= 31250
|
|
self.rate = float(rate)
|
|
|
|
def _odr_type(self):
|
|
return "sinc3fineodr"
|
|
|
|
def _filter_type(self):
|
|
return "Sinc3WithFineODR"
|
|
|
|
class InvalidDataType(Exception):
    """Raised when an argument does not have the type a command requires."""
|
|
|
|
class InvalidCmd(Exception):
    """Raised when the device answers with an ``InvalidCmd`` message."""
|
|
|
|
class NoAckRecv(Exception):
    """Raised when no reply of the expected type arrives after all retries."""
|
|
|
|
class StoppedConnecting(Exception):
    """Raised by ``start_session`` when the connection attempt is cancelled."""
|
|
|
|
class Device:
    """Device-level commands: network settings, reports, flash storage, reset.

    The transport is injected: ``send_cmd_handler`` sends a command taken from
    ``CmdList.device`` and ``send_raw_cmd_handler`` sends an already built
    command dictionary. Both are awaited and their result is returned as-is.
    """

    def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_response, cmd_lock):
        self._cmd = CmdList.device
        self._send_cmd = send_cmd_handler
        self._send_raw_cmd = send_raw_cmd_handler
        # Kept for callers that may rely on them; unused by this class itself.
        self._read_response = read_response
        self._cmd_lock = cmd_lock

    async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"):
        """
        After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot.
        - addr: dotted-quad IPv4 address string
        - port: int, 0 - 65535
        - prefix_len: int, 0 - 32
        - gateway: dotted-quad IPv4 address string

        Raises InvalidDataType if any argument is malformed.
        """
        # Local import: the module header does not import ipaddress.
        import ipaddress

        # IPv4Address also accepts ints and packed bytes; only strings are
        # part of this method's contract.
        if not (isinstance(addr, str) and isinstance(gateway, str)):
            raise InvalidDataType

        try:
            # Strict four-octet parsing. socket.inet_aton() would also accept
            # abbreviated forms such as "192.168.1", which cannot be turned
            # into the four-element list the command needs.
            addr_octets = list(ipaddress.IPv4Address(addr).packed)
            gateway_octets = list(ipaddress.IPv4Address(gateway).packed)
        except ValueError as err:
            raise InvalidDataType from err

        for value, upper in ((port, 65535), (prefix_len, 32)):
            # bool is a subclass of int but is never a meaningful value here.
            if isinstance(value, bool) or not isinstance(value, int):
                raise InvalidDataType
            if not 0 <= value <= upper:
                raise InvalidDataType

        return await self._send_raw_cmd(
            {
                "device_cmd": "SetIPSettings",
                "ip_settings": {
                    "addr": addr_octets,
                    "port": port,
                    "prefix_len": prefix_len,
                    "gateway": gateway_octets,
                },
            }
        )

    async def set_active_report_mode(self, on):
        """
        Turn active report mode on or off. While it is on, Kirdy sends status reports
        to ALL client socket connections according to the temperature polling rate set.
        - on (True/False)
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on)

    async def set_pd_mon_fin_gain(self, gain):
        """
        Configure the photodiode monitor final analog front-end stage gain
        - gain: unitless
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain)

    async def set_pd_mon_transconductance(self, transconductance):
        """
        Configure the photodiode monitor transconductance
        - transconductance: 1/Ohm
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance)

    async def get_status_report(self):
        """
        Get status of all peripherals in a json object

        Example of returned data::

            {
                'ts': 227657,                   # Relative Timestamp (ms)
                'msg_type': 'Report',           # Indicate it is a 'Report' json object
                'laser': {
                    'pwr_on': False,            # Laser Power is On (True/False)
                    'pwr_excursion': False,     # Has the laser experienced an overpower condition? (True/False)
                    'ld_i_set': 0.0,            # Laser Diode Output Current (A)
                    'pd_i': 2.0000002e-06,      # Internal Photodiode Monitor current (A)
                    'pd_pwr': None,             # Power Readings from Internal Photodiode (W). None if pd_mon parameter(s) are not defined.
                    'term_50ohm': 'Is50Ohm'     # Is the Low Frequency Modulation Input's Impedance 50 Ohm? (On/Off)
                },
                'thermostat': {
                    'pwr_on': False,            # Tec Power is On (True/False)
                    'pid_engaged': False,       # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False)
                    'temp_mon_status': {        # Temperature Monitor:
                        'status': 'Off',            # (To be revised)
                        'over_temp_alarm': False    # Has the laser diode experienced an overtemperature condition? (True/False)
                    },
                    'temperature': 25.03344,    # Temperature Readings (Degree Celsius)
                    'i_set': 0.0,               # Tec Current Set by User/PID Controller (A)
                    'tec_i': 0.0024998188,      # Tec Current Readings (A)
                    'tec_v': -0.00399971        # Tec Voltage Readings (V)
                }
            }
        """
        return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report")

    async def get_settings_summary(self):
        """
        Get the current settings of laser and thermostat in a json object

        Example of returned data::

            {
                'msg_type': 'Settings',         # Indicate it is a 'Settings' json object
                'laser': {
                    'default_pwr_on': False,    # Power On Laser Diode at Startup
                    'ld_drive_current': {       # Laser Diode Output Current (A)
                        'value': 0.0,               # Value Set
                        'max': 0.3                  # Max Value Settable
                    },
                    'pd_mon_params': {          # Photodiode Monitor Parameters
                        'responsitivity': None,     # Responsitivity (A/W), None if not defined
                        'i_dark': 0.0               # Dark Current
                    },
                    'ld_pwr_limit': 0.0,        # Laser Diode Power Limit (W)
                    'ld_terms_short': False     # Is Laser Diode Terminals short? (True/False)
                },
                'thermostat': {
                    'default_pwr_on': True,     # Power on Thermostat at Startup
                    'pid_engaged': True,        # True: PID Control Mode | False: Constant Current Mode
                    'temperature_setpoint': 25.0,   # Temperature Setpoint (Degree Celsius)
                    'tec_settings': {
                        'i_set': {              # Current TEC Current Set by PID Controller/User
                            'value': 0.04330516,    # Value Set
                            'max': 1.0              # Max Value Settable
                        },
                        'max_v': {              # Max Voltage Across Tec Terminals
                            'value': 4.990857,      # Value Set
                            'max': 5.0              # Max Value Settable
                        },
                        'max_i_pos': {          # Max Cooling Current Across Tec Terminals
                            'value': 0.99628574,    # Value Set
                            'max': 1.0              # Max Value Settable
                        },
                        'max_i_neg': {          # Max Heating Current Across Tec Terminals
                            'value': 0.99628574,    # Value Set
                            'max': 1.0              # Max Value Settable
                        }
                    },
                    'pid_params': {             # PID Controller Parameters
                        'kp': 0.15668282,           # Proportional Gain
                        'ki': 0.0021359625,         # Integral Gain
                        'kd': 0.8292545,            # Derivative Gain
                        'output_min': -1.0,         # Minimum Current Output (A)
                        'output_max': 1.0           # Maximum Current Output (A)
                    },
                    'temp_adc_settings': {      # Temperature ADC Settings (Please read AD7172-2 Documentation)
                        'filter_type': 'Sinc5Sinc1With50hz60HzRejection',   # Filter Types
                        'sinc5sinc1odr': None,          # (Unused)
                        'sinc3odr': None,               # (Unused)
                        'sinc5sinc1postfilter': None,   # (Unused)
                        'sinc3fineodr': None,           # (Unused)
                        'rate': 16.67                   # ADC Sampling Rate (Hz)
                    },
                    'temp_mon_settings': {      # Temperature Monitor Settings
                        'upper_limit': 40.0,        # Temperature Upper Limit (Degree Celsius)
                        'lower_limit': 10.0         # Temperature Lower Limit (Degree Celsius)
                    },
                    'thermistor_params': {      # Thermistor Steinhart-Hart equation parameters
                        't0': 25.0,                 # t0: Degree Celsius
                        'r0': 10000.0,              # r0: Ohm
                        'b': 3900.0                 # b: (unitless)
                    }
                }
            }
        """
        return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings")

    async def dfu(self):
        """
        Issuing this cmd will HARD RESET the device and
        put Kirdy into Dfu mode for flashing firmware.
        """
        return await self._send_cmd(self._cmd._target, self._cmd.Dfu)

    async def save_current_settings_to_flash(self):
        """
        Save the current laser diode and thermostat configurations into flash.
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings)

    async def restore_settings_from_flash(self):
        """
        Restore the laser diode and thermostat settings from flash
        """
        return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings)

    async def hard_reset(self):
        """
        Hard Reset Kirdy. The socket connection will be closed by Kirdy.
        Laser diode power and Tec power will be turned off.
        Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets,
        indicating that the device is being reset.

        Returns the reply to the command (None if sending was cancelled).
        """
        response = await self._send_cmd(self._cmd._target, self._cmd.HardReset)
        if response is not None and response["msg_type"] == "Acknowledge":
            # Delay for a second to wait for the hard reset message being sent out on Kirdy
            await asyncio.sleep(1.0)
        return response
|
|
|
|
class Laser:
    """Laser diode commands.

    ``send_cmd_handler`` is the awaitable transport used for every command of
    ``CmdList.ld``; each method returns whatever the handler returns.
    """

    def __init__(self, send_cmd_handler, cmd_lock):
        self._cmd = CmdList.ld
        self._send_cmd = send_cmd_handler
        self._cmd_lock = cmd_lock

    async def set_power_on(self, on):
        """
        Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status
        - on (True/False)
        """
        cmds = self._cmd
        if not on:
            return await self._send_cmd(cmds._target, cmds.PowerDown, None)
        return await self._send_cmd(cmds._target, cmds.PowerUp, None)

    async def set_default_pwr_on(self, on):
        """
        Set whether laser diode is powered up at Startup
        - on (True/False)
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.SetDefaultPowerOn, on)

    async def set_ld_terms_short(self, short):
        """
        Open/Short laser diode terminals.
        - short (True: short the terminals / False: open them)
        """
        cmds = self._cmd
        if not short:
            return await self._send_cmd(cmds._target, cmds.LdTermsOpen, None)
        return await self._send_cmd(cmds._target, cmds.LdTermsShort, None)

    async def set_i(self, i):
        """
        Set laser diode output current: Max(0, Min(i_set, i_soft_limit))
        - i: A
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.SetI, i)

    async def set_pd_mon_responsitivity(self, responsitivity):
        """
        Configure the photodiode monitor responsitivity parameter
        - responsitivity: A/W
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.SetPdResponsitivity, responsitivity)

    async def set_pd_mon_dark_current(self, dark_current):
        """
        Configure the photodiode monitor dark current parameter
        - dark_current: presumably A (this note previously read "A/W",
          apparently copied from the responsitivity setter — TODO confirm)
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.SetPdDarkCurrent, dark_current)

    async def set_ld_pwr_limit(self, pwr_limit):
        """
        Set power limit for the power excursion monitor
        If the calculated power with the params of pd_mon > pwr_limit,
        overpower protection is triggered.
        - pwr_limit: W
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.SetLdPwrLimit, pwr_limit)

    async def clear_alarm(self):
        """
        Clear the power excursion monitor alarm
        """
        cmds = self._cmd
        return await self._send_cmd(cmds._target, cmds.ClearAlarm)
|
|
|
|
class Thermostat:
    """Thermostat (TEC driver, PID controller, temperature monitor) commands.

    ``send_cmd_handler`` sends a command taken from ``CmdList.thermostat``;
    ``send_raw_cmd_handler`` sends an already built command dictionary.
    Every method returns whatever the handler returns.
    """

    def __init__(self, send_cmd_handler, send_raw_cmd_handler, cmd_lock):
        self._cmd = CmdList.thermostat
        self._send_cmd = send_cmd_handler
        self._send_raw_cmd = send_raw_cmd_handler
        self._cmd_lock = cmd_lock

    async def set_power_on(self, on):
        """
        Power up or power down thermostat
        - on (True/False)

        NOTE(review): this docstring used to say that powering up resets the
        pwr_excursion status; that field belongs to the laser report, so the
        sentence looks copied from Laser.set_power_on — confirm.
        """
        cmd = self._cmd.PowerUp if on else self._cmd.PowerDown
        return await self._send_cmd(self._cmd._target, cmd, None)

    async def set_default_pwr_on(self, on):
        """
        Set whether thermostat is powered up at Startup
        - on (True/False)
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)

    async def set_tec_max_v(self, max_v):
        """
        Set Tec Maximum Voltage Across the TEC Terminals
        - max_v: V
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v)

    async def set_tec_max_cooling_i(self, max_i_pos):
        """
        Set Tec maximum cooling current (Settable Range: 0.0 - 1.0)
        - max_i_pos: A
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos)

    async def set_tec_max_heating_i(self, max_i_neg):
        """
        Set Tec maximum heating current (Settable Range: 0.0 - 1.0)
        - max_i_neg: A
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg)

    async def set_tec_i_out(self, i_out):
        """
        Set Tec Output Current
        This cmd is only effective in constant current control mode
        or your newly set value will be overwritten by PID Controller Output
        - i_out: A (float or int)

        Raises InvalidDataType if i_out is neither a float nor an int.
        """
        if not isinstance(i_out, (float, int)):
            raise InvalidDataType
        # NOTE(review): this is the only setter that sends a raw "tec_set_i"
        # object instead of the SetTecIOut command declared in CmdList —
        # confirm against the firmware which form is expected.
        return await self._send_raw_cmd({"tec_set_i": float(i_out)})

    async def set_constant_current_control_mode(self):
        """
        Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None)

    async def set_temperature_setpoint(self, temperature):
        """
        Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode
        - temperature: Degree Celsius
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature)

    async def set_pid_control_mode(self):
        """
        Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
        Please refer to config_temp_adc_filter for the possible polling rate options
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None)

    async def set_pid_kp(self, kp):
        """
        Set Kp parameter for PID Controller
        kp: (unitless)
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp)

    async def set_pid_ki(self, ki):
        """
        Set Ki parameter for PID Controller
        ki: (unitless)
        """
        # The reply is returned like in every other setter (it used to be
        # dropped, so callers always received None).
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki)

    async def set_pid_kd(self, kd):
        """
        Set Kd parameter for PID Controller
        kd: (unitless)
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd)

    async def set_pid_output_max(self, out_max):
        """
        Set max output limit at the PID Output
        - out_max: A
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max)

    async def set_pid_output_min(self, out_min):
        """
        Set min output limit at the PID Output
        - out_min: A
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min)

    async def set_temp_mon_upper_limit(self, upper_limit):
        """
        Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long
        will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
        - upper_limit: Degree Celsius
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit)

    async def set_temp_mon_lower_limit(self, lower_limit):
        """
        Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long
        will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
        - lower_limit: Degree Celsius
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit)

    async def clear_alarm(self):
        """
        Clear the temperature monitor alarm
        """
        return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)

    async def set_sh_t0(self, t0):
        """
        Set t0 Steinhart-Hart parameter for the laser diode NTC
        - t0: Degree Celsius
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0)

    async def set_sh_r0(self, r0):
        """
        Set r0 Steinhart-Hart parameter for the laser diode NTC
        - r0: Ohm
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0)

    async def set_sh_beta(self, beta):
        """
        Set beta Steinhart-Hart parameter for the laser diode NTC
        - beta: (unitless)
        """
        return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta)

    async def config_temp_adc_filter(self, filter_config):
        """
        Configure the temperature adc filter type and sampling rate.
        Please refer to AD7172 datasheet for the usage of various types of filter.
        The actual temperature polling rate is bottlenecked by the processing speed of the MCU and
        performs differently under different kinds of workload. Please verify the polling rate with
        the timestamp.
        - filter_config: one of the options defined in FilterConfig
        """
        adc_cmd = self._cmd.ConfigTempAdcFilter
        # Options with a numeric `rate` attribute send that number; the
        # fixed-rate enum options are themselves the rate value.
        if hasattr(filter_config, "rate"):
            rate = filter_config.rate
        else:
            rate = filter_config

        cmd = {
            self._cmd._target: adc_cmd.name,
            adc_cmd: {
                "filter_type": filter_config._filter_type(),
                filter_config._odr_type(): rate,
            },
        }
        return await self._send_raw_cmd(cmd)
|
|
|
|
class Kirdy:
    """Asyncio TCP client for a Kirdy device.

    Commands are grouped under ``device``, ``laser`` and ``thermostat``; this
    class owns the connection, serialises commands with ``_cmd_lock`` and
    matches replies by their ``msg_type`` field.
    """

    # Number of times a command is (re)sent before giving up with NoAckRecv.
    _MAX_RETRIES = 10
    # Pause between two attempts, in seconds.
    _RETRY_DELAY = 0.1

    def __init__(self):
        self._reader = None
        self._writer = None
        self._connecting_task = None
        self._cmd_lock = asyncio.Lock()
        self._report_mode_on = False
        self.timeout = None
        self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._read_response, self._cmd_lock)
        self.laser = Laser(self._send_cmd_handler, self._cmd_lock)
        self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler, self._cmd_lock)

    async def start_session(self, host='192.168.1.128', port=1337, timeout=None):
        """
        Connect to Kirdy.
        - host: str
        - port: int
        - timeout: seconds (None: wait forever). Also used as the receive
          timeout of every later read.

        Raises StoppedConnecting if the attempt is cancelled (see end_session).
        """
        self._connecting_task = asyncio.create_task(
            asyncio.wait_for(asyncio.open_connection(host, port), timeout)
        )
        self.timeout = timeout
        try:
            self._reader, self._writer = await self._connecting_task
            writer_sock = self._writer.get_extra_info("socket")
            writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except asyncio.CancelledError:
            raise StoppedConnecting
        finally:
            self._connecting_task = None

    def connecting(self):
        """Returns True if client is connecting"""
        return self._connecting_task is not None

    def connected(self):
        """Returns True if client is connected"""
        return self._writer is not None

    async def end_session(self):
        """End session to Kirdy if connected, cancel connection if connecting"""
        if self._connecting_task is not None:
            self._connecting_task.cancel()

        if self._writer is None:
            return

        # Reader needn't be closed
        self._writer.close()
        try:
            await self._writer.wait_closed()
        except ConnectionResetError:
            # In Hard Reset/DFU cmd, Kirdy may close its socket first
            pass
        finally:
            # Always forget the closed streams, even if wait_closed() raised
            # something else, so connected() cannot report a dead writer.
            self._reader = None
            self._writer = None

    async def _read_response(self, buffer_size=16384, report=False):
        """
        Decode newline delimited Json objects received in one read.
        - buffer_size: Integer
        - report: if True return only the latest object (a dict); otherwise
          return a list of all decoded objects, latest first.

        Lines that are not complete Json objects are skipped. When nothing
        usable arrives, a single placeholder object is produced instead:
        {"msg_type": "RecvTimeout"} on timeout, {"msg_type": "EmptyResponse"}
        otherwise.
        """
        try:
            raw_response = await asyncio.wait_for(self._reader.read(buffer_size), self.timeout)
        except asyncio.TimeoutError:
            # asyncio.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on; naming it works everywhere.
            items = [{"msg_type": "RecvTimeout"}]
        else:
            items = []
            lines = raw_response.decode('utf-8', errors='ignore').split("\n")
            for line in reversed(lines):
                try:
                    item = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # Callers index replies by key, so only objects are useful.
                if isinstance(item, dict):
                    items.append(item)
            if not items:
                items = [{"msg_type": "EmptyResponse"}]

        # Placeholders are wrapped in a list as well: callers of the
        # non-report form iterate over the result.
        return items[0] if report else items

    async def _exchange(self, payload, msg_type, reject_invalid=False):
        """Send payload once; return the first reply of msg_type, else None.

        With reject_invalid, an "InvalidCmd" reply raises InvalidCmd.
        """
        self._writer.write(bytes(json.dumps(payload), "UTF-8"))
        await self._writer.drain()
        for response in await self._read_response():
            kind = response.get("msg_type")
            if kind == msg_type:
                return response
            if reject_invalid and kind == "InvalidCmd":
                raise InvalidCmd
        return None

    async def _send_raw_cmd_handler(self, cmd, lock=True, msg_type="Acknowledge"):
        """Send an already built command dict, holding the command lock unless lock is False."""
        if lock:
            async with self._cmd_lock:
                return await asyncio.shield(self._send_raw_cmd(cmd, msg_type))
        else:
            return await asyncio.shield(self._send_raw_cmd(cmd, msg_type))

    # If the cmd involves a cmd specific data type,
    # checking is done separately within the functions being called
    async def _send_raw_cmd(self, cmd, msg_type):
        """
        Send cmd until a reply of msg_type arrives and return that reply.

        Returns None if cancelled. Raises InvalidCmd if the device rejects
        the command and NoAckRecv if every attempt stays unanswered.
        """
        for _ in range(self._MAX_RETRIES):
            try:
                response = await self._exchange(cmd, msg_type, reject_invalid=True)
            except asyncio.CancelledError:
                return None
            except InvalidCmd:
                # Resending a rejected command cannot succeed; report it
                # instead of burying it under the generic handler below.
                raise
            except Exception as e:
                logging.error(f"_send_raw_cmd Exception: {e}")
            else:
                if response is not None:
                    return response
            # Unanswered attempts count too, otherwise a command that never
            # gets its reply would be resent forever.
            await asyncio.sleep(self._RETRY_DELAY)
        raise NoAckRecv

    async def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", lock=True):
        """Send a CmdList command, holding the command lock unless lock is False."""
        if lock:
            async with self._cmd_lock:
                return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type))
        else:
            return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type))

    async def _send_cmd(self, target, cmd, data, msg_type):
        """
        Build the command object for cmd, send it until a reply of msg_type
        arrives and return that reply.
        - target: key carrying the command name
        - cmd: CmdList member; its value selects how data is attached
        - data: payload (float/int for f32 commands, bool for bool commands)

        Returns None if cancelled. Raises InvalidDataType for a payload of
        the wrong type and NoAckRecv if every attempt stays unanswered.
        """
        cmd_dict = {target: cmd.name}

        if cmd == _dt.f32:
            if isinstance(data, (float, int)):
                cmd_dict[cmd] = float(data)
            elif data is not None:
                # A wrong-typed value used to be dropped silently, sending
                # the command without its payload.
                raise InvalidDataType
            # data=None stays accepted: some callers send commands declared
            # as f32 without any payload.
        elif cmd == _dt.bool:
            if not isinstance(data, bool):
                raise InvalidDataType
            cmd_dict[cmd] = data

        for _ in range(self._MAX_RETRIES):
            try:
                response = await self._exchange(cmd_dict, msg_type)
                if response is not None:
                    return response
                await asyncio.sleep(self._RETRY_DELAY)
            except asyncio.CancelledError:
                return None
        raise NoAckRecv

    async def report_mode(self, report_interval = 0.0, buffer_size=16384):
        """
        Start reporting device status in json object. Optional report_interval can be added to discard unwanted samples.
        Only the latest status report received within the buffer is returned.
        - report_interval: float/int (unit: seconds)
        - buffer_size: int

        This is an async generator; call stop_report_mode() to end it.
        """
        await self.device.set_active_report_mode(True)
        self._report_mode_on = True

        while self._report_mode_on:
            await asyncio.sleep(report_interval)
            async with self._cmd_lock:
                report = await self._read_response(buffer_size, report=True)
            # Yield only after releasing the lock: the consumer runs while
            # the generator is suspended here and must be able to send
            # commands, which need the same lock.
            yield report

        await self.device.set_active_report_mode(False)

    def stop_report_mode(self):
        """Ask report_mode() to finish after the report it is currently producing."""
        self._report_mode_on = False
|