# Source: kirdy/pykirdy/driver/kirdy.py (Python, 917 lines, 36 KiB)

import socket
import json
import logging
import time
from threading import Thread
# 2024-06-21 11:01:20 +08:00
from aenum import StrEnum, NoAlias
import queue
import asyncio
# 2024-06-21 11:01:20 +08:00
class _dt(StrEnum):
ip_settings = "ip_settings"
temp_adc_filter = "temp_adc_filter"
f32 = "data_f32"
bool = "data_bool"
none = "None"
class State(StrEnum):
disconnected = "disconnected"
connected = "connected"
# 2024-06-21 11:01:20 +08:00
class CmdList:
    """
    Command tables, one enum per command target.

    In every table:
    - ``_target`` is the JSON key that selects the target,
    - each member *name* is the command name sent under that key,
    - each member *value* is the ``_dt`` payload field in which the command's
      argument is sent (``_dt.none`` for commands without an argument).

    ``NoAlias`` keeps commands that share the same payload type as distinct
    members instead of collapsing them into aliases.
    """

    class device(StrEnum, settings=NoAlias):
        _target = "device_cmd"
        SetIPSettings = _dt.ip_settings
        SetPdFinGain = _dt.f32
        SetPdTransconductance = _dt.f32
        SetActiveReportMode = _dt.bool
        GetStatusReport = _dt.none
        GetSettingsSummary = _dt.none
        Dfu = _dt.none
        SaveFlashSettings = _dt.none
        LoadFlashSettings = _dt.none
        HardReset = _dt.none

    class ld(StrEnum, settings=NoAlias):
        _target = "laser_diode_cmd"
        SetDefaultPowerOn = _dt.bool
        PowerUp = _dt.none
        PowerDown = _dt.none
        LdTermsShort = _dt.none
        LdTermsOpen = _dt.none
        SetI = _dt.f32
        SetPdResponsitivity = _dt.f32
        SetPdDarkCurrent = _dt.f32
        SetLdPwrLimit = _dt.f32
        ClearAlarm = _dt.none

    class thermostat(StrEnum, settings=NoAlias):
        _target = "thermostat_cmd"
        SetDefaultPowerOn = _dt.bool
        # PowerUp/PowerDown are sent without an argument (the thermostat
        # driver passes None), so they are typed like their laser-diode
        # counterparts rather than as f32.
        PowerUp = _dt.none
        PowerDown = _dt.none
        SetTecMaxV = _dt.f32
        SetTecMaxIPos = _dt.f32
        SetTecMaxINeg = _dt.f32
        SetTecIOut = _dt.f32
        SetTemperatureSetpoint = _dt.f32
        SetPidEngage = _dt.none
        SetPidDisEngage = _dt.none
        SetPidKp = _dt.f32
        SetPidKi = _dt.f32
        SetPidKd = _dt.f32
        SetPidOutMin = _dt.f32
        SetPidOutMax = _dt.f32
        ConfigTempAdcFilter = _dt.temp_adc_filter
        SetTempMonUpperLimit = _dt.f32
        SetTempMonLowerLimit = _dt.f32
        ClearAlarm = _dt.none
        SetShT0 = _dt.f32
        SetShR0 = _dt.f32
        SetShBeta = _dt.f32
