From 5166bb7ba89fbcc1e5efa17467717d8452e1c14d Mon Sep 17 00:00:00 2001 From: linuswck Date: Mon, 2 Sep 2024 17:57:24 +0800 Subject: [PATCH] driver: rm thread use & make it asyncio callable - make all cmds asyncio callable - control specific cmds can be enqueued to the handler synchronously --- pykirdy/driver/kirdy.py | 508 +++++++++++++++++++--------------------- 1 file changed, 247 insertions(+), 261 deletions(-) diff --git a/pykirdy/driver/kirdy.py b/pykirdy/driver/kirdy.py index 26f75b1..36c409b 100644 --- a/pykirdy/driver/kirdy.py +++ b/pykirdy/driver/kirdy.py @@ -1,3 +1,4 @@ +import types import socket import json import logging @@ -153,15 +154,14 @@ class InvalidCmd(Exception): pass class Device: - def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_msg_queue): + def __init__(self, send_cmd_handler, send_raw_cmd_handler): self._cmd = CmdList.device self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler - self._read_msg_queue = read_msg_queue - def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): + async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): """ - After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot. + Upon command execution, the ip settings are saved into flash and are effective upon next reboot. """ try: socket.inet_aton(addr) @@ -175,7 +175,7 @@ class Device: if not(isinstance(port, int) and isinstance(prefix_len, int)): raise InvalidDataType - return self._send_raw_cmd( + return await self._send_raw_cmd( { "device_cmd": "SetIPSettings", "ip_settings": { @@ -187,32 +187,31 @@ class Device: } ) - def set_active_report_mode(self, on): + async def set_active_report_mode(self, on): """ Set active report to be on. If it is on, Kirdy will send status report - to ALL client socket connections according to the temperature polling rate set. + to the client socket according to the temperature polling rate set. """ - return self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) + return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) - def set_pd_mon_fin_gain(self, gain): + async def set_pd_mon_fin_gain(self, gain): """ - Configure the photodiode monitor final analog front-end stage gain + Configure the photodiode monitor final analog front-end stage gain. - gain: unitless """ - return self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) + return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) - def set_pd_mon_transconductance(self, transconductance): + async def set_pd_mon_transconductance(self, transconductance): """ - Configure the photodiode monitor transconductance + Configure the photodiode monitor transconductance value. - transconductance: 1/Ohm """ - return self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) + return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) - def get_status_report(self, sig=None): + async def get_status_report(self, sig=None): """ - Get status of all peripherals in a json object + Get status of all peripherals in a json object. - Example of yielded data:: { 'ts': 227657, # Relative Timestamp (ms) 'msg_type': 'Report' # Indicate it is a 'Report' json object @@ -222,13 +221,17 @@ class Device: 'ld_i_set': 0.0, # Laser Diode Output Current (A) 'pd_i': 2.0000002e-06, # Internal Photodiode Monitor current (A) 'pd_pwr': None, # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined. - 'term_50ohm': 'Is50Ohm' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? (On/Off) + 'term_50ohm': 'On' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? ("On"/"Off") }, 'thermostat': { 'pwr_on': False, # Tec Power is On (True/False) 'pid_engaged': False, # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False) 'temp_mon_status': { # Temperature Monitor: - 'status': 'Off', # (To be revised) + 'status': 'Off', # "Off": Power is Off + # "ConstantCurrentMode": Thermostat is regulated in CC mode + # "PidStartUp": PID Regulation is not stable + # "PidStable": PID Regulation is stable and the temperature is within +-1mK to the setpoint + # "OverTempAlarm": Overtemperature Alarm is triggered 'over_temp_alarm': False # Was Laser Diode experienced an Overtemperature condition (True/False) }, 'temperature': 25.03344, # Temperature Readings (Degree Celsius) @@ -238,15 +241,11 @@ class Device: } } """ - if sig is None: - self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) - return self._read_msg_queue() - else: - return self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) + return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) - def get_settings_summary(self, sig=None): + async def get_settings_summary(self, sig=None): """ - Get the current settings of laser and thermostat in a json object + Get the current settings of laser and thermostat in a json object. { 'msg_type': 'Settings', # Indicate it is a 'Settings' json object @@ -255,13 +254,13 @@ class Device: 'ld_drive_current': { # Laser Diode Output Current(A) 'value': 0.0, # Value Set 'max': 0.3 # Max Value Settable - , + }, 'pd_mon_params': { # Laser Diode Software Current Limit(A) 'responsitivity': None, # Value Set 'i_dark': 0.0 # Max Value Settable - , - 'ld_pwr_limit': 0.0 # Laser Diode Power Limit(W) - 'ld_terms_short: False # Is Laser Diode Terminals short? (True/False) + }, + 'ld_pwr_limit': 0.0, # Laser Diode Power Limit(W) + 'ld_terms_short: False, # Is Laser Diode Terminals short? (True/False) }, 'thermostat': { 'default_pwr_on': True, # Power on Thermostat at Startup @@ -312,106 +311,101 @@ class Device: } } """ - if sig is None: - self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) - return self._read_msg_queue() - else: - return self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) + return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) - def dfu(self): + async def dfu(self): """ - Issuing this cmd will HARD RESET the device and - put Kirdy into Dfu mode for flashing firmware. + Hard reset and put the connected Kirdy into the Dfu mode for firmware update. """ - return self._send_cmd(self._cmd._target, self._cmd.Dfu, hard_reset=True) + return await self._send_cmd(self._cmd._target, self._cmd.Dfu) - def save_current_settings_to_flash(self): + async def save_current_settings_to_flash(self): """ Save the current laser diode and thermostat configurations into flash. """ - return self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) + return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) - def restore_settings_from_flash(self): + async def restore_settings_from_flash(self): """ - Restore the laser diode and thermostat settings from flash + Restore the laser diode and thermostat settings from flash. """ - return self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) + return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) - def hard_reset(self): + async def hard_reset(self): """ Hard Reset Kirdy. The socket connection will be closed by Kirdy. Laser diode power and Tec power will be turned off. - Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset. + Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets before hard reset take place. """ - return self._send_cmd(self._cmd._target, self._cmd.HardReset, hard_reset=True) + return await self._send_cmd(self._cmd._target, self._cmd.HardReset) class Laser: def __init__(self, send_cmd_handler): self._cmd = CmdList.ld self._send_cmd = send_cmd_handler - def set_power_on(self, on): + async def set_power_on(self, on): """ Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status - on (True/False) """ if on: - return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) + return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: - return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) + return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) - def set_default_pwr_on(self, on): + async def set_default_pwr_on(self, on): """ - Set whether laser diode is powered up at Startup + Set whether laser diode is powered up at Startup. - on (True/False) """ - return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) + return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) - def set_ld_terms_short(self, short): + async def set_ld_terms_short(self, short): """ Open/Short laser diode terminals. - on (True/False) """ if short: - return self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) + return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) else: - return self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) + return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) - def set_i(self, i): + async def set_i(self, i): """ - Set laser diode output current: Max(0, Min(i_set, i_soft_limit)) + Set laser diode output current: Max(0, Min(i_set, 300mA)). - i: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetI, i) + return await self._send_cmd(self._cmd._target, self._cmd.SetI, i) - def set_pd_mon_responsitivity(self, responsitivity): + async def set_pd_mon_responsitivity(self, responsitivity): """ - Configure the photodiode monitor responsitivity parameter + Configure the photodiode monitor responsitivity parameter. - responsitivity: A/W """ - return self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) + return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) - def set_pd_mon_dark_current(self, dark_current): + async def set_pd_mon_dark_current(self, dark_current): """ - Configure the photodiode monitor dark current parameter + Configure the photodiode monitor dark current parameter. - dark_current: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) + return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) - def set_ld_pwr_limit(self, pwr_limit): + async def set_ld_pwr_limit(self, pwr_limit): """ - Set power limit for the power excursion monitor + Set the power limit for the power excursion monitor. If the calculated power with the params of pd_mon > pwr_limit, overpower protection is triggered. - pwr_limit: W """ - return self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) + return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) - def clear_alarm(self): + async def clear_alarm(self): """ - Clear the power excursion monitor alarm + Clear the power excursion monitor alarm. """ - return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) + return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) class Thermostat: def __init__(self, send_cmd_handler, send_raw_cmd_handler): @@ -419,158 +413,158 @@ class Thermostat: self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler - def set_power_on(self, on): + async def set_power_on(self, on): """ - Power up or power down thermostat + Power up or power down thermostat. - Powering up the thermostat resets the pwr_excursion status """ if on: - return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) + return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: - return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) + return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) - def set_default_pwr_on(self, on): + async def set_default_pwr_on(self, on): """ - Set whether thermostat is powered up at Startup + Set whether thermostat is powered up at Startup. - on: (True/False) """ - return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) + return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) - def set_tec_max_v(self, max_v): + async def set_tec_max_v(self, max_v): """ - Set Tec Maximum Voltage Across the TEC Terminals + Set Tec Maximum Voltage Across the TEC Terminals. - max_v: V """ - return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) + return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) - def set_tec_max_cooling_i(self, max_i_pos): + async def set_tec_max_cooling_i(self, max_i_pos): """ Set Tec maximum cooling current (Settable Range: 0.0 - 1.0) - max_i_pos: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) + return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) - def set_tec_max_heating_i(self, max_i_neg): + async def set_tec_max_heating_i(self, max_i_neg): """ Set Tec maximum heating current (Settable Range: 0.0 - 1.0) - max_i_neg: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) + return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) - def set_tec_i_out(self, i_out): + async def set_tec_i_out(self, i_out): """ - Set Tec Output Current + Set Tec Output Current (Settable Range: 0.0 - 1.0) This cmd is only effective in constant current control mode or your newly set value will be overwritten by PID Controller Output - i_out: A """ if isinstance(i_out, float): - return self._send_raw_cmd({"tec_set_i": i_out}) + return await self._send_raw_cmd({"tec_set_i": i_out}) elif isinstance(i_out, int): - return self._send_raw_cmd({"tec_set_i": float(i_out)}) + return await self._send_raw_cmd({"tec_set_i": float(i_out)}) else: raise InvalidDataType - def set_constant_current_control_mode(self): + async def set_constant_current_control_mode(self): """ Disable PID Controller and output current can be controlled with set_tec_i_out() cmd. """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) - def set_temperature_setpoint(self, temperature): + async def set_temperature_setpoint(self, temperature): """ Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode - temperature: Degree Celsius """ - return self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) + return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) - def set_pid_control_mode(self): + async def set_pid_control_mode(self): """ Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate. Please refer to config_temp_adc_filter for the possible polling rate options """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) - def set_pid_kp(self, kp): + async def set_pid_kp(self, kp): """ Set Kp parameter for PID Controller kp: (unitless) """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) - def set_pid_ki(self, ki): + async def set_pid_ki(self, ki): """ Set Ki parameter for PID Controller ki: (unitless) """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) - def set_pid_kd(self, kd): + async def set_pid_kd(self, kd): """ Set Kd parameter for PID Controller kd: (unitless) """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) - def set_pid_output_max(self, out_max): + async def set_pid_output_max(self, out_max): """ Set max output limit at the PID Output - out_max: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) - def set_pid_output_min(self, out_min): + async def set_pid_output_min(self, out_min): """ Set min output limit at the PID Output - out_min: A """ - return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) + return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) - def set_temp_mon_upper_limit(self, upper_limit): + async def set_temp_mon_upper_limit(self, upper_limit): """ Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - upper_limit: Degree Celsius """ - return self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) + return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) - def set_temp_mon_lower_limit(self, lower_limit): + async def set_temp_mon_lower_limit(self, lower_limit): """ Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - lower_limit: Degree Celsius """ - return self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) + return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) - def clear_alarm(self): + async def clear_alarm(self): """ Clear the temperature monitor alarm """ - return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) + return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) - def set_sh_t0(self, t0): + async def set_sh_t0(self, t0): """ Set t0 Steinhart-Hart parameter for the laser diode NTC - t0: Degree Celsius """ - return self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) + return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) - def set_sh_r0(self, r0): + async def set_sh_r0(self, r0): """ Set r0 Steinhart-Hart parameter for the laser diode NTC - r0: Ohm """ - return self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) + return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) - def set_sh_beta(self, beta): + async def set_sh_beta(self, beta): """ Set beta Steinhart-Hart parameter for the laser diode NTC - beta: (unitless) """ - return self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) + return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) - def config_temp_adc_filter(self, filter_config): + async def config_temp_adc_filter(self, filter_config): """ Configure the temperature adc filter type and sampling rate. Please refer to AD7172 datasheet for the usage of various types of filter. @@ -591,29 +585,35 @@ class Thermostat: filter_config._odr_type(): filter_config, } - return self._send_raw_cmd(cmd) - + return await self._send_raw_cmd(cmd) class Kirdy: def __init__(self): - self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._get_msg) - self.laser = Laser(self._send_cmd_handler) - self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler) + self.device = Device(self._send_cmd, self._send_raw_cmd) + self.laser = Laser(self._send_cmd) + self.thermostat = Thermostat(self._send_cmd, self._send_raw_cmd) - self._task_queue, self._msg_queue, self._int_msg_queue, self._report_queue = None, None, None, None + self._task_queue, self._int_msg_queue, self._report_queue = None, None, None + self._timeout = 5.0 self._writer, self._reader = None, None self._event_loop = None - self._lock = asyncio.Lock() self._msg_queue_get_report = False self._report_mode_on = False + self._state = State.disconnected + + self.read_response_task, self.handler_task = None, None + self._lock = asyncio.Lock() # PyQt Signal self._report_sig = None # Dict self._connected_sig = None # Bool + + self.connected_event = None + def set_report_sig(self, sig): """ - Connect a PyQt Signal to the active report output(dict) + Connect a PyQt Signal to the active report output(dict). This should be configured before the session is started. """ self._report_sig = sig @@ -625,40 +625,41 @@ class Kirdy: """ self._connected_sig = sig - def start_session(self, host='192.168.1.128', port=1337, timeout=5.0, con_retry=5.0): + def start_session(self, host='192.168.1.128', port=1337): """ - Start Kirdy Connection Session. - A new thread is started to handle TCP connection and task execution in the background. - In case of disconnection, all the queued tasks are cleared and the thread retries TCP connection indefinitely. - + Start Kirdy Connection Session. + In case of disconnection, all the queued tasks are cleared and the handler task retries TCP connection indefinitely. - host: Kirdy's IP Address - - port: Kirdy's TCP Port + - port: Kirdy's Port Number """ + self._host, self._ctrl_port = host, port if self._event_loop is None: - self._host, self._ctrl_port = host, port - self._timeout, self._con_retry = timeout, con_retry - - self._event_loop = asyncio.new_event_loop() - self._thread = Thread(target=self._event_loop.run_forever) - self._thread.start() - asyncio.run_coroutine_threadsafe(self._handler(), self._event_loop) - return True - else: - logging.warning("Helper Thread has been started.") - return False + try: + self._event_loop = asyncio.get_running_loop() + except: + self._event_loop = asyncio.new_event_loop() + self._event_loop.run_forever() + self.connected_event = asyncio.Event() + self.handler_task = asyncio.create_task(self._handler()) - def end_session(self, block=False): + async def end_session(self, block=False): """ Stop Kirdy's TCP connection and its associated thread. """ if self._event_loop is not None: if block: - while not(self._task_queue.empty()): - pass - cancel_task = asyncio.run_coroutine_threadsafe(self._stop_handler(), self._event_loop) - while not(cancel_task.done()): - pass - self._thread.join() + await self._task_queue.join() + + if self.read_response_task is not None: + self.read_response_task.cancel() + await self.read_response_task + self.read_response_task = None + + if self.handler_task is not None: + self.handler_task.cancel() + await self.handler_task + self.handler_task = None + self._writer = None if self._connected_sig is not None: @@ -668,7 +669,7 @@ class Kirdy: """ Return True if client is connecting """ - return not self.connected() and self._event_loop is not None + return self._state == State.disconnected and self.read_response_task is not None def connected(self): """ @@ -676,28 +677,43 @@ class Kirdy: """ return self._writer is not None - def get_report_stream(self): - """ - Start reporting device status in json object. - """ - if self.connected(): - self._report_mode_on = True + async def wait_until_connected(self): + if not(self.connected()): + await self.connected_event.wait() - self.device.set_active_report_mode(True) + async def report_mode(self): + """ + Enable and retrieve active report from Kirdy + """ + if self.connected(): + self._report_mode_on = True + await self.device.set_active_report_mode(True) + report = None while self._report_mode_on: - report = self._report_queue.get() - if isinstance(report, Exception): - raise report - yield report - - self.device.set_active_report_mode(False) + report = await self._report_queue.get() + if not(isinstance(report, dict)): + self.stop_active_report() + else: + yield report + + if isinstance(report, dict): + await self.device.set_active_report_mode(False) else: raise ConnectionError def stop_report_mode(self): self._report_mode_on = False + def task_dispatcher(self, awaitable_fn): + """ + Enqueue a task to be handled by the handler. + """ + if self.connected(): + self._task_queue.put_nowait(lambda: awaitable_fn) + else: + raise ConnectionError + async def _sock_disconnection_handling(self): # Reader needn't be closed try: @@ -709,11 +725,6 @@ class Kirdy: self._reader = None self._writer = None - for i in range(self._msg_queue.maxsize): - if self._msg_queue.full(): - self._msg_queue.get_nowait() - self._msg_queue.put_nowait(None) - for i in range(self._report_queue.maxsize): if self._report_queue.full(): self._report_queue.get_nowait() @@ -724,97 +735,79 @@ class Kirdy: async def _handler(self): try: - state = State.disconnected + self._state = State.disconnected first_con = True task = None while True: - if state == State.disconnected: + if self._state == State.disconnected: try: await self.__coninit(self._timeout) - read_response_fut = asyncio.run_coroutine_threadsafe(self._read_response_handler(), self._event_loop) + self.read_response_task = asyncio.create_task(self._read_response_handler()) task = None - logging.debug("Connected") + logging.info("Connected to %s:%d", self._host, self._ctrl_port) + self.connected_event.set() # State Transition - state = State.connected + self._state = State.connected except (OSError, TimeoutError): if first_con: first_con = False - logging.warning("Cannot connect to %s:%d. Retrying in the background", self._host, self._ctrl_port) - await asyncio.sleep(self._con_retry) + logging.warning("Cannot connect to %s:%d. Retrying in the background.", self._host, self._ctrl_port) + await asyncio.sleep(5.0) - elif state == State.connected: + elif self._state == State.connected: try: task = await self._task_queue.get() if isinstance(task, Exception): raise task - - response = await asyncio.wait_for(task(), self._timeout) - if response is not None: - if response["msg_type"] != "Acknowledge": - self._msg_queue.put_nowait(response) + await task() + self._task_queue.task_done() - except (TimeoutError, ConnectionResetError): - logging.warning("Kirdy connection is dropped.") + except (TimeoutError, ConnectionResetError, ConnectionError): + logging.warning("Connection to Kirdy is dropped.") first_con = True - read_response_fut.cancel() - await self._sock_disconnection_handling() - - # State Transition - state = State.disconnected + self.read_response_task.cancel() + # State Transition + self._state = State.disconnected + await self._sock_disconnection_handling() except asyncio.exceptions.CancelledError: - logging.debug("Handler is canceling") + pass except: - logging.debug("Handler experienced an error. Exiting.", exc_info=True) + logging.warning("Handler experienced an error.", exc_info=True) await self._sock_disconnection_handling() - if self._event_loop is not None: - self._event_loop.call_soon_threadsafe(self._event_loop.stop) - self._event_loop = None async def _read_response_handler(self): try: while True: if self._report_mode_on: - responses = await asyncio.wait_for(self._read_response(), 5.0) + responses = await asyncio.wait_for(self._read_response(), self._timeout) else: responses = await self._read_response() for response in responses: + if response["msg_type"] == 'HardReset': + raise asyncio.exceptions.CancelledError if response["msg_type"] == 'Report' and not self._msg_queue_get_report: if self._report_sig is None: - if self._report_queue.full(): - self._report_queue.get_nowait() - self._report_queue.put_nowait(response) + self._report_queue.put_nowait_overwrite(response) else: self._report_sig.emit(response) else: if self._msg_queue_get_report: - async with self._lock: - self._msg_queue_get_report = False - if self._int_msg_queue.full(): - logging.debug("_int_msg_queue is full") - self._int_msg_queue.get_nowait() - self._int_msg_queue.put_nowait(response) + self._msg_queue_get_report = False + self._int_msg_queue.put_nowait_overwrite(response) except asyncio.exceptions.CancelledError: - logging.debug("Read Response Handler is canceling") - except TimeoutError: - logging.warning("Read active report response timeout") - if self._task_queue.full(): - logging.debug("_int_msg_queue is full") - self._task_queue.get_nowait() - self._task_queue.put_nowait(TimeoutError()) - except ConnectionResetError: - logging.warning("Connection Reset by peer") - if self._task_queue.full(): - logging.debug("_int_msg_queue is full") - self._task_queue.get_nowait() - self._task_queue.put_nowait(ConnectionResetError()) - except: + pass + except (TimeoutError, ConnectionResetError, ConnectionError) as exec: + self._task_queue.put_nowait_overwrite(exec) + self._int_msg_queue.put_nowait_overwrite(exec) + except Exception as exec: logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True) - + self._task_queue.put_nowait_overwrite(exec) + self._int_msg_queue.put_nowait_overwrite(exec) if self._report_mode_on: self._report_mode_on = False - self._report_queue.put_nowait(TimeoutError) + self._report_queue.put_nowait_overwrite(TimeoutError) async def _stop_handler(self): for task in asyncio.all_tasks(): @@ -822,10 +815,21 @@ class Kirdy: await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop) async def __coninit(self, timeout): - self._task_queue = asyncio.Queue(maxsize=64) + def _put_nowait_overwrite(self, item): + if self.full(): + self.get_nowait() + self.put_nowait(item) + asyncio.Queue.put_nowait_overwrite = _put_nowait_overwrite + + if self._task_queue is not None: + while not(self._task_queue.empty()): + task = self._task_queue.get_nowait() + if isinstance(task, types.FunctionType): + task().close() + else: + self._task_queue = asyncio.Queue(maxsize=16) self._int_msg_queue = asyncio.Queue(maxsize=4) - self._msg_queue = queue.Queue(maxsize=64) - self._report_queue = queue.Queue(maxsize=16) + self._report_queue = asyncio.Queue(maxsize=16) self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout) writer_sock = self._writer.get_extra_info("socket") @@ -846,47 +850,25 @@ class Kirdy: for item in response: items.append(json.loads(item)) return items - - def _get_msg(self): - msg = self._msg_queue.get() - return msg - - def _send_raw_cmd_handler(self, cmd, msg_type="Acknowledge", sig=None): - if self.connected(): - self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_raw_cmd(cmd, msg_type, sig=sig)) - else: - raise ConnectionError - # If the cmd involves a cmd specific data type, - # checking is done separately within the functions being called - async def _send_raw_cmd(self, cmd, msg_type, sig=None): + async def _send_raw_cmd(self, cmd, msg_type="Acknowledge", sig=None): if self.connected(): - self._writer.write(bytes(json.dumps(cmd), "UTF-8")) - await self._writer.drain() - response = await self._int_msg_queue.get() - if response["msg_type"] == msg_type: + async with self._lock: + self._writer.write(bytes(json.dumps(cmd), "UTF-8")) + await self._writer.drain() + msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout) + + if msg["msg_type"] == msg_type: if sig is not None: - sig.emit(response) + sig.emit(msg) return {"msg_type": "Acknowledge"} - return response + return msg else: raise InvalidCmd else: raise ConnectionError - def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", sig=None, hard_reset=False): - if self.connected(): - if hard_reset: - while not(self._task_queue.empty()): - pass - self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_cmd(target, cmd, data, msg_type, sig=sig)) - if hard_reset: - # Wait 1s for Kirdy to hard reset - time.sleep(1.0) - else: - raise ConnectionError - - async def _send_cmd(self, target, cmd, data, msg_type, sig=None): + async def _send_cmd(self, target, cmd, data=None, msg_type="Acknowledge", sig=None): cmd_dict = {} cmd_dict[target] = cmd.name @@ -904,17 +886,21 @@ class Kirdy: pass if msg_type == 'Report': - async with self._lock: - self._msg_queue_get_report = True + self._msg_queue_get_report = True - self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) - await self._writer.drain() + async with self._lock: + self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) + await self._writer.drain() + + msg = await asyncio.wait_for(self._int_msg_queue.get(), self._timeout) + if isinstance(msg, Exception): + raise msg - msg = await self._int_msg_queue.get() if msg['msg_type'] == msg_type: if sig is not None: sig.emit(msg) return {"msg_type": "Acknowledge"} - return msg + else: + return msg else: raise InvalidCmd