From 47bf166ecbeccb1b6de0e2eee8f86ead11931b2d Mon Sep 17 00:00:00 2001 From: linuswck Date: Fri, 2 Aug 2024 17:44:51 +0800 Subject: [PATCH] driver: kirdy driver rewrite - Start a new thread to handle connections and schedule FIFO task execution - Make all the cmd accessible without using async - Connections automatically retry upon disconnection - Add support for PyQt6 Signal --- pykirdy/driver/{kirdy_async.py => kirdy.py} | 600 +++++++++++++------- 1 file changed, 382 insertions(+), 218 deletions(-) rename pykirdy/driver/{kirdy_async.py => kirdy.py} (53%) diff --git a/pykirdy/driver/kirdy_async.py b/pykirdy/driver/kirdy.py similarity index 53% rename from pykirdy/driver/kirdy_async.py rename to pykirdy/driver/kirdy.py index b2b4bd1..d630181 100644 --- a/pykirdy/driver/kirdy_async.py +++ b/pykirdy/driver/kirdy.py @@ -1,8 +1,11 @@ import socket -import asyncio import json import logging +import time +from threading import Thread from aenum import StrEnum, NoAlias +import queue +import asyncio class _dt(StrEnum): ip_settings = "ip_settings" @@ -11,6 +14,10 @@ class _dt(StrEnum): bool = "data_bool" none = "None" +class State(StrEnum): + disconnected = "disconnected" + connected = "connected" + class CmdList: class device(StrEnum, settings=NoAlias): _target = "device_cmd" @@ -152,14 +159,13 @@ class StoppedConnecting(Exception): pass class Device: - def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_response, cmd_lock): + def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_msg_queue): self._cmd = CmdList.device self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler - self._read_response = read_response - self._cmd_lock = cmd_lock + self._read_msg_queue = read_msg_queue - async def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): + def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"): """ After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot. """ @@ -175,7 +181,7 @@ class Device: if not(isinstance(port, int) and isinstance(prefix_len, int)): raise InvalidDataType - return await self._send_raw_cmd( + return self._send_raw_cmd( { "device_cmd": "SetIPSettings", "ip_settings": { @@ -187,28 +193,28 @@ class Device: } ) - async def set_active_report_mode(self, on): + def set_active_report_mode(self, on): """ Set active report to be on. If it is on, Kirdy will send status report to ALL client socket connections according to the temperature polling rate set. """ - return await self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) + return self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on) - async def set_pd_mon_fin_gain(self, gain): + def set_pd_mon_fin_gain(self, gain): """ Configure the photodiode monitor final analog front-end stage gain - gain: unitless """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) + return self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain) - async def set_pd_mon_transconductance(self, transconductance): + def set_pd_mon_transconductance(self, transconductance): """ Configure the photodiode monitor transconductance - transconductance: 1/Ohm """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) + return self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance) - async def get_status_report(self): + def get_status_report(self, sig=None): """ Get status of all peripherals in a json object @@ -238,9 +244,13 @@ class Device: } } """ - return await self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report") + if sig is None: + self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) + return self._read_msg_queue() + else: + return self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig) - async def get_settings_summary(self): + def get_settings_summary(self, sig=None): """ Get the current settings of laser and thermostat in a json object @@ -251,7 +261,7 @@ class Device: 'ld_drive_current': { # Laser Diode Output Current(A) 'value': 0.0, # Value Set 'max': 0.3 # Max Value Settable - , + , 'pd_mon_params': { # Laser Diode Software Current Limit(A) 'responsitivity': None, # Value Set 'i_dark': 0.0 # Max Value Settable @@ -308,153 +318,152 @@ class Device: } } """ - return await self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings") + if sig is None: + self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) + return self._read_msg_queue() + else: + return self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig) - async def dfu(self): + def dfu(self): """ Issuing this cmd will HARD RESET the device and put Kirdy into Dfu mode for flashing firmware. """ - return await self._send_cmd(self._cmd._target, self._cmd.Dfu) + return self._send_cmd(self._cmd._target, self._cmd.Dfu, hard_reset=True) - async def save_current_settings_to_flash(self): + def save_current_settings_to_flash(self): """ Save the current laser diode and thermostat configurations into flash. """ - return await self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) + return self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings) - async def restore_settings_from_flash(self): + def restore_settings_from_flash(self): """ Restore the laser diode and thermostat settings from flash """ - return await self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) + return self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings) - async def hard_reset(self): + def hard_reset(self): """ Hard Reset Kirdy. The socket connection will be closed by Kirdy. Laser diode power and Tec power will be turned off. Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets indicating. The device is being reset. """ - response = await self._send_cmd(self._cmd._target, self._cmd.HardReset) - if response is not None: - if response["msg_type"] == "Acknowledge": - # Delay for a second to wait for the hard reset message being sent out on Kirdy - await asyncio.sleep(1.0) - return response + return self._send_cmd(self._cmd._target, self._cmd.HardReset, hard_reset=True) class Laser: - def __init__(self, send_cmd_handler, cmd_lock): + def __init__(self, send_cmd_handler): self._cmd = CmdList.ld self._send_cmd = send_cmd_handler - self._cmd_lock = cmd_lock - async def set_power_on(self, on): + def set_power_on(self, on): """ Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status - on (True/False) """ if on: - return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) + return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: - return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) + return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) - async def set_default_pwr_on(self, on): + def set_default_pwr_on(self, on): """ Set whether laser diode is powered up at Startup - on (True/False) """ - return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) + return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) - async def set_ld_terms_short(self, short): + def set_ld_terms_short(self, short): """ Open/Short laser diode terminals. - on (True/False) """ if short: - return await self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) + return self._send_cmd(self._cmd._target, self._cmd.LdTermsShort, None) else: - return await self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) + return self._send_cmd(self._cmd._target, self._cmd.LdTermsOpen, None) - async def set_i(self, i): + def set_i(self, i): """ Set laser diode output current: Max(0, Min(i_set, i_soft_limit)) - i: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetI, i) + return self._send_cmd(self._cmd._target, self._cmd.SetI, i) - async def set_pd_mon_responsitivity(self, responsitivity): + def set_pd_mon_responsitivity(self, responsitivity): """ Configure the photodiode monitor responsitivity parameter - responsitivity: A/W """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) + return self._send_cmd(self._cmd._target, self._cmd.SetPdResponsitivity, responsitivity) - async def set_pd_mon_dark_current(self, dark_current): + def set_pd_mon_dark_current(self, dark_current): """ - Configure the photodiode monitor responsitivity parameter - - dark_current: A/W + Configure the photodiode monitor dark current parameter + - dark_current: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) + return self._send_cmd(self._cmd._target, self._cmd.SetPdDarkCurrent, dark_current) - async def set_ld_pwr_limit(self, pwr_limit): + def set_ld_pwr_limit(self, pwr_limit): """ Set power limit for the power excursion monitor If the calculated power with the params of pd_mon > pwr_limit, overpower protection is triggered. - pwr_limit: W """ - return await self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) + return self._send_cmd(self._cmd._target, self._cmd.SetLdPwrLimit, pwr_limit) - async def clear_alarm(self): + def clear_alarm(self): """ Clear the power excursion monitor alarm """ - return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) + return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) class Thermostat: - def __init__(self, send_cmd_handler, send_raw_cmd_handler, cmd_lock): + def __init__(self, send_cmd_handler, send_raw_cmd_handler): self._cmd = CmdList.thermostat self._send_cmd = send_cmd_handler self._send_raw_cmd = send_raw_cmd_handler - self._cmd_lock = cmd_lock - async def set_power_on(self, on): + + def set_power_on(self, on): """ Power up or power down thermostat - Powering up the thermostat resets the pwr_excursion status """ if on: - return await self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) + return self._send_cmd(self._cmd._target, self._cmd.PowerUp, None) else: - return await self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) + return self._send_cmd(self._cmd._target, self._cmd.PowerDown, None) - async def set_default_pwr_on(self, on): + def set_default_pwr_on(self, on): """ Set whether thermostat is powered up at Startup + - on: (True/False) """ - return await self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) + return self._send_cmd(self._cmd._target, self._cmd.SetDefaultPowerOn, on) - async def set_tec_max_v(self, max_v): + def set_tec_max_v(self, max_v): """ Set Tec Maximum Voltage Across the TEC Terminals - max_v: V """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) + return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxV, max_v) - async def set_tec_max_cooling_i(self, max_i_pos): + def set_tec_max_cooling_i(self, max_i_pos): """ Set Tec maximum cooling current (Settable Range: 0.0 - 1.0) - max_i_pos: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) + return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxIPos, max_i_pos) - async def set_tec_max_heating_i(self, max_i_neg): + def set_tec_max_heating_i(self, max_i_neg): """ Set Tec maximum heating current (Settable Range: 0.0 - 1.0) - max_i_neg: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) + return self._send_cmd(self._cmd._target, self._cmd.SetTecMaxINeg, max_i_neg) - async def set_tec_i_out(self, i_out): + def set_tec_i_out(self, i_out): """ Set Tec Output Current This cmd is only effective in constant current control mode @@ -462,112 +471,112 @@ class Thermostat: - i_out: A """ if isinstance(i_out, float): - return await self._send_raw_cmd({"tec_set_i": i_out}) + return self._send_raw_cmd({"tec_set_i": i_out}) elif isinstance(i_out, int): - return await self._send_raw_cmd({"tec_set_i": float(i_out)}) + return self._send_raw_cmd({"tec_set_i": float(i_out)}) else: raise InvalidDataType - async def set_constant_current_control_mode(self): + def set_constant_current_control_mode(self): """ Disable PID Controller and output current can be controlled with set_tec_i_out() cmd. """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) + return self._send_cmd(self._cmd._target, self._cmd.SetPidDisEngage, None) - async def set_temperature_setpoint(self, temperature): + def set_temperature_setpoint(self, temperature): """ Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode - temperature: Degree Celsius """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) + return self._send_cmd(self._cmd._target, self._cmd.SetTemperatureSetpoint, temperature) - async def set_pid_control_mode(self): + def set_pid_control_mode(self): """ Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate. Please refer to config_temp_adc_filter for the possible polling rate options """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) + return self._send_cmd(self._cmd._target, self._cmd.SetPidEngage, None) - async def set_pid_kp(self, kp): + def set_pid_kp(self, kp): """ Set Kp parameter for PID Controller kp: (unitless) """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) + return self._send_cmd(self._cmd._target, self._cmd.SetPidKp, kp) - async def set_pid_ki(self, ki): + def set_pid_ki(self, ki): """ Set Ki parameter for PID Controller ki: (unitless) """ - await self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) + return self._send_cmd(self._cmd._target, self._cmd.SetPidKi, ki) - async def set_pid_kd(self, kd): + def set_pid_kd(self, kd): """ Set Kd parameter for PID Controller kd: (unitless) """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) + return self._send_cmd(self._cmd._target, self._cmd.SetPidKd, kd) - async def set_pid_output_max(self, out_max): + def set_pid_output_max(self, out_max): """ Set max output limit at the PID Output - out_max: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) + return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMax, out_max) - async def set_pid_output_min(self, out_min): + def set_pid_output_min(self, out_min): """ Set min output limit at the PID Output - out_min: A """ - return await self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) + return self._send_cmd(self._cmd._target, self._cmd.SetPidOutMin, out_min) - async def set_temp_mon_upper_limit(self, upper_limit): + def set_temp_mon_upper_limit(self, upper_limit): """ Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - upper_limit: Degree Celsius """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) + return self._send_cmd(self._cmd._target, self._cmd.SetTempMonUpperLimit, upper_limit) - async def set_temp_mon_lower_limit(self, lower_limit): + def set_temp_mon_lower_limit(self, lower_limit): """ Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown - lower_limit: Degree Celsius """ - return await self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) + return self._send_cmd(self._cmd._target, self._cmd.SetTempMonLowerLimit, lower_limit) - async def clear_alarm(self): + def clear_alarm(self): """ Clear the temperature monitor alarm """ - return await self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) + return self._send_cmd(self._cmd._target, self._cmd.ClearAlarm) - async def set_sh_t0(self, t0): + def set_sh_t0(self, t0): """ Set t0 Steinhart-Hart parameter for the laser diode NTC - t0: Degree Celsius """ - return await self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) + return self._send_cmd(self._cmd._target, self._cmd.SetShT0, t0) - async def set_sh_r0(self, r0): + def set_sh_r0(self, r0): """ Set r0 Steinhart-Hart parameter for the laser diode NTC - r0: Ohm """ - return await self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) + return self._send_cmd(self._cmd._target, self._cmd.SetShR0, r0) - async def set_sh_beta(self, beta): + def set_sh_beta(self, beta): """ Set beta Steinhart-Hart parameter for the laser diode NTC - beta: (unitless) """ - return await self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) + return self._send_cmd(self._cmd._target, self._cmd.SetShBeta, beta) - async def config_temp_adc_filter(self, filter_config): + def config_temp_adc_filter(self, filter_config): """ Configure the temperature adc filter type and sampling rate. Please refer to AD7172 datasheet for the usage of various types of filter. @@ -588,123 +597,298 @@ class Thermostat: filter_config._odr_type(): filter_config, } - return await self._send_raw_cmd(cmd) + return self._send_raw_cmd(cmd) class Kirdy: def __init__(self): - self._reader = None - self._writer = None - self._connecting_task = None - self._cmd_lock = asyncio.Lock() - self._report_mode_on = False - self.timeout = None - self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._read_response, self._cmd_lock) - self.laser = Laser(self._send_cmd_handler, self._cmd_lock) - self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler, self._cmd_lock) - - async def start_session(self, host='192.168.1.128', port=1337, timeout=None): - self._connecting_task = asyncio.create_task( - asyncio.wait_for(asyncio.open_connection(host, port), timeout) - ) - self.timeout = timeout - try: - self._reader, self._writer = await self._connecting_task - writer_sock = self._writer.get_extra_info("socket") - writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._get_msg) + self.laser = Laser(self._send_cmd_handler) + self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler) - except asyncio.CancelledError: - raise StoppedConnecting - finally: - self._connecting_task = None + self._task_queue, self._msg_queue, self._int_msg_queue, self._report_queue = None, None, None, None + self._writer, self._reader = None, None + self._event_loop = None + + self._lock = asyncio.Lock() + self._msg_queue_get_report = False + self._report_mode_on = False + + # PyQt Signal + self._report_sig = None # Dict + self._connected_sig = None # Bool + + def set_report_sig(self, sig): + """ + Connect a PyQt Signal to the active report output(dict) + """ + self._report_sig = sig + + def set_connected_sig(self, sig): + """ + Connect a PyQt Signal to connection status(bool) + - True: Connection is established + - False: Connection is dropped + """ + self._connected_sig = sig + + def start_session(self, host='192.168.1.128', port=1337, timeout=5.0, con_retry=5.0): + """ + Start Kirdy Connection Session. + A new thread is started to handle TCP connection and task execution in the background. + In case of disconnection, all the queued tasks are cleared and the thread retries TCP connection indefinitely. + + - host: Kirdy's IP Address + - port: Kirdy's TCP Port + """ + if self._event_loop is None: + self._host, self._ctrl_port = host, port + self._timeout, self._con_retry = timeout, con_retry + + self._event_loop = asyncio.new_event_loop() + self._thread = Thread(target=self._event_loop.run_forever) + self._thread.start() + asyncio.run_coroutine_threadsafe(self._handler(), self._event_loop) + return True + else: + logging.warning("Helper Thread has been started.") + return False + + def end_session(self, block=False): + """ + Stop Kirdy's TCP connection and its associated thread. + """ + if self._event_loop is not None: + if block: + while not(self._task_queue.empty()): + pass + cancel_task = asyncio.run_coroutine_threadsafe(self._stop_handler(), self._event_loop) + while not(cancel_task.done()): + pass + self._thread.join() + self._writer = None + + if self._connected_sig is not None: + self._connected_sig.emit(False) def connecting(self): - """Returns True if client is connecting""" - return self._connecting_task is not None + """ + Return True if client is connecting + """ + return not self.connected() and self._event_loop is not None def connected(self): - """Returns True if client is connected""" + """ + Returns True if client is connected. + """ return self._writer is not None - async def end_session(self): - """End session to Kirdy if connected, cancel connection if connecting""" - if self._connecting_task is not None: - self._connecting_task.cancel() + def get_report_stream(self): + """ + Start reporting device status in json object. + """ + if self.connected(): + self._report_mode_on = True - if self._writer is None: - return + self.device.set_active_report_mode(True) + while self._report_mode_on: + report = self._report_queue.get() + if isinstance(report, Exception): + raise report + yield report + + self.device.set_active_report_mode(False) + else: + raise ConnectionError + + def stop_report_mode(self): + self._report_mode_on = False + + async def _sock_disconnection_handling(self): # Reader needn't be closed - self._writer.close() try: + self._writer.close() await self._writer.wait_closed() - except ConnectionResetError: + except: # In Hard Reset/DFU cmd, Kirdy may close its socket first pass self._reader = None self._writer = None - async def _read_response(self, buffer_size=16384, report=False): - """ - Decode newline delimited Json objects and return the latest json received inside the buffer. - - buffer_size: Integer - """ - try: - raw_response = await asyncio.wait_for(self._reader.read(buffer_size), self.timeout) + for i in range(self._msg_queue.maxsize): + if self._msg_queue.full(): + self._msg_queue.get_nowait() + self._msg_queue.put_nowait(None) - response = raw_response.decode('utf-8', errors='ignore').split("\n") - items = [] - for item in reversed(response): - try: - items.append(json.loads(item)) - except json.decoder.JSONDecodeError as e: - pass - if len(items) > 0 : - if report: - return items[0] - else: - return items - else: - return { "msg_type": "EmptyResponse"} + for i in range(self._report_queue.maxsize): + if self._report_queue.full(): + self._report_queue.get_nowait() + self._report_queue.put_nowait(None) + + if self._connected_sig is not None: + self._connected_sig.emit(False) + + async def _handler(self): + try: + state = State.disconnected + first_con = True + task = None + while True: + if state == State.disconnected: + try: + await self.__coninit(self._timeout) + read_response_fut = asyncio.run_coroutine_threadsafe(self._read_response_handler(), self._event_loop) + task = None + logging.debug("Connected") + + # State Transition + state = State.connected + except (OSError, TimeoutError): + if first_con: + first_con = False + logging.warning("Cannot connect to %s:%d. Retrying in the background", self._host, self._ctrl_port) + await asyncio.sleep(self._con_retry) + + elif state == State.connected: + try: + task = await self._task_queue.get() + if isinstance(task, Exception): + raise task + + response = await asyncio.wait_for(task(), self._timeout) + if response is not None: + if response["msg_type"] != "Acknowledge": + self._msg_queue.put_nowait(response) + + except (TimeoutError, ConnectionResetError): + logging.warning("Kirdy connection is dropped.") + first_con = True + read_response_fut.cancel() + await self._sock_disconnection_handling() + + # State Transition + state = State.disconnected + + except asyncio.exceptions.CancelledError: + logging.debug("Handler is canceling") + except: + logging.debug("Handler experienced an error. Exiting.", exc_info=True) + await self._sock_disconnection_handling() + if self._event_loop is not None: + self._event_loop.call_soon_threadsafe(self._event_loop.stop) + self._event_loop = None + + async def _read_response_handler(self): + try: + while True: + if self._report_mode_on: + responses = await asyncio.wait_for(self._read_response(), 5.0) + else: + responses = await self._read_response() + for response in responses: + if response["msg_type"] == 'Report' and not self._msg_queue_get_report: + if self._report_sig is None: + if self._report_queue.full(): + self._report_queue.get_nowait() + self._report_queue.put_nowait(response) + else: + self._report_sig.emit(response) + else: + if self._msg_queue_get_report: + async with self._lock: + self._msg_queue_get_report = False + if self._int_msg_queue.full(): + logging.debug("_int_msg_queue is full") + self._int_msg_queue.get_nowait() + self._int_msg_queue.put_nowait(response) + except asyncio.exceptions.CancelledError: + logging.debug("Read Response Handler is canceling") except TimeoutError: - return { "msg_type": "RecvTimeout"} + logging.warning("Read active report response timeout") + if self._task_queue.full(): + logging.debug("_int_msg_queue is full") + self._task_queue.get_nowait() + self._task_queue.put_nowait(TimeoutError()) + except ConnectionResetError: + logging.warning("Connection Reset by peer") + if self._task_queue.full(): + logging.debug("_int_msg_queue is full") + self._task_queue.get_nowait() + self._task_queue.put_nowait(ConnectionResetError()) + except: + logging.warn("Read Response Handler experienced an error. Exiting.", exc_info=True) + + if self._report_mode_on: + self._report_mode_on = False + self._report_queue.put_nowait(TimeoutError) - async def _send_raw_cmd_handler(self, cmd, lock=True, msg_type="Acknowledge"): - if lock: - async with self._cmd_lock: - return await asyncio.shield(self._send_raw_cmd(cmd, msg_type)) + async def _stop_handler(self): + for task in asyncio.all_tasks(): + task.cancel() + await asyncio.gather(*asyncio.all_tasks(), loop=self._event_loop) + + async def __coninit(self, timeout): + self._task_queue = asyncio.Queue(maxsize=64) + self._int_msg_queue = asyncio.Queue(maxsize=4) + self._msg_queue = queue.Queue(maxsize=64) + self._report_queue = queue.Queue(maxsize=16) + + self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout) + writer_sock = self._writer.get_extra_info("socket") + writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + if self._connected_sig is not None: + self._connected_sig.emit(True) + + async def _read_response(self, buffer_size=16384): + raw_response = await self._reader.read(buffer_size) + response = raw_response.decode('utf-8', errors='ignore').split("\n") + response.reverse() + items = [] + for item in response[1:]: + items.append(json.loads(item)) + return items + + def _get_msg(self): + msg = self._msg_queue.get() + return msg + + def _send_raw_cmd_handler(self, cmd, msg_type="Acknowledge", sig=None): + if self.connected(): + self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_raw_cmd(cmd, msg_type, sig=sig)) else: - return await asyncio.shield(self._send_raw_cmd(cmd, msg_type)) + raise ConnectionError # If the cmd involves a cmd specific data type, # checking is done separately within the functions being called - async def _send_raw_cmd(self, cmd, msg_type): - retry = 0 - while retry < 10: - try: - self._writer.write(bytes(json.dumps(cmd), "UTF-8")) - await self._writer.drain() - responses = await self._read_response() - for response in responses: - if response["msg_type"] == msg_type: - return response - if response["msg_type"] == "InvalidCmd": - raise InvalidCmd - except asyncio.exceptions.CancelledError: - return None - except Exception as e: - retry += 1 - logging.error(f"_send_raw_cmd Exception: {e}") - await asyncio.sleep(0.1) - raise NoAckRecv - - async def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", lock=True): - if lock: - async with self._cmd_lock: - return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type)) + async def _send_raw_cmd(self, cmd, msg_type, sig=None): + if self.connected(): + self._writer.write(bytes(json.dumps(cmd), "UTF-8")) + await self._writer.drain() + response = await self._int_msg_queue.get() + if response["msg_type"] == msg_type: + if sig is not None: + sig.emit(response) + return {"msg_type": "Acknowledge"} + return response + else: + raise InvalidCmd else: - return await asyncio.shield(self._send_cmd(target, cmd, data, msg_type)) + raise ConnectionError - async def _send_cmd(self, target, cmd, data, msg_type): + def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", sig=None, hard_reset=False): + if self.connected(): + if hard_reset: + while not(self._task_queue.empty()): + pass + self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_cmd(target, cmd, data, msg_type, sig=sig)) + if hard_reset: + # Wait 1s for Kirdy to hard reset + time.sleep(1.0) + else: + raise ConnectionError + + async def _send_cmd(self, target, cmd, data, msg_type, sig=None): cmd_dict = {} cmd_dict[target] = cmd.name @@ -721,38 +905,18 @@ class Kirdy: elif cmd == "None": pass - retry = 0 - while retry < 10: - try: - self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) - await self._writer.drain() - responses = await self._read_response() - for response in responses: - if response["msg_type"] == msg_type: - return response - retry += 1 - - await asyncio.sleep(0.1) - except asyncio.exceptions.CancelledError: - return None - raise NoAckRecv + if msg_type == 'Report': + async with self._lock: + self._msg_queue_get_report = True - async def report_mode(self, report_interval = 0.0, buffer_size=16384): - """ - Start reporting device status in json object. Optional report_interval can be added to discard unwanted samples. - Only the latest status report received within the buffer is returned. - - polling interval: float/int (unit: seconds) - - buffer_size: int - """ - await self.device.set_active_report_mode(True) - self._report_mode_on = True + self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8")) + await self._writer.drain() - while self._report_mode_on: - await asyncio.sleep(report_interval) - async with self._cmd_lock: - yield await self._read_response(buffer_size, report=True) - - await self.device.set_active_report_mode(False) - - def stop_report_mode(self): - self._report_mode_on = False + msg = await self._int_msg_queue.get() + if msg['msg_type'] == msg_type: + if sig is not None: + sig.emit(msg) + return {"msg_type": "Acknowledge"} + return msg + else: + raise InvalidCmd