class FilterConfig:
class Sinc5Sinc1With50hz60HzRejection(StrEnum):
f27sps = "F27SPS"
f21sps = "F21SPS"
f20sps = "F20SPS"
f16sps = "F16SPS"
def _odr_type(self):
return "sinc5sinc1postfilter"
def _filter_type(self):
return "Sinc5Sinc1With50hz60HzRejection"
class Sinc5Sinc1(StrEnum):
f31250_0sps = "F31250_0SPS"
f15625_0sps = "F15625_0SPS"
f10417_0sps = "F10417_0SPS"
f5208_0sps = "F5208_0SPS"
f2597_0sps = "F2597_0SPS"
f1007_0sps = "F1007_0SPS"
f503_8sps = "F503_8SPS"
f381_0sps = "F381_0SPS"
f200_3sps = "F200_3SPS"
f100_2sps = "F100_2SPS"
f59_52sps = "F59_52SPS"
f49_68sps = "F49_68SPS"
f20_01sps = "F20_01SPS"
f16_63sps = "F16_63SPS"
f10_0sps = "F10_0SPS"
f5_0sps = "F5_0SPS"
f2_5sps = "F2_5SPS"
f1_25sps = "F1_25SPS"
def _odr_type(self):
return "sinc5sinc1odr"
def _filter_type(self):
return "Sinc5Sinc1"
class Sinc3(StrEnum):
f31250_0sps = "F31250_0SPS"
f15625_0sps = "F15625_0SPS"
f10417_0sps = "F10417_0SPS"
f5208_0sps = "F5208_0SPS"
f2597_0sps = "F2597_0SPS"
f1007_0sps = "F1007_0SPS"
f503_8sps = "F503_8SPS"
f381_0sps = "F381_0SPS"
f200_3sps = "F200_3SPS"
f100_2sps = "F100_2SPS"
f59_52sps = "F59_52SPS"
f49_68sps = "F49_68SPS"
f20_01sps = "F20_01SPS"
f16_63sps = "F16_63SPS"
f10_0sps = "F10_0SPS"
f5_0sps = "F5_0SPS"
f2_5sps = "F2_5SPS"
f1_25sps = "F1_25SPS"
def _odr_type(self):
return "sinc3odr"
def _filter_type(self):
return "Sinc3"
2024-06-21 11:01:20 +08:00
class Sinc3WithFineODR():
def __init__(self, rate):
assert rate >= 1.907465 and rate <= 31250
self.rate = float(rate)
2024-06-21 11:01:20 +08:00
def _odr_type(self):
return "sinc3fineodr"
def _filter_type(self):
return "Sinc3WithFineODR"
class InvalidDataType(Exception):
    """Raised when a command argument has a type or format the command cannot accept."""
class InvalidCmd(Exception):
    """Raised when the reply to a command is not of the expected message type."""
