1
0
forked from M-Labs/kirdy

driver: kirdy driver rewrite

- Start a new thread to handle connections and schedule FIFO task execution
- Make all the cmd accessible without using async
- Connections automatically retry upon disconnection
- Add support for PyQt6 Signal
This commit is contained in:
linuswck 2024-08-02 17:44:51 +08:00
parent 1dcac25574
commit 47bf166ecb

View File

@ -1,8 +1,11 @@
import socket import socket
import asyncio
import json import json
import logging import logging
import time
from threading import Thread
from aenum import StrEnum, NoAlias from aenum import StrEnum, NoAlias
import queue
import asyncio
class _dt(StrEnum): class _dt(StrEnum):
ip_settings = "ip_settings" ip_settings = "ip_settings"
@ -11,6 +14,10 @@ class _dt(StrEnum):
bool = "data_bool" bool = "data_bool"
none = "None" none = "None"
class State(StrEnum):
disconnected = "disconnected"
connected = "connected"
class CmdList: class CmdList:
class device(StrEnum, settings=NoAlias): class device(StrEnum, settings=NoAlias):
_target = "device_cmd" _target = "device_cmd"
@ -152,14 +159,13 @@ class StoppedConnecting(Exception):
pass pass
class Device: class Device:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_response, cmd_lock): def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_msg_queue):
self._cmd = CmdList.device self._cmd = CmdList.device
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler self._send_raw_cmd = send_raw_cmd_handler
self._read_response = read_response self._read_msg_queue = read_msg_queue
self._cmd_lock = cmd_lock
async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"):
""" """
After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot. After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot.
""" """
@ -175,7 +181,7 @@ class Device:
if not(isinstance(port, int) and isinstance(prefix_len, int)): if not(isinstance(port, int) and isinstance(prefix_len, int)):
raise InvalidDataType raise InvalidDataType
return await self._send_raw_cmd( return self._send_raw_cmd(
{ {
"device_cmd": "SetIPSettings", "device_cmd": "SetIPSettings",
"ip_settings": { "ip_settings": {
@ -187,28 +193,28 @@ class Device:
} }
) )
async def set_active_report_mode(self, on): def set_active_report_mode(self, on):
""" """
Set active report to be on. If it is on, Kirdy will send status report Set active report to be on. If it is on, Kirdy will send status report
to ALL client socket connections according to the temperature polling rate set. to ALL client socket connections according to the temperature polling rate set.
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) return self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on)
async def set_pd_mon_fin_gain(self, gain): def set_pd_mon_fin_gain(self, gain):
""" """
Configure the photodiode monitor final analog front-end stage gain Configure the photodiode monitor final analog front-end stage gain
- gain: unitless - gain: unitless
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) return self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain)
async def set_pd_mon_transconductance(self, transconductance): def set_pd_mon_transconductance(self, transconductance):
""" """
Configure the photodiode monitor transconductance Configure the photodiode monitor transconductance
- transconductance: 1/Ohm - transconductance: 1/Ohm
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) return self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance)
async def get_status_report(self): def get_status_report(self, sig=None):
""" """
Get status of all peripherals in a json object Get status of all peripherals in a json object
@ -238,9 +244,13 @@ class Device:
} }
} }
""" """
return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report") if sig is None:
self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
return self._read_msg_queue()
else:
return self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
async def get_settings_summary(self): def get_settings_summary(self, sig=None):
""" """
Get the current settings of laser and thermostat in a json object Get the current settings of laser and thermostat in a json object
@ -308,153 +318,152 @@ class Device:
} }
} }
""" """
return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings") if sig is None:
self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
return self._read_msg_queue()
else:
return self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
async def dfu(self): def dfu(self):
""" """
Issuing this cmd will HARD RESET the device and Issuing this cmd will HARD RESET the device and
put Kirdy into Dfu mode for flashing firmware. put Kirdy into Dfu mode for flashing firmware.
""" """
return await self._send_cmd(self._cmd._target, self._cmd.Dfu) return self._send_cmd(self._cmd._target, self._cmd.Dfu, hard_reset=True)
async def save_current_settings_to_flash(self): def save_current_settings_to_flash(self):
""" """
Save the current laser diode and thermostat configurations into flash. Save the current laser diode and thermostat configurations into flash.
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) return self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings)
async def restore_settings_from_flash(self): def restore_settings_from_flash(self):
""" """
Restore the laser diode and thermostat settings from flash Restore the laser diode and thermostat settings from flash
""" """
return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) return self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings)
async def hard_reset(self): def hard_reset(self):
""" """
Hard Reset Kirdy. The socket connection will be closed by Kirdy. Hard Reset Kirdy. The socket connection will be closed by Kirdy.
Laser diode power and Tec power will be turned off. Laser diode power and Tec power will be turned off.
Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset. Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset.
""" """
response = await self._send_cmd(self._cmd._target, self._cmd.HardReset) return self._send_cmd(self._cmd._target, self._cmd.HardReset, hard_reset=True)
if response is not None:
if response["msg_type"] == "Acknowledge":
# Delay for a second to wait for the hard reset message being sent out on Kirdy
await asyncio.sleep(1.0)
return response
class Laser: class Laser:
def __init__(self, send_cmd_handler, cmd_lock): def __init__(self, send_cmd_handler):
self._cmd = CmdList.ld self._cmd = CmdList.ld
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
self._cmd_lock = cmd_lock
async def set_power_on(self, on): def set_power_on(self, on):
""" """
Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status
- on (True/False) - on (True/False)
""" """
if on: if on:
return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else: else:
return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
async def set_default_pwr_on(self, on): def set_default_pwr_on(self, on):
""" """
Set whether laser diode is powered up at Startup Set whether laser diode is powered up at Startup
- on (True/False) - on (True/False)
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
async def set_ld_terms_short(self, short): def set_ld_terms_short(self, short):
""" """
Open/Short laser diode terminals. Open/Short laser diode terminals.
- on (True/False) - on (True/False)
""" """
if short: if short:
return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) return self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None)
else: else:
return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) return self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None)
async def set_i(self, i): def set_i(self, i):
""" """
Set laser diode output current: Max(0, Min(i_set, i_soft_limit)) Set laser diode output current: Max(0, Min(i_set, i_soft_limit))
- i: A - i: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetI, i) return self._send_cmd(self._cmd._target, self._cmd.SetI, i)
async def set_pd_mon_responsitivity(self, responsitivity): def set_pd_mon_responsitivity(self, responsitivity):
""" """
Configure the photodiode monitor responsitivity parameter Configure the photodiode monitor responsitivity parameter
- responsitivity: A/W - responsitivity: A/W
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) return self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity)
async def set_pd_mon_dark_current(self, dark_current): def set_pd_mon_dark_current(self, dark_current):
""" """
Configure the photodiode monitor responsitivity parameter Configure the photodiode monitor dark current parameter
- dark_current: A/W - dark_current: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) return self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current)
async def set_ld_pwr_limit(self, pwr_limit): def set_ld_pwr_limit(self, pwr_limit):
""" """
Set power limit for the power excursion monitor Set power limit for the power excursion monitor
If the calculated power with the params of pd_mon > pwr_limit, If the calculated power with the params of pd_mon > pwr_limit,
overpower protection is triggered. overpower protection is triggered.
- pwr_limit: W - pwr_limit: W
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) return self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit)
async def clear_alarm(self): def clear_alarm(self):
""" """
Clear the power excursion monitor alarm Clear the power excursion monitor alarm
""" """
return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
class Thermostat: class Thermostat:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, cmd_lock): def __init__(self, send_cmd_handler, send_raw_cmd_handler):
self._cmd = CmdList.thermostat self._cmd = CmdList.thermostat
self._send_cmd = send_cmd_handler self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler self._send_raw_cmd = send_raw_cmd_handler
self._cmd_lock = cmd_lock
async def set_power_on(self, on): def set_power_on(self, on):
""" """
Power up or power down thermostat Power up or power down thermostat
- Powering up the thermostat resets the pwr_excursion status - Powering up the thermostat resets the pwr_excursion status
""" """
if on: if on:
return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None)
else: else:
return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None)
async def set_default_pwr_on(self, on): def set_default_pwr_on(self, on):
""" """
Set whether thermostat is powered up at Startup Set whether thermostat is powered up at Startup
- on: (True/False)
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on)
async def set_tec_max_v(self, max_v): def set_tec_max_v(self, max_v):
""" """
Set Tec Maximum Voltage Across the TEC Terminals Set Tec Maximum Voltage Across the TEC Terminals
- max_v: V - max_v: V
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v)
async def set_tec_max_cooling_i(self, max_i_pos): def set_tec_max_cooling_i(self, max_i_pos):
""" """
Set Tec maximum cooling current (Settable Range: 0.0 - 1.0) Set Tec maximum cooling current (Settable Range: 0.0 - 1.0)
- max_i_pos: A - max_i_pos: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos)
async def set_tec_max_heating_i(self, max_i_neg): def set_tec_max_heating_i(self, max_i_neg):
""" """
Set Tec maximum heating current (Settable Range: 0.0 - 1.0) Set Tec maximum heating current (Settable Range: 0.0 - 1.0)
- max_i_neg: A - max_i_neg: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg)
async def set_tec_i_out(self, i_out): def set_tec_i_out(self, i_out):
""" """
Set Tec Output Current Set Tec Output Current
This cmd is only effective in constant current control mode This cmd is only effective in constant current control mode
@ -462,112 +471,112 @@ class Thermostat:
- i_out: A - i_out: A
""" """
if isinstance(i_out, float): if isinstance(i_out, float):
return await self._send_raw_cmd({"tec_set_i": i_out}) return self._send_raw_cmd({"tec_set_i": i_out})
elif isinstance(i_out, int): elif isinstance(i_out, int):
return await self._send_raw_cmd({"tec_set_i": float(i_out)}) return self._send_raw_cmd({"tec_set_i": float(i_out)})
else: else:
raise InvalidDataType raise InvalidDataType
async def set_constant_current_control_mode(self): def set_constant_current_control_mode(self):
""" """
Disable PID Controller and output current can be controlled with set_tec_i_out() cmd. Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) return self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None)
async def set_temperature_setpoint(self, temperature): def set_temperature_setpoint(self, temperature):
""" """
Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode
- temperature: Degree Celsius - temperature: Degree Celsius
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) return self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature)
async def set_pid_control_mode(self): def set_pid_control_mode(self):
""" """
Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate. Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
Please refer to config_temp_adc_filter for the possible polling rate options Please refer to config_temp_adc_filter for the possible polling rate options
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) return self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None)
async def set_pid_kp(self, kp): def set_pid_kp(self, kp):
""" """
Set Kp parameter for PID Controller Set Kp parameter for PID Controller
kp: (unitless) kp: (unitless)
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) return self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp)
async def set_pid_ki(self, ki): def set_pid_ki(self, ki):
""" """
Set Ki parameter for PID Controller Set Ki parameter for PID Controller
ki: (unitless) ki: (unitless)
""" """
await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) return self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki)
async def set_pid_kd(self, kd): def set_pid_kd(self, kd):
""" """
Set Kd parameter for PID Controller Set Kd parameter for PID Controller
kd: (unitless) kd: (unitless)
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) return self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd)
async def set_pid_output_max(self, out_max): def set_pid_output_max(self, out_max):
""" """
Set max output limit at the PID Output Set max output limit at the PID Output
- out_max: A - out_max: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max)
async def set_pid_output_min(self, out_min): def set_pid_output_min(self, out_min):
""" """
Set min output limit at the PID Output Set min output limit at the PID Output
- out_min: A - out_min: A
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min)
async def set_temp_mon_upper_limit(self, upper_limit): def set_temp_mon_upper_limit(self, upper_limit):
""" """
Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- upper_limit: Degree Celsius - upper_limit: Degree Celsius
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) return self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit)
async def set_temp_mon_lower_limit(self, lower_limit): def set_temp_mon_lower_limit(self, lower_limit):
""" """
Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- lower_limit: Degree Celsius - lower_limit: Degree Celsius
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) return self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit)
async def clear_alarm(self): def clear_alarm(self):
""" """
Clear the temperature monitor alarm Clear the temperature monitor alarm
""" """
return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm)
async def set_sh_t0(self, t0): def set_sh_t0(self, t0):
""" """
Set t0 Steinhart-Hart parameter for the laser diode NTC Set t0 Steinhart-Hart parameter for the laser diode NTC
- t0: Degree Celsius - t0: Degree Celsius
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) return self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0)
async def set_sh_r0(self, r0): def set_sh_r0(self, r0):
""" """
Set r0 Steinhart-Hart parameter for the laser diode NTC Set r0 Steinhart-Hart parameter for the laser diode NTC
- r0: Ohm - r0: Ohm
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) return self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0)
async def set_sh_beta(self, beta): def set_sh_beta(self, beta):
""" """
Set beta Steinhart-Hart parameter for the laser diode NTC Set beta Steinhart-Hart parameter for the laser diode NTC
- beta: (unitless) - beta: (unitless)
""" """
return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) return self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta)
async def config_temp_adc_filter(self, filter_config): def config_temp_adc_filter(self, filter_config):
""" """
Configure the temperature adc filter type and sampling rate. Configure the temperature adc filter type and sampling rate.
Please refer to AD7172 datasheet for the usage of various types of filter. Please refer to AD7172 datasheet for the usage of various types of filter.
@ -588,123 +597,298 @@ class Thermostat:
filter_config._odr_type(): filter_config, filter_config._odr_type(): filter_config,
} }
return await self._send_raw_cmd(cmd) return self._send_raw_cmd(cmd)
class Kirdy: class Kirdy:
def __init__(self): def __init__(self):
self._reader = None self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._get_msg)
self._writer = None self.laser = Laser(self._send_cmd_handler)
self._connecting_task = None self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler)
self._cmd_lock = asyncio.Lock()
self._task_queue, self._msg_queue, self._int_msg_queue, self._report_queue = None, None, None, None
self._writer, self._reader = None, None
self._event_loop = None
self._lock = asyncio.Lock()
self._msg_queue_get_report = False
self._report_mode_on = False self._report_mode_on = False
self.timeout = None
self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._read_response, self._cmd_lock)
self.laser = Laser(self._send_cmd_handler, self._cmd_lock)
self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler, self._cmd_lock)
async def start_session(self, host='192.168.1.128', port=1337, timeout=None): # PyQt Signal
self._connecting_task = asyncio.create_task( self._report_sig = None # Dict
asyncio.wait_for(asyncio.open_connection(host, port), timeout) self._connected_sig = None # Bool
)
self.timeout = timeout
try:
self._reader, self._writer = await self._connecting_task
writer_sock = self._writer.get_extra_info("socket")
writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except asyncio.CancelledError: def set_report_sig(self, sig):
raise StoppedConnecting """
finally: Connect a PyQt Signal to the active report output(dict)
self._connecting_task = None """
self._report_sig = sig
def set_connected_sig(self, sig):
"""
Connect a PyQt Signal to connection status(bool)
- True: Connection is established
- False: Connection is dropped
"""
self._connected_sig = sig
def start_session(self, host='192.168.1.128', port=1337, timeout=5.0, con_retry=5.0):
"""
Start Kirdy Connection Session.
A new thread is started to handle TCP connection and task execution in the background.
In case of disconnection, all the queued tasks are cleared and the thread retries TCP connection indefinitely.
- host: Kirdy's IP Address
- port: Kirdy's TCP Port
"""
if self._event_loop is None:
self._host, self._ctrl_port = host, port
self._timeout, self._con_retry = timeout, con_retry
self._event_loop = asyncio.new_event_loop()
self._thread = Thread(target=self._event_loop.run_forever)
self._thread.start()
asyncio.run_coroutine_threadsafe(self._handler(), self._event_loop)
return True
else:
logging.warning("Helper Thread has been started.")
return False
def end_session(self, block=False):
"""
Stop Kirdy's TCP connection and its associated thread.
"""
if self._event_loop is not None:
if block:
while not(self._task_queue.empty()):
pass
cancel_task = asyncio.run_coroutine_threadsafe(self._stop_handler(), self._event_loop)
while not(cancel_task.done()):
pass
self._thread.join()
self._writer = None
if self._connected_sig is not None:
self._connected_sig.emit(False)
def connecting(self): def connecting(self):
"""Returns True if client is connecting""" """
return self._connecting_task is not None Return True if client is connecting
"""
return not self.connected() and self._event_loop is not None
def connected(self): def connected(self):
"""Returns True if client is connected""" """
Returns True if client is connected.
"""
return self._writer is not None return self._writer is not None
async def end_session(self): def get_report_stream(self):
"""End session to Kirdy if connected, cancel connection if connecting""" """
if self._connecting_task is not None: Start reporting device status in json object.
self._connecting_task.cancel() """
if self.connected():
self._report_mode_on = True
if self._writer is None: self.device.set_active_report_mode(True)
return
while self._report_mode_on:
report = self._report_queue.get()
if isinstance(report, Exception):
raise report
yield report
self.device.set_active_report_mode(False)
else:
raise ConnectionError
def stop_report_mode(self):
self._report_mode_on = False
async def _sock_disconnection_handling(self):
# Reader needn't be closed # Reader needn't be closed
self._writer.close()
try: try:
self._writer.close()
await self._writer.wait_closed() await self._writer.wait_closed()
except ConnectionResetError: except:
# In Hard Reset/DFU cmd, Kirdy may close its socket first # In Hard Reset/DFU cmd, Kirdy may close its socket first
pass pass
self._reader = None self._reader = None
self._writer = None self._writer = None
async def _read_response(self, buffer_size=16384, report=False): for i in range(self._msg_queue.maxsize):
""" if self._msg_queue.full():
Decode newline delimited Json objects and return the latest json received inside the buffer. self._msg_queue.get_nowait()
- buffer_size: Integer self._msg_queue.put_nowait(None)
"""
try:
raw_response = await asyncio.wait_for(self._reader.read(buffer_size), self.timeout)
response = raw_response.decode('utf-8', errors='ignore').split("\n") for i in range(self._report_queue.maxsize):
items = [] if self._report_queue.full():
for item in reversed(response): self._report_queue.get_nowait()
self._report_queue.put_nowait(None)
if self._connected_sig is not None:
self._connected_sig.emit(False)
async def _handler(self):
try: try:
items.append(json.loads(item)) state = State.disconnected
except json.decoder.JSONDecodeError as e: first_con = True
pass task = None
if len(items) > 0 : while True:
if report: if state == State.disconnected:
return items[0] try:
await self.__coninit(self._timeout)
read_response_fut = asyncio.run_coroutine_threadsafe(self._read_response_handler(), self._event_loop)
task = None
logging.debug("Connected")
# State Transition
state = State.connected
except (OSError, TimeoutError):
if first_con:
first_con = False
logging.warning("Cannot connect to %s:%d. Retrying in the background", self._host, self._ctrl_port)
await asyncio.sleep(self._con_retry)
elif state == State.connected:
try:
task = await self._task_queue.get()
if isinstance(task, Exception):
raise task
response = await asyncio.wait_for(task(), self._timeout)
if response is not None:
if response["msg_type"] != "Acknowledge":
self._msg_queue.put_nowait(response)
except (TimeoutError, ConnectionResetError):
logging.warning("Kirdy connection is dropped.")
first_con = True
read_response_fut.cancel()
await self._sock_disconnection_handling()
# State Transition
state = State.disconnected
except asyncio.exceptions.CancelledError:
logging.debug("Handler is canceling")
except:
logging.debug("Handler experienced an error. Exiting.", exc_info=True)
await self._sock_disconnection_handling()
if self._event_loop is not None:
self._event_loop.call_soon_threadsafe(self._event_loop.stop)
self._event_loop = None
async def _read_response_handler(self):
try:
while True:
if self._report_mode_on:
responses = await asyncio.wait_for(self._read_response(), 5.0)
else: else:
return items responses = await self._read_response()
for response in responses:
if response["msg_type"] == 'Report' and not self._msg_queue_get_report:
if self._report_sig is None:
if self._report_queue.full():
self._report_queue.get_nowait()
self._report_queue.put_nowait(response)
else: else:
return { "msg_type": "EmptyResponse"} self._report_sig.emit(response)
else:
if self._msg_queue_get_report:
async with self._lock:
self._msg_queue_get_report = False
if self._int_msg_queue.full():
logging.debug("_int_msg_queue is full")
self._int_msg_queue.get_nowait()
self._int_msg_queue.put_nowait(response)
except asyncio.exceptions.CancelledError:
logging.debug("Read Response Handler is canceling")
except TimeoutError: except TimeoutError:
return { "msg_type": "RecvTimeout"} logging.warning("Read active report response timeout")
if self._task_queue.full():
logging.debug("_int_msg_queue is full")
self._task_queue.get_nowait()
self._task_queue.put_nowait(TimeoutError())
except ConnectionResetError:
logging.warning("Connection Reset by peer")
if self._task_queue.full():
logging.debug("_int_msg_queue is full")
self._task_queue.get_nowait()
self._task_queue.put_nowait(ConnectionResetError())
except:
logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True)
async def _send_raw_cmd_handler(self, cmd, lock=True, msg_type="Acknowledge"): if self._report_mode_on:
if lock: self._report_mode_on = False
async with self._cmd_lock: self._report_queue.put_nowait(TimeoutError)
return await asyncio.shield(self._send_raw_cmd(cmd, msg_type))
async def _stop_handler(self):
for task in asyncio.all_tasks():
task.cancel()
await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop)
async def __coninit(self, timeout):
self._task_queue = asyncio.Queue(maxsize=64)
self._int_msg_queue = asyncio.Queue(maxsize=4)
self._msg_queue = queue.Queue(maxsize=64)
self._report_queue = queue.Queue(maxsize=16)
self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout)
writer_sock = self._writer.get_extra_info("socket")
writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if self._connected_sig is not None:
self._connected_sig.emit(True)
async def _read_response(self, buffer_size=16384):
raw_response = await self._reader.read(buffer_size)
response = raw_response.decode('utf-8', errors='ignore').split("\n")
response.reverse()
items = []
for item in response[1:]:
items.append(json.loads(item))
return items
def _get_msg(self):
msg = self._msg_queue.get()
return msg
def _send_raw_cmd_handler(self, cmd, msg_type="Acknowledge", sig=None):
if self.connected():
self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_raw_cmd(cmd, msg_type, sig=sig))
else: else:
return await asyncio.shield(self._send_raw_cmd(cmd, msg_type)) raise ConnectionError
# If the cmd involves a cmd specific data type, # If the cmd involves a cmd specific data type,
# checking is done separately within the functions being called # checking is done separately within the functions being called
async def _send_raw_cmd(self, cmd, msg_type): async def _send_raw_cmd(self, cmd, msg_type, sig=None):
retry = 0 if self.connected():
while retry < 10:
try:
self._writer.write(bytes(json.dumps(cmd), "UTF-8")) self._writer.write(bytes(json.dumps(cmd), "UTF-8"))
await self._writer.drain() await self._writer.drain()
responses = await self._read_response() response = await self._int_msg_queue.get()
for response in responses:
if response["msg_type"] == msg_type: if response["msg_type"] == msg_type:
if sig is not None:
sig.emit(response)
return {"msg_type": "Acknowledge"}
return response return response
if response["msg_type"] == "InvalidCmd":
raise InvalidCmd
except asyncio.exceptions.CancelledError:
return None
except Exception as e:
retry += 1
logging.error(f"_send_raw_cmd Exception: {e}")
await asyncio.sleep(0.1)
raise NoAckRecv
async def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", lock=True):
if lock:
async with self._cmd_lock:
return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type))
else: else:
return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type)) raise InvalidCmd
else:
raise ConnectionError
async def _send_cmd(self, target, cmd, data, msg_type): def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", sig=None, hard_reset=False):
if self.connected():
if hard_reset:
while not(self._task_queue.empty()):
pass
self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_cmd(target, cmd, data, msg_type, sig=sig))
if hard_reset:
# Wait 1s for Kirdy to hard reset
time.sleep(1.0)
else:
raise ConnectionError
async def _send_cmd(self, target, cmd, data, msg_type, sig=None):
cmd_dict = {} cmd_dict = {}
cmd_dict[target] = cmd.name cmd_dict[target] = cmd.name
@ -721,38 +905,18 @@ class Kirdy:
elif cmd == "None": elif cmd == "None":
pass pass
retry = 0 if msg_type == 'Report':
while retry < 10: async with self._lock:
try: self._msg_queue_get_report = True
self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8"))
await self._writer.drain() await self._writer.drain()
responses = await self._read_response()
for response in responses:
if response["msg_type"] == msg_type:
return response
retry += 1
await asyncio.sleep(0.1) msg = await self._int_msg_queue.get()
except asyncio.exceptions.CancelledError: if msg['msg_type'] == msg_type:
return None if sig is not None:
raise NoAckRecv sig.emit(msg)
return {"msg_type": "Acknowledge"}
async def report_mode(self, report_interval = 0.0, buffer_size=16384): return msg
""" else:
Start reporting device status in json object. Optional report_interval can be added to discard unwanted samples. raise InvalidCmd
Only the latest status report received within the buffer is returned.
- polling interval: float/int (unit: seconds)
- buffer_size: int
"""
await self.device.set_active_report_mode(True)
self._report_mode_on = True
while self._report_mode_on:
await asyncio.sleep(report_interval)
async with self._cmd_lock:
yield await self._read_response(buffer_size, report=True)
await self.device.set_active_report_mode(False)
def stop_report_mode(self):
self._report_mode_on = False