driver: rm thread use & make it asyncio callable

- make all cmds asyncio callable
- control specific cmds can be enqueued to the handler synchronously
This commit is contained in:
linuswck 2024-09-02 17:57:24 +08:00
parent 9c611fc861
commit 5166bb7ba8
1 changed files with 247 additions and 261 deletions

View File

@ -1,3 +1,4 @@
import types
import socket import socket
import json import json
import logging import logging
@ -153,15 +154,14 @@ class InvalidCmd(Exception):
pass pass
class Device: class Device:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_msg_queue): def __init__(self, send_cmd_handler, send_raw_cmd_handler):
self._cmd = CmdList.device self._cmd = CmdList.device
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler self._send_raw_cmd = send_raw_cmd_handler
self._read_msg_queue = read_msg_queue
def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"):
""" """
After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot. Upon command execution, the ip settings are saved into flash and are effective upon next reboot.
""" """
try: try:
socket.inet_aton(addr) socket.inet_aton(addr)
@ -175,7 +175,7 @@ class Device:
if not(isinstance(port, int) and isinstance(prefix_len, int)): if not(isinstance(port, int) and isinstance(prefix_len, int)):
raise InvalidDataType raise InvalidDataType
return self._send_raw_cmd( return await self._send_raw_cmd(
{ {
"device_cmd": "SetIPSettings", "device_cmd": "SetIPSettings",
"ip_settings": { "ip_settings": {
@ -187,32 +187,31 @@ class Device:
} }
) )
def set_active_report_mode(self, on): async def set_active_report_mode(self, on):
""" """
Set active report to be on. If it is on, Kirdy will send status report Set active report to be on. If it is on, Kirdy will send status report
to ALL client socket connections according to the temperature polling rate set. to the client socket according to the temperature polling rate set.
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on)
def set_pd_mon_fin_gain(self, gain): async def set_pd_mon_fin_gain(self, gain):
""" """
Configure the photodiode monitor final analog front-end stage gain Configure the photodiode monitor final analog front-end stage gain.
- gain: unitless - gain: unitless
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain)
def set_pd_mon_transconductance(self, transconductance): async def set_pd_mon_transconductance(self, transconductance):
""" """
Configure the photodiode monitor transconductance Configure the photodiode monitor transconductance value.
- transconductance: 1/Ohm - transconductance: 1/Ohm
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance)
def get_status_report(self, sig=None): async def get_status_report(self, sig=None):
""" """
Get status of all peripherals in a json object Get status of all peripherals in a json object.
Example of yielded data::
{ {
'ts': 227657, # Relative Timestamp (ms) 'ts': 227657, # Relative Timestamp (ms)
'msg_type': 'Report' # Indicate it is a 'Report' json object 'msg_type': 'Report' # Indicate it is a 'Report' json object
@ -222,13 +221,17 @@ class Device:
'ld_i_set': 0.0, # Laser Diode Output Current (A) 'ld_i_set': 0.0, # Laser Diode Output Current (A)
'pd_i': 2.0000002e-06, # Internal Photodiode Monitor current (A) 'pd_i': 2.0000002e-06, # Internal Photodiode Monitor current (A)
'pd_pwr': None, # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined. 'pd_pwr': None, # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined.
'term_50ohm': 'Is50Ohm' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? (On/Off) 'term_50ohm': 'On' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? ("On"/"Off")
}, },
'thermostat': { 'thermostat': {
'pwr_on': False, # Tec Power is On (True/False) 'pwr_on': False, # Tec Power is On (True/False)
'pid_engaged': False, # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False) 'pid_engaged': False, # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False)
'temp_mon_status': { # Temperature Monitor: 'temp_mon_status': { # Temperature Monitor:
'status': 'Off', # (To be revised) 'status': 'Off', # "Off": Power is Off
# "ConstantCurrentMode": Thermostat is regulated in CC mode
# "PidStartUp": PID Regulation is not stable
# "PidStable": PID Regulation is stable and the temperature is within +-1mK to the setpoint
# "OverTempAlarm": Overtemperature Alarm is triggered
'over_temp_alarm': False # Was Laser Diode experienced an Overtemperature condition (True/False) 'over_temp_alarm': False # Was Laser Diode experienced an Overtemperature condition (True/False)
}, },
'temperature': 25.03344, # Temperature Readings (Degree Celsius) 'temperature': 25.03344, # Temperature Readings (Degree Celsius)
@ -238,15 +241,11 @@ class Device:
} }
} }
""" """
if sig is None: return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
return self._read_msg_queue()
else:
return self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
def get_settings_summary(self, sig=None): async def get_settings_summary(self, sig=None):
""" """
Get the current settings of laser and thermostat in a json object Get the current settings of laser and thermostat in a json object.
{ {
'msg_type': 'Settings', # Indicate it is a 'Settings' json object 'msg_type': 'Settings', # Indicate it is a 'Settings' json object
@ -255,13 +254,13 @@ class Device:
'ld_drive_current': { # Laser Diode Output Current(A) 'ld_drive_current': { # Laser Diode Output Current(A)
'value': 0.0, # Value Set 'value': 0.0, # Value Set
'max': 0.3 # Max Value Settable 'max': 0.3 # Max Value Settable
, },
'pd_mon_params': { # Laser Diode Software Current Limit(A) 'pd_mon_params': { # Laser Diode Software Current Limit(A)
'responsitivity': None, # Value Set 'responsitivity': None, # Value Set
'i_dark': 0.0 # Max Value Settable 'i_dark': 0.0 # Max Value Settable
, },
'ld_pwr_limit': 0.0 # Laser Diode Power Limit(W) 'ld_pwr_limit': 0.0, # Laser Diode Power Limit(W)
'ld_terms_short: False # Is Laser Diode Terminals short? (True/False) 'ld_terms_short: False, # Is Laser Diode Terminals short? (True/False)
}, },
'thermostat': { 'thermostat': {
'default_pwr_on': True, # Power on Thermostat at Startup 'default_pwr_on': True, # Power on Thermostat at Startup
@ -312,106 +311,101 @@ class Device:
} }
} }
""" """
if sig is None: return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
return self._read_msg_queue()
else:
return self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
def dfu(self): async def dfu(self):
""" """
Issuing this cmd will HARD RESET the device and Hard reset and put the connected Kirdy into the Dfu mode for firmware update.
put Kirdy into Dfu mode for flashing firmware.
""" """
return self._send_cmd(self._cmd._target, self._cmd.Dfu, hard_reset=True) return await self._send_cmd(self._cmd._target, self._cmd.Dfu)
def save_current_settings_to_flash(self): async def save_current_settings_to_flash(self):
""" """
Save the current laser diode and thermostat configurations into flash. Save the current laser diode and thermostat configurations into flash.
""" """
return self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings)
def restore_settings_from_flash(self): async def restore_settings_from_flash(self):
""" """
Restore the laser diode and thermostat settings from flash Restore the laser diode and thermostat settings from flash.
""" """
return self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings)
def hard_reset(self): async def hard_reset(self):
""" """
Hard Reset Kirdy. The socket connection will be closed by Kirdy. Hard Reset Kirdy. The socket connection will be closed by Kirdy.
Laser diode power and Tec power will be turned off. Laser diode power and Tec power will be turned off.
Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset. Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets before hard reset take place.
""" """
return self._send_cmd(self._cmd._target, self._cmd.HardReset, hard_reset=True) return await self._send_cmd(self._cmd._target, self._cmd.HardReset)
class Laser: class Laser:
def __init__(self, send_cmd_handler): def __init__(self, send_cmd_handler):
self._cmd = CmdList.ld self._cmd = CmdList.ld
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
def set_power_on(self, on): async def set_power_on(self, on):
""" """
Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status
- on (True/False) - on (True/False)
""" """
if on: if on:
return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else: else:
return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
def set_default_pwr_on(self, on): async def set_default_pwr_on(self, on):
""" """
Set whether laser diode is powered up at Startup Set whether laser diode is powered up at Startup.
- on (True/False) - on (True/False)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
def set_ld_terms_short(self, short): async def set_ld_terms_short(self, short):
""" """
Open/Short laser diode terminals. Open/Short laser diode terminals.
- on (True/False) - on (True/False)
""" """
if short: if short:
return self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None)
else: else:
return self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None)
def set_i(self, i): async def set_i(self, i):
""" """
Set laser diode output current: Max(0, Min(i_set, i_soft_limit)) Set laser diode output current: Max(0, Min(i_set, 300mA)).
- i: A - i: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetI, i) return await self._send_cmd(self._cmd._target, self._cmd.SetI, i)
def set_pd_mon_responsitivity(self, responsitivity): async def set_pd_mon_responsitivity(self, responsitivity):
""" """
Configure the photodiode monitor responsitivity parameter Configure the photodiode monitor responsitivity parameter.
- responsitivity: A/W - responsitivity: A/W
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity)
def set_pd_mon_dark_current(self, dark_current): async def set_pd_mon_dark_current(self, dark_current):
""" """
Configure the photodiode monitor dark current parameter Configure the photodiode monitor dark current parameter.
- dark_current: A - dark_current: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current)
def set_ld_pwr_limit(self, pwr_limit): async def set_ld_pwr_limit(self, pwr_limit):
""" """
Set power limit for the power excursion monitor Set the power limit for the power excursion monitor.
If the calculated power with the params of pd_mon > pwr_limit, If the calculated power with the params of pd_mon > pwr_limit,
overpower protection is triggered. overpower protection is triggered.
- pwr_limit: W - pwr_limit: W
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit)
def clear_alarm(self): async def clear_alarm(self):
""" """
Clear the power excursion monitor alarm Clear the power excursion monitor alarm.
""" """
return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
class Thermostat: class Thermostat:
def __init__(self, send_cmd_handler, send_raw_cmd_handler): def __init__(self, send_cmd_handler, send_raw_cmd_handler):
@ -419,158 +413,158 @@ class Thermostat:
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler self._send_raw_cmd = send_raw_cmd_handler
def set_power_on(self, on): async def set_power_on(self, on):
""" """
Power up or power down thermostat Power up or power down thermostat.
- Powering up the thermostat resets the pwr_excursion status - Powering up the thermostat resets the pwr_excursion status
""" """
if on: if on:
return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else: else:
return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
def set_default_pwr_on(self, on): async def set_default_pwr_on(self, on):
""" """
Set whether thermostat is powered up at Startup Set whether thermostat is powered up at Startup.
- on: (True/False) - on: (True/False)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
def set_tec_max_v(self, max_v): async def set_tec_max_v(self, max_v):
""" """
Set Tec Maximum Voltage Across the TEC Terminals Set Tec Maximum Voltage Across the TEC Terminals.
- max_v: V - max_v: V
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v)
def set_tec_max_cooling_i(self, max_i_pos): async def set_tec_max_cooling_i(self, max_i_pos):
""" """
Set Tec maximum cooling current (Settable Range: 0.0 - 1.0) Set Tec maximum cooling current (Settable Range: 0.0 - 1.0)
- max_i_pos: A - max_i_pos: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos)
def set_tec_max_heating_i(self, max_i_neg): async def set_tec_max_heating_i(self, max_i_neg):
""" """
Set Tec maximum heating current (Settable Range: 0.0 - 1.0) Set Tec maximum heating current (Settable Range: 0.0 - 1.0)
- max_i_neg: A - max_i_neg: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg)
def set_tec_i_out(self, i_out): async def set_tec_i_out(self, i_out):
""" """
Set Tec Output Current Set Tec Output Current (Settable Range: 0.0 - 1.0)
This cmd is only effective in constant current control mode This cmd is only effective in constant current control mode
or your newly set value will be overwritten by PID Controller Output or your newly set value will be overwritten by PID Controller Output
- i_out: A - i_out: A
""" """
if isinstance(i_out, float): if isinstance(i_out, float):
return self._send_raw_cmd({"tec_set_i": i_out}) return await self._send_raw_cmd({"tec_set_i": i_out})
elif isinstance(i_out, int): elif isinstance(i_out, int):
return self._send_raw_cmd({"tec_set_i": float(i_out)}) return await self._send_raw_cmd({"tec_set_i": float(i_out)})
else: else:
raise InvalidDataType raise InvalidDataType
def set_constant_current_control_mode(self): async def set_constant_current_control_mode(self):
""" """
Disable PID Controller and output current can be controlled with set_tec_i_out() cmd. Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None)
def set_temperature_setpoint(self, temperature): async def set_temperature_setpoint(self, temperature):
""" """
Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode
- temperature: Degree Celsius - temperature: Degree Celsius
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature)
def set_pid_control_mode(self): async def set_pid_control_mode(self):
""" """
Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate. Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
Please refer to config_temp_adc_filter for the possible polling rate options Please refer to config_temp_adc_filter for the possible polling rate options
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None)
def set_pid_kp(self, kp): async def set_pid_kp(self, kp):
""" """
Set Kp parameter for PID Controller Set Kp parameter for PID Controller
kp: (unitless) kp: (unitless)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp)
def set_pid_ki(self, ki): async def set_pid_ki(self, ki):
""" """
Set Ki parameter for PID Controller Set Ki parameter for PID Controller
ki: (unitless) ki: (unitless)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) return await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki)
def set_pid_kd(self, kd): async def set_pid_kd(self, kd):
""" """
Set Kd parameter for PID Controller Set Kd parameter for PID Controller
kd: (unitless) kd: (unitless)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd)
def set_pid_output_max(self, out_max): async def set_pid_output_max(self, out_max):
""" """
Set max output limit at the PID Output Set max output limit at the PID Output
- out_max: A - out_max: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max)
def set_pid_output_min(self, out_min): async def set_pid_output_min(self, out_min):
""" """
Set min output limit at the PID Output Set min output limit at the PID Output
- out_min: A - out_min: A
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min)
def set_temp_mon_upper_limit(self, upper_limit): async def set_temp_mon_upper_limit(self, upper_limit):
""" """
Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- upper_limit: Degree Celsius - upper_limit: Degree Celsius
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit)
def set_temp_mon_lower_limit(self, lower_limit): async def set_temp_mon_lower_limit(self, lower_limit):
""" """
Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- lower_limit: Degree Celsius - lower_limit: Degree Celsius
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit)
def clear_alarm(self): async def clear_alarm(self):
""" """
Clear the temperature monitor alarm Clear the temperature monitor alarm
""" """
return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
def set_sh_t0(self, t0): async def set_sh_t0(self, t0):
""" """
Set t0 Steinhart-Hart parameter for the laser diode NTC Set t0 Steinhart-Hart parameter for the laser diode NTC
- t0: Degree Celsius - t0: Degree Celsius
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0)
def set_sh_r0(self, r0): async def set_sh_r0(self, r0):
""" """
Set r0 Steinhart-Hart parameter for the laser diode NTC Set r0 Steinhart-Hart parameter for the laser diode NTC
- r0: Ohm - r0: Ohm
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0)
def set_sh_beta(self, beta): async def set_sh_beta(self, beta):
""" """
Set beta Steinhart-Hart parameter for the laser diode NTC Set beta Steinhart-Hart parameter for the laser diode NTC
- beta: (unitless) - beta: (unitless)
""" """
return self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta)
def config_temp_adc_filter(self, filter_config): async def config_temp_adc_filter(self, filter_config):
""" """
Configure the temperature adc filter type and sampling rate. Configure the temperature adc filter type and sampling rate.
Please refer to AD7172 datasheet for the usage of various types of filter. Please refer to AD7172 datasheet for the usage of various types of filter.
@ -591,29 +585,35 @@ class Thermostat:
filter_config._odr_type(): filter_config, filter_config._odr_type(): filter_config,
} }
return self._send_raw_cmd(cmd) return await self._send_raw_cmd(cmd)
class Kirdy: class Kirdy:
def __init__(self): def __init__(self):
self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._get_msg) self.device = Device(self._send_cmd, self._send_raw_cmd)
self.laser = Laser(self._send_cmd_handler) self.laser = Laser(self._send_cmd)
self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler) self.thermostat = Thermostat(self._send_cmd, self._send_raw_cmd)
self._task_queue, self._msg_queue, self._int_msg_queue, self._report_queue = None, None, None, None self._task_queue, self._int_msg_queue, self._report_queue = None, None, None
self._timeout = 5.0
self._writer, self._reader = None, None self._writer, self._reader = None, None
self._event_loop = None self._event_loop = None
self._lock = asyncio.Lock()
self._msg_queue_get_report = False self._msg_queue_get_report = False
self._report_mode_on = False self._report_mode_on = False
self._state = State.disconnected
self.read_response_task, self.handler_task = None, None
self._lock = asyncio.Lock()
# PyQt Signal # PyQt Signal
self._report_sig = None # Dict self._report_sig = None # Dict
self._connected_sig = None # Bool self._connected_sig = None # Bool
self.connected_event = None
def set_report_sig(self, sig): def set_report_sig(self, sig):
""" """
Connect a PyQt Signal to the active report output(dict) Connect a PyQt Signal to the active report output(dict). This should be configured before the session is started.
""" """
self._report_sig = sig self._report_sig = sig
@ -625,40 +625,41 @@ class Kirdy:
""" """
self._connected_sig = sig self._connected_sig = sig
def start_session(self, host='192.168.1.128', port=1337, timeout=5.0, con_retry=5.0): def start_session(self, host='192.168.1.128', port=1337):
""" """
Start Kirdy Connection Session. Start Kirdy Connection Session.
A new thread is started to handle TCP connection and task execution in the background. In case of disconnection, all the queued tasks are cleared and the handler task retries TCP connection indefinitely.
In case of disconnection, all the queued tasks are cleared and the thread retries TCP connection indefinitely.
- host: Kirdy's IP Address - host: Kirdy's IP Address
- port: Kirdy's TCP Port - port: Kirdy's Port Number
""" """
self._host, self._ctrl_port = host, port
if self._event_loop is None: if self._event_loop is None:
self._host, self._ctrl_port = host, port try:
self._timeout, self._con_retry = timeout, con_retry self._event_loop = asyncio.get_running_loop()
except:
self._event_loop = asyncio.new_event_loop() self._event_loop = asyncio.new_event_loop()
self._thread = Thread(target=self._event_loop.run_forever) self._event_loop.run_forever()
self._thread.start() self.connected_event = asyncio.Event()
asyncio.run_coroutine_threadsafe(self._handler(), self._event_loop) self.handler_task = asyncio.create_task(self._handler())
return True
else:
logging.warning("Helper Thread has been started.")
return False
def end_session(self, block=False): async def end_session(self, block=False):
""" """
Stop Kirdy's TCP connection and its associated thread. Stop Kirdy's TCP connection and its associated thread.
""" """
if self._event_loop is not None: if self._event_loop is not None:
if block: if block:
while not(self._task_queue.empty()): await self._task_queue.join()
pass
cancel_task = asyncio.run_coroutine_threadsafe(self._stop_handler(), self._event_loop) if self.read_response_task is not None:
while not(cancel_task.done()): self.read_response_task.cancel()
pass await self.read_response_task
self._thread.join() self.read_response_task = None
if self.handler_task is not None:
self.handler_task.cancel()
await self.handler_task
self.handler_task = None
self._writer = None self._writer = None
if self._connected_sig is not None: if self._connected_sig is not None:
@ -668,7 +669,7 @@ class Kirdy:
""" """
Return True if client is connecting Return True if client is connecting
""" """
return not self.connected() and self._event_loop is not None return self._state == State.disconnected and self.read_response_task is not None
def connected(self): def connected(self):
""" """
@ -676,28 +677,43 @@ class Kirdy:
""" """
return self._writer is not None return self._writer is not None
def get_report_stream(self): async def wait_until_connected(self):
""" if not(self.connected()):
Start reporting device status in json object. await self.connected_event.wait()
"""
if self.connected():
self._report_mode_on = True
self.device.set_active_report_mode(True) async def report_mode(self):
"""
Enable and retrieve active report from Kirdy
"""
if self.connected():
self._report_mode_on = True
await self.device.set_active_report_mode(True)
report = None
while self._report_mode_on: while self._report_mode_on:
report = self._report_queue.get() report = await self._report_queue.get()
if isinstance(report, Exception): if not(isinstance(report, dict)):
raise report self.stop_active_report()
yield report else:
yield report
self.device.set_active_report_mode(False)
if isinstance(report, dict):
await self.device.set_active_report_mode(False)
else: else:
raise ConnectionError raise ConnectionError
def stop_report_mode(self): def stop_report_mode(self):
self._report_mode_on = False self._report_mode_on = False
def task_dispatcher(self, awaitable_fn):
"""
Enqueue a task to be handled by the handler.
"""
if self.connected():
self._task_queue.put_nowait(lambda: awaitable_fn)
else:
raise ConnectionError
async def _sock_disconnection_handling(self): async def _sock_disconnection_handling(self):
# Reader needn't be closed # Reader needn't be closed
try: try:
@ -709,11 +725,6 @@ class Kirdy:
self._reader = None self._reader = None
self._writer = None self._writer = None
for i in range(self._msg_queue.maxsize):
if self._msg_queue.full():
self._msg_queue.get_nowait()
self._msg_queue.put_nowait(None)
for i in range(self._report_queue.maxsize): for i in range(self._report_queue.maxsize):
if self._report_queue.full(): if self._report_queue.full():
self._report_queue.get_nowait() self._report_queue.get_nowait()
@ -724,97 +735,79 @@ class Kirdy:
async def _handler(self): async def _handler(self):
try: try:
state = State.disconnected self._state = State.disconnected
first_con = True first_con = True
task = None task = None
while True: while True:
if state == State.disconnected: if self._state == State.disconnected:
try: try:
await self.__coninit(self._timeout) await self.__coninit(self._timeout)
read_response_fut = asyncio.run_coroutine_threadsafe(self._read_response_handler(), self._event_loop) self.read_response_task = asyncio.create_task(self._read_response_handler())
task = None task = None
logging.debug("Connected") logging.info("Connected to %s:%d", self._host, self._ctrl_port)
self.connected_event.set()
# State Transition # State Transition
state = State.connected self._state = State.connected
except (OSError, TimeoutError): except (OSError, TimeoutError):
if first_con: if first_con:
first_con = False first_con = False
logging.warning("Cannot connect to %s:%d. Retrying in the background", self._host, self._ctrl_port) logging.warning("Cannot connect to %s:%d. Retrying in the background.", self._host, self._ctrl_port)
await asyncio.sleep(self._con_retry) await asyncio.sleep(5.0)
elif state == State.connected: elif self._state == State.connected:
try: try:
task = await self._task_queue.get() task = await self._task_queue.get()
if isinstance(task, Exception): if isinstance(task, Exception):
raise task raise task
await task()
response = await asyncio.wait_for(task(), self._timeout) self._task_queue.task_done()
if response is not None:
if response["msg_type"] != "Acknowledge":
self._msg_queue.put_nowait(response)
except (TimeoutError, ConnectionResetError): except (TimeoutError, ConnectionResetError, ConnectionError):
logging.warning("Kirdy connection is dropped.") logging.warning("Connection to Kirdy is dropped.")
first_con = True first_con = True
read_response_fut.cancel() self.read_response_task.cancel()
await self._sock_disconnection_handling()
# State Transition
state = State.disconnected
# State Transition
self._state = State.disconnected
await self._sock_disconnection_handling()
except asyncio.exceptions.CancelledError: except asyncio.exceptions.CancelledError:
logging.debug("Handler is canceling") pass
except: except:
logging.debug("Handler experienced an error. Exiting.", exc_info=True) logging.warning("Handler experienced an error.", exc_info=True)
await self._sock_disconnection_handling() await self._sock_disconnection_handling()
if self._event_loop is not None:
self._event_loop.call_soon_threadsafe(self._event_loop.stop)
self._event_loop = None
async def _read_response_handler(self): async def _read_response_handler(self):
try: try:
while True: while True:
if self._report_mode_on: if self._report_mode_on:
responses = await asyncio.wait_for(self._read_response(), 5.0) responses = await asyncio.wait_for(self._read_response(), self._timeout)
else: else:
responses = await self._read_response() responses = await self._read_response()
for response in responses: for response in responses:
if response["msg_type"] == 'HardReset':
raise asyncio.exceptions.CancelledError
if response["msg_type"] == 'Report' and not self._msg_queue_get_report: if response["msg_type"] == 'Report' and not self._msg_queue_get_report:
if self._report_sig is None: if self._report_sig is None:
if self._report_queue.full(): self._report_queue.put_nowait_overwrite(response)
self._report_queue.get_nowait()
self._report_queue.put_nowait(response)
else: else:
self._report_sig.emit(response) self._report_sig.emit(response)
else: else:
if self._msg_queue_get_report: if self._msg_queue_get_report:
async with self._lock: self._msg_queue_get_report = False
self._msg_queue_get_report = False self._int_msg_queue.put_nowait_overwrite(response)
if self._int_msg_queue.full():
logging.debug("_int_msg_queue is full")
self._int_msg_queue.get_nowait()
self._int_msg_queue.put_nowait(response)
except asyncio.exceptions.CancelledError: except asyncio.exceptions.CancelledError:
logging.debug("Read Response Handler is canceling") pass
except TimeoutError: except (TimeoutError, ConnectionResetError, ConnectionError) as exec:
logging.warning("Read active report response timeout") self._task_queue.put_nowait_overwrite(exec)
if self._task_queue.full(): self._int_msg_queue.put_nowait_overwrite(exec)
logging.debug("_int_msg_queue is full") except Exception as exec:
self._task_queue.get_nowait()
self._task_queue.put_nowait(TimeoutError())
except ConnectionResetError:
logging.warning("Connection Reset by peer")
if self._task_queue.full():
logging.debug("_int_msg_queue is full")
self._task_queue.get_nowait()
self._task_queue.put_nowait(ConnectionResetError())
except:
logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True) logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True)
self._task_queue.put_nowait_overwrite(exec)
self._int_msg_queue.put_nowait_overwrite(exec)
if self._report_mode_on: if self._report_mode_on:
self._report_mode_on = False self._report_mode_on = False
self._report_queue.put_nowait(TimeoutError) self._report_queue.put_nowait_overwrite(TimeoutError)
async def _stop_handler(self): async def _stop_handler(self):
for task in asyncio.all_tasks(): for task in asyncio.all_tasks():
@ -822,10 +815,21 @@ class Kirdy:
await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop) await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop)
async def __coninit(self, timeout): async def __coninit(self, timeout):
self._task_queue = asyncio.Queue(maxsize=64) def _put_nowait_overwrite(self, item):
if self.full():
self.get_nowait()
self.put_nowait(item)
asyncio.Queue.put_nowait_overwrite = _put_nowait_overwrite
if self._task_queue is not None:
while not(self._task_queue.empty()):
task = self._task_queue.get_nowait()
if isinstance(task, types.FunctionType):
task().close()
else:
self._task_queue = asyncio.Queue(maxsize=16)
self._int_msg_queue = asyncio.Queue(maxsize=4) self._int_msg_queue = asyncio.Queue(maxsize=4)
self._msg_queue = queue.Queue(maxsize=64) self._report_queue = asyncio.Queue(maxsize=16)
self._report_queue = queue.Queue(maxsize=16)
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout) self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout)
writer_sock = self._writer.get_extra_info("socket") writer_sock = self._writer.get_extra_info("socket")
@ -846,47 +850,25 @@ class Kirdy:
for item in response: for item in response:
items.append(json.loads(item)) items.append(json.loads(item))
return items return items
def _get_msg(self):
msg = self._msg_queue.get()
return msg
def _send_raw_cmd_handler(self, cmd, msg_type="Acknowledge", sig=None):
if self.connected():
self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_raw_cmd(cmd, msg_type, sig=sig))
else:
raise ConnectionError
# If the cmd involves a cmd specific data type, async def _send_raw_cmd(self, cmd, msg_type="Acknowledge", sig=None):
# checking is done separately within the functions being called
async def _send_raw_cmd(self, cmd, msg_type, sig=None):
if self.connected(): if self.connected():
self._writer.write(bytes(json.dumps(cmd), "UTF-8")) async with self._lock:
await self._writer.drain() self._writer.write(bytes(json.dumps(cmd), "UTF-8"))
response = await self._int_msg_queue.get() await self._writer.drain()
if response["msg_type"] == msg_type: msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout)
if msg["msg_type"] == msg_type:
if sig is not None: if sig is not None:
sig.emit(response) sig.emit(msg)
return {"msg_type": "Acknowledge"} return {"msg_type": "Acknowledge"}
return response return msg
else: else:
raise InvalidCmd raise InvalidCmd
else: else:
raise ConnectionError raise ConnectionError
def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", sig=None, hard_reset=False): async def _send_cmd(self, target, cmd, data=None, msg_type="Acknowledge", sig=None):
if self.connected():
if hard_reset:
while not(self._task_queue.empty()):
pass
self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_cmd(target, cmd, data, msg_type, sig=sig))
if hard_reset:
# Wait 1s for Kirdy to hard reset
time.sleep(1.0)
else:
raise ConnectionError
async def _send_cmd(self, target, cmd, data, msg_type, sig=None):
cmd_dict = {} cmd_dict = {}
cmd_dict[target] = cmd.name cmd_dict[target] = cmd.name
@ -904,17 +886,21 @@ class Kirdy:
pass pass
if msg_type == 'Report': if msg_type == 'Report':
async with self._lock: self._msg_queue_get_report = True
self._msg_queue_get_report = True
self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) async with self._lock:
await self._writer.drain() self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8"))
await self._writer.drain()
msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout)
if isinstance(msg, Exception):
raise msg
msg = await self._int_msg_queue.get()
if msg['msg_type'] == msg_type: if msg['msg_type'] == msg_type:
if sig is not None: if sig is not None:
sig.emit(msg) sig.emit(msg)
return {"msg_type": "Acknowledge"} return {"msg_type": "Acknowledge"}
return msg else:
return msg
else: else:
raise InvalidCmd raise InvalidCmd