class Device:
    """
    Device-level commands (network settings, reports, flash storage, reset).

    - send_cmd_handler: callable queuing a (target, cmd, data, ...) command
    - send_raw_cmd_handler: callable queuing an already built command dict
    - read_msg_queue: callable returning the next queued reply message
    """

    def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_msg_queue):
        self._cmd = CmdList.device
        self._send_cmd = send_cmd_handler
        self._send_raw_cmd = send_raw_cmd_handler
        self._read_msg_queue = read_msg_queue

    def set_ip_settings(self, addr="192.168.1.128", port=1337, prefix_len=24, gateway="192.168.1.1"):
        """
        After calling this fn, the ip settings are immediately saved into flash and will be effective on next reboot.
        - addr: IPv4 address in dotted-quad notation ("a.b.c.d")
        - port: TCP port (int, 0 - 65535)
        - prefix_len: network prefix length (int, 0 - 32)
        - gateway: IPv4 address in dotted-quad notation ("a.b.c.d")

        Raises InvalidDataType if any argument is malformed.
        """
        try:
            # inet_pton only accepts full dotted-quad notation. inet_aton also
            # accepts shorthand such as "127.1", which would then be sent as
            # fewer than four octets.
            addr_octets = list(socket.inet_pton(socket.AF_INET, addr))
            gateway_octets = list(socket.inet_pton(socket.AF_INET, gateway))
        except (OSError, TypeError, ValueError):
            raise InvalidDataType from None

        for value, upper in ((port, 65535), (prefix_len, 32)):
            # bool is a subclass of int but is not a meaningful port/prefix
            if isinstance(value, bool) or not isinstance(value, int):
                raise InvalidDataType
            if not 0 <= value <= upper:
                raise InvalidDataType

        return self._send_raw_cmd(
            {
                "device_cmd": "SetIPSettings",
                "ip_settings": {
                    "addr": addr_octets,
                    "port": port,
                    "prefix_len": prefix_len,
                    "gateway": gateway_octets,
                }
            }
        )

    def set_active_report_mode(self, on):
        """
        Set active report to be on. If it is on, Kirdy will send status report
        to ALL client socket connections according to the temperature polling rate set.
        - on: (True/False)
        """
        return self._send_cmd(self._cmd._target, self._cmd.SetActiveReportMode, on)

    def set_pd_mon_fin_gain(self, gain):
        """
        Configure the photodiode monitor final analog front-end stage gain
        - gain: unitless
        """
        return self._send_cmd(self._cmd._target, self._cmd.SetPdFinGain, gain)

    def set_pd_mon_transconductance(self, transconductance):
        """
        Configure the photodiode monitor transconductance
        - transconductance: 1/Ohm
        """
        return self._send_cmd(self._cmd._target, self._cmd.SetPdTransconductance, transconductance)

    def get_status_report(self, sig=None):
        """
        Get status of all peripherals in a json object.
        - sig: optional signal object passed on to the command sender.
               If it is None, this call blocks and returns the next message
               from the reply queue. Otherwise the result of queuing the
               command is returned.

        Example of yielded data::
            {
                'ts': 227657,                       # Relative Timestamp (ms)
                'msg_type': 'Report',               # Indicate it is a 'Report' json object
                'laser': {
                    'pwr_on': False,                # Laser Power is On (True/False)
                    'pwr_excursion': False,         # Has the Laser experienced an Overpowered Condition? (True/False)
                    'ld_i_set': 0.0,                # Laser Diode Output Current (A)
                    'pd_i': 2.0000002e-06,          # Internal Photodiode Monitor current (A)
                    'pd_pwr': None,                 # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined.
                    'term_50ohm': 'Is50Ohm'         # Is the Low Frequency Modulation Input's Impedance 50 Ohm? (On/Off)
                },
                'thermostat': {
                    'pwr_on': False,                # Tec Power is On (True/False)
                    'pid_engaged': False,           # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False)
                    'temp_mon_status': {            # Temperature Monitor:
                        'status': 'Off',            # (To be revised)
                        'over_temp_alarm': False    # Has the Laser Diode experienced an Overtemperature condition? (True/False)
                    },
                    'temperature': 25.03344,        # Temperature Readings (Degree Celsius)
                    'i_set': 0.0,                   # Tec Current Set by User/PID Controller (A)
                    'tec_i': 0.0024998188,          # Tec Current Readings (A)
                    'tec_v': -0.00399971            # Tec Voltage Readings (V)
                }
            }
        """
        queued = self._send_cmd(self._cmd._target, self._cmd.GetStatusReport, msg_type="Report", sig=sig)
        if sig is None:
            return self._read_msg_queue()
        return queued

    def get_settings_summary(self, sig=None):
        """
        Get the current settings of laser and thermostat in a json object.
        - sig: optional signal object passed on to the command sender.
               If it is None, this call blocks and returns the next message
               from the reply queue. Otherwise the result of queuing the
               command is returned.

        Example of yielded data::
            {
                'msg_type': 'Settings',                 # Indicate it is a 'Settings' json object
                'laser': {
                    'default_pwr_on': False,            # Power On Laser Diode at Startup
                    'ld_drive_current': {               # Laser Diode Output Current (A)
                        'value': 0.0,                   # Value Set
                        'max': 0.3                      # Max Value Settable
                    },
                    'pd_mon_params': {                  # Photodiode Monitor Parameters
                        'responsitivity': None,         # Responsitivity (A/W)
                        'i_dark': 0.0                   # Dark Current (A)
                    },
                    'ld_pwr_limit': 0.0,                # Laser Diode Power Limit (W)
                    'ld_terms_short': False             # Is Laser Diode Terminals short? (True/False)
                },
                'thermostat': {
                    'default_pwr_on': True,             # Power on Thermostat at Startup
                    'pid_engaged': True,                # True: PID Control Mode | False: Constant Current Mode
                    'temperature_setpoint': 25.0,       # Temperature Setpoint (Degree Celsius)
                    'tec_settings': {
                        'i_set': {                      # Current TEC Current Set by PID Controller/User
                            'value': 0.04330516,        # Value Set
                            'max': 1.0                  # Max Value Settable
                        },
                        'max_v': {                      # Max Voltage Across Tec Terminals
                            'value': 4.990857,          # Value Set
                            'max': 5.0                  # Max Value Settable
                        },
                        'max_i_pos': {                  # Max Cooling Current Across Tec Terminals
                            'value': 0.99628574,        # Value Set
                            'max': 1.0                  # Max Value Settable
                        },
                        'max_i_neg': {                  # Max Heating Current Across Tec Terminals
                            'value': 0.99628574,        # Value Set
                            'max': 1.0                  # Max Value Settable
                        }
                    },
                    'pid_params': {                     # PID Controller Parameters
                        'kp': 0.15668282,               # Proportional Gain
                        'ki': 0.0021359625,             # Integral Gain
                        'kd': 0.8292545,                # Derivative Gain
                        'output_min': -1.0,             # Minimum Current Output (A)
                        'output_max': 1.0               # Maximum Current Output (A)
                    },
                    'temp_adc_settings': {              # Temperature ADC Settings (Please read AD7172-2 Documentation)
                        'filter_type': 'Sinc5Sinc1With50hz60HzRejection',   # Filter Types
                        'sinc5sinc1odr': None,          # (Unused)
                        'sinc3odr': None,               # (Unused)
                        'sinc5sinc1postfilter': None,   # (Unused)
                        'sinc3fineodr': None,           # (Unused)
                        'rate': 16.67                   # ADC Sampling Rate (Hz)
                    },
                    'temp_mon_settings': {              # Temperature Monitor Settings
                        'upper_limit': 40.0,            # Temperature Upper Limit (Degree Celsius)
                        'lower_limit': 10.0             # Temperature Lower Limit (Degree Celsius)
                    },
                    'thermistor_params': {              # Thermistor Steinhart-Hart equation parameters
                        't0': 25.0,                     # t0: Degree Celsius
                        'r0': 10000.0,                  # r0: Ohm
                        'b': 3900.0                     # b: (unitless)
                    }
                }
            }
        """
        queued = self._send_cmd(self._cmd._target, self._cmd.GetSettingsSummary, msg_type="Settings", sig=sig)
        if sig is None:
            return self._read_msg_queue()
        return queued

    def dfu(self):
        """
        Issuing this cmd will HARD RESET the device and
        put Kirdy into Dfu mode for flashing firmware.
        """
        return self._send_cmd(self._cmd._target, self._cmd.Dfu, hard_reset=True)

    def save_current_settings_to_flash(self):
        """
        Save the current laser diode and thermostat configurations into flash.
        """
        return self._send_cmd(self._cmd._target, self._cmd.SaveFlashSettings)

    def restore_settings_from_flash(self):
        """
        Restore the laser diode and thermostat settings from flash
        """
        return self._send_cmd(self._cmd._target, self._cmd.LoadFlashSettings)

    def hard_reset(self):
        """
        Hard Reset Kirdy. The socket connection will be closed by Kirdy.
        Laser diode power and Tec power will be turned off.
        Kirdy will send out a json({'msg_type': 'HardReset'}) to all sockets,
        indicating that the device is being reset.
        """
        return self._send_cmd(self._cmd._target, self._cmd.HardReset, hard_reset=True)
