import types import socket import json import logging import time from threading import Thread from aenum import StrEnum, NoAlias import queue import asyncio class _dt(StrEnum): ip_settings = "ip_settings" temp_adc_filter = "temp_adc_filter" f32 = "data_f32" bool = "data_bool" none = "None" class State(StrEnum): disconnected = "disconnected" connected = "connected" class CmdList: class device(StrEnum, settings=NoAlias): _target = "device_cmd" SetIPSettings = _dt.ip_settings SetPdFinGain = _dt.f32 SetPdTransconductance = _dt.f32 SetActiveReportMode = _dt.bool GetHwRev = _dt.none GetStatusReport = _dt.none GetSettingsSummary = _dt.none Dfu = _dt.none SaveFlashSettings = _dt.none LoadFlashSettings = _dt.none HardReset = _dt.none class ld(StrEnum, settings=NoAlias): _target = "laser_diode_cmd" SetDefaultPowerOn = _dt.bool PowerUp = _dt.none PowerDown = _dt.none LdTermsShort = _dt.none LdTermsOpen = _dt.none SetI = _dt.f32 SetPdResponsitivity = _dt.f32 SetPdDarkCurrent = _dt.f32 ApplyPdParams = _dt.none SetLdPwrLimit = _dt.f32 ClearAlarm = _dt.none class thermostat(StrEnum, settings=NoAlias): _target = "thermostat_cmd" SetDefaultPowerOn = _dt.bool, PowerUp = _dt.f32, PowerDown = _dt.f32, SetTecMaxV = _dt.f32, SetTecMaxIPos = _dt.f32, SetTecMaxINeg = _dt.f32, SetTecIOut = _dt.f32, SetTemperatureSetpoint = _dt.f32, SetPidEngage = _dt.none, SetPidDisEngage = _dt.none, SetPidKp = _dt.f32, SetPidKi = _dt.f32, SetPidKd = _dt.f32, SetPidOutMin = _dt.f32, SetPidOutMax = _dt.f32, ConfigTempAdcFilter = _dt.temp_adc_filter, GetPollInterval = _dt.none, SetTempMonUpperLimit = _dt.f32, SetTempMonLowerLimit = _dt.f32, ClearAlarm = _dt.none, SetShT0 = _dt.f32, SetShR0 = _dt.f32, SetShBeta = _dt.f32, class FilterConfig: class Sinc5Sinc1With50hz60HzRejection(StrEnum): f27sps = "F27SPS" f21sps = "F21SPS" f20sps = "F20SPS" f16sps = "F16SPS" def _odr_type(self): return "sinc5sinc1postfilter" def _filter_type(self): return "Sinc5Sinc1With50hz60HzRejection" class Sinc5Sinc1(StrEnum): f31250_0sps = "F31250_0SPS" f15625_0sps = "F15625_0SPS" f10417_0sps = "F10417_0SPS" f5208_0sps = "F5208_0SPS" f2597_0sps = "F2597_0SPS" f1007_0sps = "F1007_0SPS" f503_8sps = "F503_8SPS" f381_0sps = "F381_0SPS" f200_3sps = "F200_3SPS" f100_2sps = "F100_2SPS" f59_52sps = "F59_52SPS" f49_68sps = "F49_68SPS" f20_01sps = "F20_01SPS" f16_63sps = "F16_63SPS" f10_0sps = "F10_0SPS" f5_0sps = "F5_0SPS" f2_5sps = "F2_5SPS" f1_25sps = "F1_25SPS" def _odr_type(self): return "sinc5sinc1odr" def _filter_type(self): return "Sinc5Sinc1" class Sinc3(StrEnum): f31250_0sps = "F31250_0SPS" f15625_0sps = "F15625_0SPS" f10417_0sps = "F10417_0SPS" f5208_0sps = "F5208_0SPS" f2597_0sps = "F2597_0SPS" f1007_0sps = "F1007_0SPS" f503_8sps = "F503_8SPS" f381_0sps = "F381_0SPS" f200_3sps = "F200_3SPS" f100_2sps = "F100_2SPS" f59_52sps = "F59_52SPS" f49_68sps = "F49_68SPS" f20_01sps = "F20_01SPS" f16_63sps = "F16_63SPS" f10_0sps = "F10_0SPS" f5_0sps = "F5_0SPS" f2_5sps = "F2_5SPS" f1_25sps = "F1_25SPS" def _odr_type(self): return "sinc3odr" def _filter_type(self): return "Sinc3" class Sinc3WithFineODR(): upper_limit = 31250 lower_limit = 1.907465 def __init__(self, rate): assert rate >= self.lower_limit and rate <= self.upper_limit self.rate = float(rate) def _odr_type(self): return "sinc3fineodr" def _filter_type(self): return "Sinc3WithFineODR" class InvalidDataType(Exception): pass class InvalidCmd(Exception): pass class Device: def __init__(self, send_cmd_handler, send_raw_cmd_handler): self._cmd = CmdList.device self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): """ Upon command execution, the ip settings are saved into flash and are effective upon next reboot. """ try: socket.inet_aton(addr) socket.inet_aton(gateway) except OSError: raise InvalidDataType addr = addr.split(".") gateway = gateway.split(".") if not(isinstance(port, int) and isinstance(prefix_len, int)): raise InvalidDataType return await self._send_raw_cmd( { "device_cmd": "SetIPSettings", "ip_settings": { "addr": [int(x) for x in addr], "port": port, "prefix_len": prefix_len, "gateway": [int(x) for x in gateway], } } ) async def set_active_report_mode(self, on): """ Set active report to be on. If it is on, Kirdy will send status report to the client socket according to the temperature polling rate set. """ return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) async def set_pd_mon_fin_gain(self, gain): """ Configure the photodiode monitor final analog front-end stage gain. - gain: unitless """ return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) async def set_pd_mon_transconductance(self, transconductance): """ Configure the photodiode monitor transconductance value. - transconductance: 1/Ohm """ return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) async def get_hw_rev(self): """ Get hardware revision of the connected Kirdy { 'msg_type': 'HwRev', 'hw_rev': { 'major': 0, 'minor': 3 } } """ return await self._send_cmd(self._cmd._target, self._cmd.GetHwRev, msg_type="HwRev") async def get_status_report(self, sig=None): """ Get status of all peripherals in a json object. { 'ts': 227657, # Relative Timestamp (ms) 'msg_type': 'Report' # Indicate it is a 'Report' json object 'laser': { 'pwr_on': False, # Laser Power is On (True/False) 'pwr_excursion': False, # Was Laser experienced an Overpowered Condition? (True/False) 'ld_i_set': 0.0, # Laser Diode Output Current (A) 'pd_i': 2.0000002e-06, # Internal Photodiode Monitor current (A) 'pd_pwr': None, # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined. 'term_50ohm': 'On' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? ("On"/"Off") }, 'thermostat': { 'pwr_on': False, # Tec Power is On (True/False) 'pid_engaged': False, # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False) 'temp_mon_status': { # Temperature Monitor: 'status': 'Off', # "Off": Power is Off # "ConstantCurrentMode": Thermostat is regulated in CC mode # "PidStartUp": PID Regulation is not stable # "PidStable": PID Regulation is stable and the temperature is within +-1mK to the setpoint # "OverTempAlarm": Overtemperature Alarm is triggered 'over_temp_alarm': False # Was Laser Diode experienced an Overtemperature condition (True/False) }, 'temperature': 25.03344, # Temperature Readings (Degree Celsius) 'i_set': 0.0, # Tec Current Set by User/PID Controller(A) 'tec_i': 0.0024998188, # Tec Current Readings (A) 'tec_v': -0.00399971 # Tec Voltage Readings (V) } } """ return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) async def get_settings_summary(self, sig=None): """ Get the current settings of laser and thermostat in a json object. { 'msg_type': 'Settings', # Indicate it is a 'Settings' json object 'laser': { 'default_pwr_on': False, # Power On Laser Diode at Startup 'ld_drive_current': { # Laser Diode Output Current (A) 'value': 0.0, # Value Set 'max': 0.3 # Max Value Settable }, 'pd_mon_params': { # Photodiode Parameters 'transconductance': 0.000115258765 # Board Specific Transconductance (1/ohm) 'responsitivity': 0.0141, # Responsitivity (A/W) 'i_dark': 0.0 # Max Value Settable (A) }, 'ld_pwr_limit': { # Laser Diode Power Limit (W) 'value': 0.00975, # Value Set 'max': 0.023321507 # Max Value Settable }, 'ld_terms_short: False, # Is Laser Diode Terminals short? (True/False) }, 'thermostat': { 'default_pwr_on': True, # Power on Thermostat at Startup 'pid_engaged': True, # True: PID Control Mode | False Constant Current Mode 'temperature_setpoint': 25.0, # Temperature Setpoint (Degree Celsius) 'tec_settings': { 'i_set': { # Current TEC Current Set by PID Controller/User (A) 'value': 0.04330516, # Value Set 'max': 1.0 # Max Value Settable }, 'max_v': { # Max Voltage Across Tec Terminals (V) 'value': 4.990857, # Value Set 'max': 5.0 # Max Value Settable }, 'max_i_pos': { # Max Cooling Current Across Tec Terminals (A) 'value': 0.99628574, # Value Set 'max': 1.0 # Max Value Settable }, 'max_i_neg': { # Max Heating Current Across Tec Terminals (A) 'value': 0.99628574, # Value Set 'max': 1.0 # Max Value Settable } }, 'pid_params': { # PID Controller Parameters 'kp': 0.15668282, # Proportional Gain 'ki': 0.0021359625, # Integral Gain 'kd': 0.8292545, # Derivative Gain 'output_min': -1.0, # Minimum Current Output (A) 'output_max': 1.0 # Maximum Current Output (A) }, 'temp_adc_settings': { # Temperature ADC Settings (Please read AD7172-2 Documentation) 'filter_type': 'Sinc5Sinc1With50hz60HzRejection', # Filter Types 'sinc5sinc1odr': None, # (Unused) 'sinc3odr': None, # (Unused) 'sinc5sinc1postfilter': None, # (Unused) 'sinc3fineodr': None, # (Unused) 'rate': 16.67 # ADC Sampling Rate (Hz) }, 'temp_mon_settings': { # Temperature Monitor Settings 'upper_limit': 40.0, # Temperature Upper Limit (Degree Celsius) 'lower_limit': 10.0 # Temperature Lower Limit (Degree Celsius) }, 'thermistor_params': { # Thermistor Steinhart-Hart equation parameters 't0': 25.0, # t0: Degree Celsius 'r0': 10000.0, # r0: Ohm 'b': 3900.0 # b: (unitless) } } } """ return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) async def dfu(self): """ Hard reset and put the connected Kirdy into the Dfu mode for firmware update. """ return await self._send_cmd(self._cmd._target, self._cmd.Dfu) async def save_current_settings_to_flash(self): """ Save the current laser diode and thermostat configurations into flash. """ return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) async def restore_settings_from_flash(self): """ Restore the laser diode and thermostat settings from flash. """ return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) async def hard_reset(self): """ Hard Reset Kirdy. The socket connection will be closed by Kirdy. Laser diode power and Tec power will be turned off. Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets before hard reset take place. """ return await self._send_cmd(self._cmd._target, self._cmd.HardReset) class Laser: def __init__(self, send_cmd_handler): self._cmd = CmdList.ld self._send_cmd = send_cmd_handler async def set_power_on(self, on): """ Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status - on (True/False) """ if on: return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) async def set_default_pwr_on(self, on): """ Set whether laser diode is powered up at Startup. - on (True/False) """ return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) async def set_ld_terms_short(self, short): """ Open/Short laser diode terminals. - on (True/False) """ if short: return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) else: return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) async def set_i(self, i): """ Set laser diode output current: Max(0, Min(i_set, 300mA)). - i: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetI, i) async def set_pd_mon_responsitivity(self, responsitivity): """ Configure the photodiode monitor responsitivity parameter. The value is only effective if ApplyPdParams cmd is issued. - responsitivity: A/W """ return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) async def set_pd_mon_dark_current(self, dark_current): """ Configure the photodiode monitor dark current parameter. The value is only effective if ApplyPdParams cmd is issued. - dark_current: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) async def apply_pd_params(self): """ Evaluate and apply photodiode monitor parameters that are set with SetPdDarkCurrent and SetPdResponsitivity cmd. After Kirdy receives the cmd, it will check if the current power limit is within the newly calculated power limit range. If it is out of range, the photodiode monitor parameters remains unchanged and Kirdy sends out a "InvalidSettings" message along with an error message. """ return await self._send_cmd(self._cmd._target, self._cmd.ApplyPdParams) async def set_ld_pwr_limit(self, pwr_limit): """ Set the power limit for the power excursion monitor. If the power limit settings is out of range, power limit remains unchanged and Kirdy sends out a "InvalidSettings" message along with an error message. If the calculated power with the params of pd_mon > pwr_limit, overpower protection is triggered. - pwr_limit: W """ return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) async def clear_alarm(self): """ Clear the power excursion monitor alarm. """ return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) class Thermostat: def __init__(self, send_cmd_handler, send_raw_cmd_handler): self._cmd = CmdList.thermostat self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler async def set_power_on(self, on): """ Power up or power down thermostat. - Powering up the thermostat resets the pwr_excursion status """ if on: return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) async def set_default_pwr_on(self, on): """ Set whether thermostat is powered up at Startup. - on: (True/False) """ return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) async def set_tec_max_v(self, max_v): """ Set Tec Maximum Voltage Across the TEC Terminals. - max_v: V """ return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) async def set_tec_max_cooling_i(self, max_i_pos): """ Set Tec maximum cooling current (Settable Range: 0.0 - 1.0) - max_i_pos: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) async def set_tec_max_heating_i(self, max_i_neg): """ Set Tec maximum heating current (Settable Range: 0.0 - 1.0) - max_i_neg: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) async def set_tec_i_out(self, i_out): """ Set Tec Output Current (Settable Range: 0.0 - 1.0) This cmd is only effective in constant current control mode or your newly set value will be overwritten by PID Controller Output - i_out: A """ if isinstance(i_out, float): return await self._send_raw_cmd({"tec_set_i": i_out}) elif isinstance(i_out, int): return await self._send_raw_cmd({"tec_set_i": float(i_out)}) else: raise InvalidDataType async def set_constant_current_control_mode(self): """ Disable PID Controller and output current can be controlled with set_tec_i_out() cmd. """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) async def set_temperature_setpoint(self, temperature): """ Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode - temperature: Degree Celsius """ return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) async def set_pid_control_mode(self): """ Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate. Please refer to config_temp_adc_filter for the possible polling rate options """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) async def set_pid_kp(self, kp): """ Set Kp parameter for PID Controller kp: (unitless) """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) async def set_pid_ki(self, ki): """ Set Ki parameter for PID Controller ki: (unitless) """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) async def set_pid_kd(self, kd): """ Set Kd parameter for PID Controller kd: (unitless) """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) async def set_pid_output_max(self, out_max): """ Set max output limit at the PID Output - out_max: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) async def set_pid_output_min(self, out_min): """ Set min output limit at the PID Output - out_min: A """ return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) async def set_temp_mon_upper_limit(self, upper_limit): """ Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - upper_limit: Degree Celsius """ return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) async def set_temp_mon_lower_limit(self, lower_limit): """ Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - lower_limit: Degree Celsius """ return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) async def clear_alarm(self): """ Clear the temperature monitor alarm """ return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) async def set_sh_t0(self, t0): """ Set t0 Steinhart-Hart parameter for the laser diode NTC - t0: Degree Celsius """ return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) async def set_sh_r0(self, r0): """ Set r0 Steinhart-Hart parameter for the laser diode NTC - r0: Ohm """ return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) async def set_sh_beta(self, beta): """ Set beta Steinhart-Hart parameter for the laser diode NTC - beta: (unitless) """ return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) async def config_temp_adc_filter(self, filter_config): """ Configure the temperature adc filter type and sampling rate. Please refer to AD7172 datasheet for the usage of various types of filter. The actual temperature polling rate is bottlenecked by the processing speed of the MCU and performs differently under different kinds of workload. Please verify the polling rate with the timestamp. """ cmd = {} cmd[self._cmd._target] = self._cmd.ConfigTempAdcFilter.name if hasattr(filter_config, 'rate'): cmd[self._cmd.ConfigTempAdcFilter] = { "filter_type": filter_config._filter_type(), filter_config._odr_type(): filter_config.rate, } else: cmd[self._cmd.ConfigTempAdcFilter] = { "filter_type": filter_config._filter_type(), filter_config._odr_type(): filter_config, } return await self._send_raw_cmd(cmd) async def get_poll_interval(self): return await self._send_cmd(self._cmd._target, self._cmd.GetPollInterval, msg_type="Interval") class Kirdy: def __init__(self): self.device = Device(self._send_cmd, self._send_raw_cmd) self.laser = Laser(self._send_cmd) self.thermostat = Thermostat(self._send_cmd, self._send_raw_cmd) self.hw_rev = None self._task_queue, self._int_msg_queue, self._report_queue = None, None, None self._timeout = 5.0 self._writer, self._reader = None, None self._event_loop = None self._msg_queue_get_report = False self._report_mode_on = False self._state = State.disconnected self.read_response_task, self.handler_task = None, None self._lock = asyncio.Lock() # PyQt Signal self._report_sig = None # Dict self._connected_sig = None # Bool self._err_msg_sig = None # Str self.connected_event = None def get_hw_rev(self): return self.hw_rev def set_report_sig(self, sig): """ Connect a PyQt Signal to the active report output(dict). This should be configured before the session is started. """ self._report_sig = sig def set_connected_sig(self, sig): """ Connect a PyQt Signal to connection status(bool) - True: Connection is established - False: Connection is dropped """ self._connected_sig = sig def set_err_msg_sig(self, sig): """ Emit a error message to a PyQt Signal(str) when a cmd fails to execute """ self._err_msg_sig = sig def start_session(self, host='192.168.1.128', port=1337): """ Start Kirdy Connection Session. In case of disconnection, all the queued tasks are cleared and the handler task retries TCP connection indefinitely. - host: Kirdy's IP Address - port: Kirdy's Port Number """ self._host, self._ctrl_port = host, port if self._event_loop is None: try: self._event_loop = asyncio.get_running_loop() except: self._event_loop = asyncio.new_event_loop() self._event_loop.run_forever() self.connected_event = asyncio.Event() self.handler_task = asyncio.create_task(self._handler()) async def end_session(self, block=False): """ Stop Kirdy's TCP connection and its associated thread. """ if self._event_loop is not None: if block: await self._task_queue.join() if self.read_response_task is not None: self.read_response_task.cancel() await self.read_response_task self.read_response_task = None if self.handler_task is not None: self.handler_task.cancel() await self.handler_task self.handler_task = None self._writer = None if self._connected_sig is not None: self._connected_sig.emit(False) def connecting(self): """ Return True if client is connecting """ return self._state == State.disconnected and self.read_response_task is not None def connected(self): """ Returns True if client is connected. """ return self._writer is not None async def wait_until_connected(self): if not(self.connected()): await self.connected_event.wait() async def report_mode(self): """ Enable and retrieve active report from Kirdy """ if self.connected(): self._report_mode_on = True await self.device.set_active_report_mode(True) report = None while self._report_mode_on: report = await self._report_queue.get() if not(isinstance(report, dict)): self.stop_active_report() else: yield report if isinstance(report, dict): await self.device.set_active_report_mode(False) else: raise ConnectionError def stop_report_mode(self): self._report_mode_on = False def task_dispatcher(self, awaitable_fn): """ Enqueue a task to be handled by the handler. """ if self.connected(): try: self._task_queue.put_nowait(lambda: awaitable_fn) return True except asyncio.queues.QueueFull: return False else: raise ConnectionError async def _sock_disconnection_handling(self): # Reader needn't be closed try: self._writer.close() await self._writer.wait_closed() except: # In Hard Reset/DFU cmd, Kirdy may close its socket first pass self._reader = None self._writer = None for i in range(self._report_queue.maxsize): if self._report_queue.full(): self._report_queue.get_nowait() self._report_queue.put_nowait(None) if self._connected_sig is not None: self._connected_sig.emit(False) async def _handler(self): try: self._state = State.disconnected first_con = True task = None while True: if self._state == State.disconnected: try: self.hw_rev = None await self.__coninit(self._timeout) self.read_response_task = asyncio.create_task(self._read_response_handler()) task = None logging.info("Connected to %s:%d", self._host, self._ctrl_port) hw_rev = await self.device.get_hw_rev() self.hw_rev = hw_rev["hw_rev"] if self._connected_sig is not None: self._connected_sig.emit(True) self.connected_event.set() # State Transition self._state = State.connected except (OSError, TimeoutError): if first_con: first_con = False logging.warning("Cannot connect to %s:%d. Retrying in the background.", self._host, self._ctrl_port) await asyncio.sleep(5.0) elif self._state == State.connected: try: task = await self._task_queue.get() if isinstance(task, Exception): raise task await task() self._task_queue.task_done() except (TimeoutError, ConnectionResetError, ConnectionError): logging.warning("Connection to Kirdy is dropped.") first_con = True self.read_response_task.cancel() # State Transition self._state = State.disconnected await self._sock_disconnection_handling() except asyncio.exceptions.CancelledError: pass except: logging.warning("Handler experienced an error.", exc_info=True) await self._sock_disconnection_handling() async def _read_response_handler(self): try: while True: if self._report_mode_on: response = await asyncio.wait_for(self._read_response(), self._timeout) else: response = await self._read_response() if response["msg_type"] == 'HardReset': logging.warn("Kirdy is being hard reset.") raise asyncio.exceptions.CancelledError if response["msg_type"] == 'Dfu': logging.warn("Kirdy enters Dfu Mode.") asyncio.create_task(self.end_session()) if response["msg_type"] == 'ConnectionClose': logging.warn("Kirdy runs out of TCP sockets and closes this connected socket.") asyncio.create_task(self.end_session()) if response["msg_type"] == 'Report' and not self._msg_queue_get_report: if self._report_sig is None: self._report_queue.put_nowait_overwrite(response) else: self._report_sig.emit(response) else: if self._msg_queue_get_report: self._msg_queue_get_report = False self._int_msg_queue.put_nowait_overwrite(response) except asyncio.exceptions.CancelledError: pass except (TimeoutError, ConnectionResetError, ConnectionError) as exec: self._task_queue.put_nowait_overwrite(exec) self._int_msg_queue.put_nowait_overwrite(exec) except Exception as exec: logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True) self._task_queue.put_nowait_overwrite(exec) self._int_msg_queue.put_nowait_overwrite(exec) if self._report_mode_on: self._report_mode_on = False self._report_queue.put_nowait_overwrite(TimeoutError) async def _stop_handler(self): for task in asyncio.all_tasks(): task.cancel() await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop) async def __coninit(self, timeout): def _put_nowait_overwrite(self, item): if self.full(): self.get_nowait() self.put_nowait(item) asyncio.Queue.put_nowait_overwrite = _put_nowait_overwrite if self._task_queue is not None: while not(self._task_queue.empty()): task = self._task_queue.get_nowait() if isinstance(task, types.FunctionType): task().close() else: self._task_queue = asyncio.Queue(maxsize=16) self._int_msg_queue = asyncio.Queue(maxsize=4) self._report_queue = asyncio.Queue(maxsize=16) self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout) writer_sock = self._writer.get_extra_info("socket") writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) async def _read_response(self): raw_response = b'' while len(raw_response) == 0: # Ignore 0 size packet raw_response = await self._reader.readuntil() response = raw_response.decode('utf-8', errors='ignore').split("\n") return json.loads(response[0]) def _response_handling(self, msg, msg_type, sig=None): if msg["msg_type"] in ["InvalidCmd", "InvalidDatatype"]: raise InvalidCmd elif msg["msg_type"] == msg_type: if sig is not None: sig.emit(msg) else: logging.warn(f"Commands fail to execute. {msg['msg_type']}:{msg['msg']}") if self._err_msg_sig is not None and msg['msg'] is not None: self._err_msg_sig.emit(msg['msg']) return msg async def _send_raw_cmd(self, cmd, msg_type="Acknowledge", sig=None): if self.connected(): async with self._lock: self._writer.write(bytes(json.dumps(cmd), "UTF-8")) await self._writer.drain() msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout) return self._response_handling(msg, msg_type, sig) else: raise ConnectionError async def _send_cmd(self, target, cmd, data=None, msg_type="Acknowledge", sig=None): cmd_dict = {} cmd_dict[target] = cmd.name if cmd == _dt.f32: if isinstance(data, float): cmd_dict[cmd] = data elif isinstance(data, int): cmd_dict[cmd] = float(data) elif cmd == _dt.bool: if isinstance(data, bool): cmd_dict[cmd] = data else: raise InvalidDataType elif cmd == "None": pass if msg_type == 'Report': self._msg_queue_get_report = True async with self._lock: self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) await self._writer.drain() msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout) if isinstance(msg, Exception): raise msg return self._response_handling(msg, msg_type, sig)