class Laser:
    """
    Laser diode commands.

    - send_cmd_handler: callable queuing a (target, cmd, data) command
    """

    def __init__(self, send_cmd_handler):
        self._send_cmd = send_cmd_handler
        self._cmd = CmdList.ld

    def set_power_on(self, on):
        """
        Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status
        - on (True/False)
        """
        table = self._cmd
        selected = table.PowerUp if on else table.PowerDown
        return self._send_cmd(table._target, selected, None)

    def set_default_pwr_on(self, on):
        """
        Set whether laser diode is powered up at Startup
        - on (True/False)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetDefaultPowerOn, on)

    def set_ld_terms_short(self, short):
        """
        Open/Short laser diode terminals.
        - short (True/False): True shorts the terminals, False opens them
        """
        table = self._cmd
        selected = table.LdTermsShort if short else table.LdTermsOpen
        return self._send_cmd(table._target, selected, None)

    def set_i(self, i):
        """
        Set laser diode output current: Max(0, Min(i_set, i_soft_limit))
        - i: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetI, i)

    def set_pd_mon_responsitivity(self, responsitivity):
        """
        Configure the photodiode monitor responsitivity parameter
        - responsitivity: A/W
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPdResponsitivity, responsitivity)

    def set_pd_mon_dark_current(self, dark_current):
        """
        Configure the photodiode monitor dark current parameter
        - dark_current: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPdDarkCurrent, dark_current)

    def set_ld_pwr_limit(self, pwr_limit):
        """
        Set power limit for the power excursion monitor
        If the calculated power with the params of pd_mon > pwr_limit,
        overpower protection is triggered.
        - pwr_limit: W
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetLdPwrLimit, pwr_limit)

    def clear_alarm(self):
        """
        Clear the power excursion monitor alarm
        """
        table = self._cmd
        return self._send_cmd(table._target, table.ClearAlarm)
class Thermostat:
    """
    Thermostat (TEC driver, PID controller, temperature monitor) commands.

    - send_cmd_handler: callable queuing a (target, cmd, data) command
    - send_raw_cmd_handler: callable queuing an already built command dict
    """

    def __init__(self, send_cmd_handler, send_raw_cmd_handler):
        self._send_raw_cmd = send_raw_cmd_handler
        self._send_cmd = send_cmd_handler
        self._cmd = CmdList.thermostat

    def set_power_on(self, on):
        """
        Power up or power down thermostat
        - Powering up the thermostat resets the pwr_excursion status
        - on: (True/False)
        """
        table = self._cmd
        selected = table.PowerUp if on else table.PowerDown
        return self._send_cmd(table._target, selected, None)

    def set_default_pwr_on(self, on):
        """
        Set whether thermostat is powered up at Startup
        - on: (True/False)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetDefaultPowerOn, on)

    def set_tec_max_v(self, max_v):
        """
        Set Tec Maximum Voltage Across the TEC Terminals
        - max_v: V
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTecMaxV, max_v)

    def set_tec_max_cooling_i(self, max_i_pos):
        """
        Set Tec maximum cooling current (Settable Range: 0.0 - 1.0)
        - max_i_pos: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTecMaxIPos, max_i_pos)

    def set_tec_max_heating_i(self, max_i_neg):
        """
        Set Tec maximum heating current (Settable Range: 0.0 - 1.0)
        - max_i_neg: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTecMaxINeg, max_i_neg)

    def set_tec_i_out(self, i_out):
        """
        Set Tec Output Current
        This cmd is only effective in constant current control mode
        or your newly set value will be overwritten by PID Controller Output
        - i_out: A

        Raises InvalidDataType if i_out is neither a float nor an int.
        """
        if not isinstance(i_out, (float, int)):
            raise InvalidDataType
        # Integers are sent as floats
        return self._send_raw_cmd({"tec_set_i": float(i_out)})

    def set_constant_current_control_mode(self):
        """
        Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidDisEngage, None)

    def set_temperature_setpoint(self, temperature):
        """
        Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode
        - temperature: Degree Celsius
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTemperatureSetpoint, temperature)

    def set_pid_control_mode(self):
        """
        Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
        Please refer to config_temp_adc_filter for the possible polling rate options
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidEngage, None)

    def set_pid_kp(self, kp):
        """
        Set Kp parameter for PID Controller
        - kp: (unitless)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidKp, kp)

    def set_pid_ki(self, ki):
        """
        Set Ki parameter for PID Controller
        - ki: (unitless)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidKi, ki)

    def set_pid_kd(self, kd):
        """
        Set Kd parameter for PID Controller
        - kd: (unitless)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidKd, kd)

    def set_pid_output_max(self, out_max):
        """
        Set max output limit at the PID Output
        - out_max: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidOutMax, out_max)

    def set_pid_output_min(self, out_min):
        """
        Set min output limit at the PID Output
        - out_min: A
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetPidOutMin, out_min)

    def set_temp_mon_upper_limit(self, upper_limit):
        """
        Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long
        will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
        - upper_limit: Degree Celsius
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTempMonUpperLimit, upper_limit)

    def set_temp_mon_lower_limit(self, lower_limit):
        """
        Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long
        will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
        - lower_limit: Degree Celsius
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetTempMonLowerLimit, lower_limit)

    def clear_alarm(self):
        """
        Clear the temperature monitor alarm
        """
        table = self._cmd
        return self._send_cmd(table._target, table.ClearAlarm)

    def set_sh_t0(self, t0):
        """
        Set t0 Steinhart-Hart parameter for the laser diode NTC
        - t0: Degree Celsius
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetShT0, t0)

    def set_sh_r0(self, r0):
        """
        Set r0 Steinhart-Hart parameter for the laser diode NTC
        - r0: Ohm
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetShR0, r0)

    def set_sh_beta(self, beta):
        """
        Set beta Steinhart-Hart parameter for the laser diode NTC
        - beta: (unitless)
        """
        table = self._cmd
        return self._send_cmd(table._target, table.SetShBeta, beta)

    def config_temp_adc_filter(self, filter_config):
        """
        Configure the temperature adc filter type and sampling rate.
        Please refer to AD7172 datasheet for the usage of various types of filter.
        The actual temperature polling rate is bottlenecked by the processing speed of the MCU and
        performs differently under different kinds of workload. Please verify the polling rate with
        the timestamp.
        - filter_config: an object providing _filter_type() and _odr_type().
                         If it has a 'rate' attribute, that rate is sent as the
                         output data rate; otherwise the object itself is sent.
        """
        table = self._cmd
        # Fine-ODR configurations carry a numeric rate; presets are sent as-is
        odr_value = filter_config.rate if hasattr(filter_config, 'rate') else filter_config
        payload = {
            "filter_type": filter_config._filter_type(),
            filter_config._odr_type(): odr_value,
        }
        return self._send_raw_cmd({
            table._target: table.ConfigTempAdcFilter.name,
            table.ConfigTempAdcFilter: payload,
        })
class Kirdy:
    """
    Client for a Kirdy device over TCP.

    Commands are issued through the `device`, `laser` and `thermostat`
    attributes. start_session() starts a helper thread running an asyncio
    event loop; commands are queued as tasks on that loop and executed one at
    a time by the connection handler, while a second coroutine reads and
    dispatches the replies.
    """

    # Interval used when polling from a non-asyncio thread instead of spinning
    _POLL_INTERVAL = 0.01

    def __init__(self):
        self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._get_msg)
        self.laser = Laser(self._send_cmd_handler)
        self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler)

        self._task_queue, self._msg_queue, self._int_msg_queue, self._report_queue = None, None, None, None
        self._writer, self._reader = None, None
        self._event_loop = None
        self._lock = asyncio.Lock()
        self._msg_queue_get_report = False
        self._report_mode_on = False

        # PyQt Signal
        self._report_sig = None     # Dict
        self._connected_sig = None  # Bool

    def set_report_sig(self, sig):
        """
        Connect a PyQt Signal to the active report output(dict)
        """
        self._report_sig = sig

    def set_connected_sig(self, sig):
        """
        Connect a PyQt Signal to connection status(bool)
        - True: Connection is established
        - False: Connection is dropped
        """
        self._connected_sig = sig

    def start_session(self, host='192.168.1.128', port=1337, timeout=5.0, con_retry=5.0):
        """
        Start Kirdy Connection Session.
        A new thread is started to handle TCP connection and task execution in the background.
        In case of disconnection, all the queued tasks are cleared and the thread retries TCP connection indefinitely.
        - host: Kirdy's IP Address
        - port: Kirdy's TCP Port
        - timeout: timeout (s) for connecting and for each queued task
        - con_retry: delay (s) between connection attempts

        Returns True if a session was started, False if one is already running.
        """
        if self._event_loop is None:
            self._host, self._ctrl_port = host, port
            self._timeout, self._con_retry = timeout, con_retry
            self._event_loop = asyncio.new_event_loop()
            self._thread = Thread(target=self._event_loop.run_forever)
            self._thread.start()
            asyncio.run_coroutine_threadsafe(self._handler(), self._event_loop)
            return True
        else:
            logging.warning("Helper Thread has been started.")
            return False

    def end_session(self, block=False):
        """
        Stop Kirdy's TCP connection and its associated thread.
        - block: if True, wait until the task queue is empty before stopping
        """
        # Take a local reference: the handler clears self._event_loop when it exits
        loop = self._event_loop
        if loop is not None:
            if block and self._task_queue is not None:
                # Sleep between polls instead of spinning on a CPU core, and
                # give up if the helper thread is already gone.
                while not self._task_queue.empty() and self._thread.is_alive():
                    time.sleep(self._POLL_INTERVAL)
            cancel_task = asyncio.run_coroutine_threadsafe(self._stop_handler(), loop)
            while not cancel_task.done() and self._thread.is_alive():
                time.sleep(self._POLL_INTERVAL)
            self._thread.join()
            self._writer = None

        if self._connected_sig is not None:
            self._connected_sig.emit(False)

    def connecting(self):
        """
        Return True if client is connecting
        """
        return not self.connected() and self._event_loop is not None

    def connected(self):
        """
        Returns True if client is connected.
        """
        return self._writer is not None

    def get_report_stream(self):
        """
        Start reporting device status in json object.
        This is a generator: it enables active report mode, yields every item
        taken from the report queue until stop_report_mode() is called, then
        disables active report mode again.

        Raises ConnectionError if the client is not connected, and re-raises
        any exception object found in the report queue.
        """
        if self.connected():
            self._report_mode_on = True
            self.device.set_active_report_mode(True)

            while self._report_mode_on:
                report = self._report_queue.get()
                if isinstance(report, Exception):
                    raise report
                yield report

            self.device.set_active_report_mode(False)
        else:
            raise ConnectionError

    def stop_report_mode(self):
        """
        Make get_report_stream() stop after the item it is currently waiting for.
        """
        self._report_mode_on = False

    @staticmethod
    def _fill_with_none(target_queue):
        # Replace the whole content of a bounded queue with None entries so
        # that consumers blocked on get() are released.
        if target_queue is None:
            return
        for _ in range(target_queue.maxsize):
            if target_queue.full():
                target_queue.get_nowait()
            target_queue.put_nowait(None)

    def _push_to_task_queue(self, item):
        # Queue an item for the connection handler, dropping the oldest task
        # if the queue is full.
        if self._task_queue.full():
            logging.debug("_task_queue is full")
            self._task_queue.get_nowait()
        self._task_queue.put_nowait(item)

    async def _sock_disconnection_handling(self):
        # Reader needn't be closed
        try:
            self._writer.close()
            await self._writer.wait_closed()
        except (Exception, asyncio.CancelledError):
            # Best effort: in Hard Reset/DFU cmd, Kirdy may close its socket
            # first, and the writer may not exist if no connection was made.
            pass
        self._reader = None
        self._writer = None

        self._fill_with_none(self._msg_queue)
        self._fill_with_none(self._report_queue)

        if self._connected_sig is not None:
            self._connected_sig.emit(False)

    async def _handler(self):
        # Connection state machine: (re)connects while disconnected, executes
        # queued tasks one at a time while connected. On exit it closes the
        # socket and stops the event loop.
        try:
            state = State.disconnected
            first_con = True
            task = None
            while True:
                if state == State.disconnected:
                    try:
                        await self.__coninit(self._timeout)
                        read_response_fut = asyncio.run_coroutine_threadsafe(self._read_response_handler(), self._event_loop)
                        task = None
                        logging.debug("Connected")
                        # State Transition
                        state = State.connected
                    # asyncio.TimeoutError only became an alias of the builtin
                    # TimeoutError in Python 3.11, so both are listed.
                    except (OSError, TimeoutError, asyncio.TimeoutError):
                        if first_con:
                            first_con = False
                            logging.warning("Cannot connect to %s:%d. Retrying in the background", self._host, self._ctrl_port)
                        await asyncio.sleep(self._con_retry)

                elif state == State.connected:
                    try:
                        task = await self._task_queue.get()
                        # The reader coroutine reports its failures by
                        # queuing an exception object as a task.
                        if isinstance(task, Exception):
                            raise task
                        response = await asyncio.wait_for(task(), self._timeout)
                        if response is not None:
                            if response["msg_type"] != "Acknowledge":
                                self._msg_queue.put_nowait(response)
                    except (TimeoutError, asyncio.TimeoutError, ConnectionResetError):
                        logging.warning("Kirdy connection is dropped.")
                        first_con = True
                        read_response_fut.cancel()
                        await self._sock_disconnection_handling()
                        # State Transition
                        state = State.disconnected

        except asyncio.CancelledError:
            logging.debug("Handler is canceling")
        except Exception:
            logging.debug("Handler experienced an error. Exiting.", exc_info=True)

        await self._sock_disconnection_handling()
        if self._event_loop is not None:
            self._event_loop.call_soon_threadsafe(self._event_loop.stop)
            self._event_loop = None

    async def _read_response_handler(self):
        # Reads messages from the socket and dispatches them: active reports go
        # to the report signal/queue, everything else goes to the internal
        # queue awaited by the command senders.
        try:
            while True:
                if self._report_mode_on:
                    responses = await asyncio.wait_for(self._read_response(), 5.0)
                else:
                    responses = await self._read_response()

                for response in responses:
                    if response["msg_type"] == 'Report' and not self._msg_queue_get_report:
                        if self._report_sig is None:
                            if self._report_queue.full():
                                self._report_queue.get_nowait()
                            self._report_queue.put_nowait(response)
                        else:
                            self._report_sig.emit(response)
                    else:
                        if self._msg_queue_get_report:
                            async with self._lock:
                                self._msg_queue_get_report = False
                        if self._int_msg_queue.full():
                            logging.debug("_int_msg_queue is full")
                            self._int_msg_queue.get_nowait()
                        self._int_msg_queue.put_nowait(response)

        except asyncio.CancelledError:
            logging.debug("Read Response Handler is canceling")
        except (TimeoutError, asyncio.TimeoutError):
            logging.warning("Read active report response timeout")
            self._push_to_task_queue(TimeoutError())
        except ConnectionResetError:
            logging.warning("Connection Reset by peer")
            self._push_to_task_queue(ConnectionResetError())
        except Exception:
            logging.warning("Read Response Handler experienced an error. Exiting.", exc_info=True)

        if self._report_mode_on:
            self._report_mode_on = False
            # Queue an exception *instance* so that get_report_stream()
            # recognises and raises it; drop the oldest report if needed.
            if self._report_queue.full():
                self._report_queue.get_nowait()
            self._report_queue.put_nowait(TimeoutError())

    async def _stop_handler(self):
        # Cancel every other task on the loop. The connection handler stops
        # the event loop itself once cancelled, so this coroutine must not
        # wait for the tasks: it could be left suspended on a stopped loop.
        # (asyncio.gather() also no longer accepts `loop=` since Python 3.10.)
        current = asyncio.current_task()
        for task in asyncio.all_tasks():
            if task is not current:
                task.cancel()

    async def __coninit(self, timeout):
        # Create fresh queues and open the TCP connection (with TCP_NODELAY)
        self._task_queue = asyncio.Queue(maxsize=64)
        self._int_msg_queue = asyncio.Queue(maxsize=4)
        self._msg_queue = queue.Queue(maxsize=64)
        self._report_queue = queue.Queue(maxsize=16)

        self._reader, self._writer = await asyncio.wait_for(asyncio.open_connection(self._host, self._ctrl_port), timeout)
        writer_sock = self._writer.get_extra_info("socket")
        writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

        if self._connected_sig is not None:
            self._connected_sig.emit(True)

    async def _read_response(self, buffer_size=16384):
        # Read one chunk and parse the newline-terminated json messages in it.
        # The messages are returned in reverse order of arrival, and whatever
        # follows the last newline is discarded.
        raw_response = await self._reader.read(buffer_size)
        if not raw_response:
            # read() returns b"" at EOF without waiting. Treat it as a dropped
            # connection, otherwise the caller's loop would spin forever
            # without ever yielding to the event loop.
            raise ConnectionResetError("Connection closed by Kirdy")
        lines = raw_response.decode('utf-8', errors='ignore').split("\n")
        lines.reverse()
        return [json.loads(line) for line in lines[1:]]

    def _get_msg(self):
        # Block until the next reply message is available
        msg = self._msg_queue.get()
        return msg

    def _send_raw_cmd_handler(self, cmd, msg_type="Acknowledge", sig=None):
        # Queue an already built command dict for execution on the event loop
        if self.connected():
            self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_raw_cmd(cmd, msg_type, sig=sig))
        else:
            raise ConnectionError

    # If the cmd involves a cmd specific data type,
    # checking is done separately within the functions being called
    async def _send_raw_cmd(self, cmd, msg_type, sig=None):
        if self.connected():
            self._writer.write(bytes(json.dumps(cmd), "UTF-8"))
            await self._writer.drain()
            response = await self._int_msg_queue.get()
            if response["msg_type"] == msg_type:
                if sig is not None:
                    sig.emit(response)
                    return {"msg_type": "Acknowledge"}
                return response
            else:
                raise InvalidCmd
        else:
            raise ConnectionError

    def _send_cmd_handler(self, target, cmd, data=None, msg_type="Acknowledge", sig=None, hard_reset=False):
        # Queue a (target, cmd, data) command for execution on the event loop
        if self.connected():
            if hard_reset:
                # Let the pending tasks finish first; sleep between polls
                # instead of spinning on a CPU core.
                while not self._task_queue.empty():
                    time.sleep(self._POLL_INTERVAL)
            self._event_loop.call_soon_threadsafe(self._task_queue.put_nowait, lambda: self._send_cmd(target, cmd, data, msg_type, sig=sig))
            if hard_reset:
                # Wait 1s for Kirdy to hard reset
                time.sleep(1.0)
        else:
            raise ConnectionError

    async def _send_cmd(self, target, cmd, data, msg_type, sig=None):
        # Build the json command from the command table entry, send it and
        # wait for the reply. Raises InvalidCmd if the reply is not of the
        # expected msg_type, InvalidDataType if a bool command gets a non-bool.
        cmd_dict = {}
        cmd_dict[target] = cmd.name

        if cmd == _dt.f32:
            if isinstance(data, float):
                cmd_dict[cmd] = data
            elif isinstance(data, int):
                cmd_dict[cmd] = float(data)
        elif cmd == _dt.bool:
            if isinstance(data, bool):
                cmd_dict[cmd] = data
            else:
                raise InvalidDataType
        elif cmd == "None":
            pass

        if msg_type == 'Report':
            # Tell the reader that the next 'Report' answers this request
            # rather than being an active report.
            async with self._lock:
                self._msg_queue_get_report = True

        self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8"))
        await self._writer.drain()

        msg = await self._int_msg_queue.get()
        if msg['msg_type'] == msg_type:
            if sig is not None:
                sig.emit(msg)
                return {"msg_type": "Acknowledge"}
            return msg
        else:
            raise InvalidCmd