Implement Kirdy AsyncIo Driver and example code

This commit is contained in:
linuswck 2024-03-15 17:14:22 +08:00
parent 1480305c16
commit 61624b0bd6
4 changed files with 746 additions and 149 deletions

View File

@ -1,149 +0,0 @@
# Python Test Scripts for Controlling Kirdy
# Kirdy is written to be controlled via a json object based on miniconf rust crate
# Json Field:
# "laser_diode_cmd / thermostat_cmd": Check cmd_handler.rs for the list of cmds
# "data_f32": Optional f32 Data field depending on cmd
# "data_f64": Optional f64 Data field depending on cmd
import socket
import json
import time
import signal
# Kirdy IP and Port Number
HOST = "192.168.1.132"
PORT = 1337
dfu_cmd = {
"device_cmd": "Dfu",
}
ld_cmd = {
"laser_diode_cmd": "SetI",
"data_f32": 0.0,
}
tec_clear_alarm = {
"thermostat_cmd": "ClearAlarm",
}
tec_power_down = {
"thermostat_cmd": "PowerDown",
}
tec_set_sh_t0_cmd = {
"thermostat_cmd": "SetShT0",
"data_f32": 25.0,
}
tec_set_sh_r0_cmd = {
"thermostat_cmd": "SetShR0",
"data_f32": 10.0 * 1000,
}
tec_set_sh_beta_cmd = {
"thermostat_cmd": "SetShBeta",
"data_f32": 3900.0,
}
tec_set_temperature_setpoint_cmd = {
"thermostat_cmd": "SetTemperatureSetpoint",
"data_f32": 30.0,
}
tec_set_pid_kp_cmd = {
"thermostat_cmd": "SetPidKp",
"data_f32": 0.14398316474562461
}
tec_set_pid_ki_cmd = {
"thermostat_cmd": "SetPidKi",
"data_f32": 0.004117762308816449
}
tec_set_pid_kd_cmd = {
"thermostat_cmd": "SetPidKd",
"data_f32": 0.36324599631515664
}
tec_set_pid_out_min_cmd = {
"thermostat_cmd": "SetPidOutMin",
"data_f32": -1.0,
}
tec_set_pid_out_max_cmd = {
"thermostat_cmd": "SetPidOutMax",
"data_f32": 1.0,
}
tec_power_up = {
"thermostat_cmd": "PowerUp",
}
tec_pid_engage = {
"thermostat_cmd": "SetPidEngage",
}
kirdy_get_status_report = {
"device_cmd": "GetStatusReport",
}
kirdy_report_mode_cmd = {
"device_cmd": "SetActiveReportMode",
"data_bool": True,
}
temp_adc_config = {
"thermostat_cmd": "ConfigTempAdcFilter",
"temp_adc_filter": {
"filter_type": "Sinc5Sinc1With50hz60HzRejection",
"sinc5sinc1postfilter": "F16SPS"
}
}
def send_cmd(input, socket):
socket.send(bytes(json.dumps(input), "UTF-8"))
# Give some time for Kirdy to process the cmd
time.sleep(0.5)
def poll_status_report(socket):
while True:
data = socket.recv(1024).decode('utf8')
try:
json_data = json.loads(data)
break
except json.decoder.JSONDecodeError:
pass
return json_data
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def signal_handler(sig, frame):
ld_cmd["data_f32"] = 0.0
send_cmd(ld_cmd, s)
send_cmd(tec_power_down, s)
s.close()
exit()
signal.signal(signal.SIGINT, signal_handler)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
send_cmd(ld_cmd, s)
send_cmd(tec_power_down, s)
send_cmd(tec_set_sh_t0_cmd, s)
send_cmd(tec_set_sh_r0_cmd, s)
send_cmd(tec_set_sh_beta_cmd, s)
send_cmd(tec_set_temperature_setpoint_cmd, s)
send_cmd(tec_set_pid_kp_cmd, s)
send_cmd(tec_set_pid_ki_cmd, s)
send_cmd(tec_set_pid_kd_cmd, s)
send_cmd(tec_clear_alarm, s)
send_cmd(temp_adc_config, s)
send_cmd(tec_pid_engage, s)
send_cmd(tec_power_up, s)
send_cmd(kirdy_report_mode_cmd, s)
while True:
status_report = poll_status_report(s)
print(f"Ts: {status_report['ts']} | Temperature: {status_report['tec']['temperature'] - 273.15}")

View File

@ -0,0 +1,47 @@
from driver.kirdy_async import Kirdy
import asyncio
async def main():
kirdy = Kirdy()
await kirdy.start_session(host='192.168.1.128', port=1337, timeout=0.25)
await kirdy.device.set_active_report_mode(False)
await kirdy.laser.set_power_on(False)
await kirdy.laser.clear_alarm()
await kirdy.laser.set_i(0)
await kirdy.laser.set_i_soft_limit(250)
await kirdy.laser.set_power_on(True)
await kirdy.thermostat.set_power_on(False)
await kirdy.thermostat.clear_alarm()
await kirdy.thermostat.set_sh_r0(10.0*1000)
await kirdy.thermostat.set_sh_t0(25)
await kirdy.thermostat.set_sh_beta(3900)
await kirdy.thermostat.set_temperature_setpoint(25)
await kirdy.thermostat.set_temp_mon_upper_limit(40)
await kirdy.thermostat.set_temp_mon_lower_limit(10)
await kirdy.thermostat.set_pid_kp(0.15668282198105507)
await kirdy.thermostat.set_pid_ki(0.002135962407793784)
await kirdy.thermostat.set_pid_kd(0.829254515277143)
await kirdy.thermostat.set_pid_output_max(1.0)
await kirdy.thermostat.set_pid_output_min(-1.0)
await kirdy.thermostat.config_temp_adc_filter("Sinc5Sinc1With50hz60HzRejection", "F16SPS")
await kirdy.thermostat.set_power_on(True)
await kirdy.thermostat.set_pid_control_mode()
await kirdy.laser.set_default_pwr_on(False)
await kirdy.thermostat.set_default_pwr_on(True)
await kirdy.device.save_current_settings_to_flash()
async for data in kirdy.report_mode():
print(data)
await kirdy.end_session()
if __name__ == "__main__":
asyncio.run(main())

View File

@ -0,0 +1,699 @@
import socket
import asyncio
import json
# Data Type Enums
IP_SETTINGS = "ip_settings"
TEMP_ADC_FILTER = "temp_adc_filter"
DATA_F32 = "data_f32"
DATA_BOOL = "data_bool"
TARGET_DEVICE = "device_cmd"
TARGET_LD = "laser_diode_cmd"
TARGET_THERMOSTAT = "thermostat_cmd"
class CmdDoesNotExist(Exception):
pass
class InvalidDataType(Exception):
pass
class NoAckRecv(Exception):
pass
Filter_Config = {
"Sinc5Sinc1With50hz60HzRejection": [
"sinc5sinc1postfilter",
[
"F27SPS",
"F21SPS",
"F20SPS",
"F16SPS",
]
],
"Sinc5Sinc1": [
"sinc5sinc1odr",
[
"F31250_0SPS",
"F15625_0SPS",
"F10417_0SPS",
"F5208_0SPS" ,
"F2597_0SPS" ,
"F1007_0SPS" ,
"F503_8SPS" ,
"F381_0SPS" ,
"F200_3SPS" ,
"F100_2SPS" ,
"F59_52SPS" ,
"F49_68SPS" ,
"F20_01SPS" ,
"F16_63SPS" ,
"F10_0SPS" ,
"F5_0SPS" ,
"F2_5SPS" ,
"F1_25SPS" ,
]
],
"Sinc3": [
"sinc3odr",
[
"F31250_0SPS",
"F15625_0SPS",
"F10417_0SPS",
"F5208_0SPS" ,
"F2597_0SPS" ,
"F1007_0SPS" ,
"F503_8SPS" ,
"F381_0SPS" ,
"F200_3SPS" ,
"F100_2SPS" ,
"F59_52SPS" ,
"F49_68SPS" ,
"F20_01SPS" ,
"F16_63SPS" ,
"F10_0SPS" ,
"F5_0SPS" ,
"F2_5SPS" ,
"F1_25SPS" ,
]
],
"Sinc3WithFineODR": [
"sinc3fineodr",
DATA_F32
],
}
class Device:
def __init__(self, send_cmd_handler, send_raw_cmd_handler, read_response):
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
self._read_response = read_response
async def set_ip_settings(self, addr=[192, 168, 1, 132], port=1337, prefix_len=24, gateway=[192, 168, 1, 1]):
"""
After calling this fn, the user needs to issue the SaveFlashSettings cmd and then issue a
Hard Reset/Power Cycle Kirdy for the new network settings to be effective.
"""
if not(isinstance(addr, list) and isinstance(gateway, list)):
raise InvalidDataType
if not(len(addr) == 4 and len(gateway) == 4):
raise InvalidDataType
if not(isinstance(port, int) and isinstance(prefix_len, int)):
raise InvalidDataType
return await self._send_raw_cmd(
{
"device_cmd": "SetIPSettings",
"ip_settings": {
"addr": addr,
"port": port,
"prefix_len": prefix_len,
"gateway": gateway,
}
}
)
async def set_active_report_mode(self, on):
"""
Set active report to be on. If it is on, Kirdy will send status report
to ALL client socket connections according to the temperature polling rate set.
"""
return await self._send_cmd(TARGET_DEVICE, "SetActiveReportMode", on)
async def get_status_report(self):
"""
{
'laser': {
default_pwr_on': False, # Power On Laser Diode at Startup
ld_drive_current': { # Laser Diode Output Current(A)
'value': 0.0, # Value Set
'max': 0.3 # Max Value Settable
,
ld_drive_current_limit': { # Laser Diode Software Current Limit(A)
'value': 0.3, # Value Set
'max': 0.3 # Max Value Settable
,
pd_responsitivity': { # Laser Diode Software Current Limit(A)
'responsitivity': None, # Value Set
'i_dark': 0.0 # Max Value Settable
,
ld_pwr_limit': 0.0 # Laser Diode Power Limit(W)
},
'thermostat': {
'default_pwr_on': True, # Power on Thermostat at Startup
'pid_engaged': True, # True: PID Control Mode | False Constant Current Mode
'temperature_setpoint': 298.15, # Temperature Setpoint (K)
'tec_settings': {
'i_set': { # Current TEC Current Set by PID Controller/User
'value': 0.04330516, # Value Set
'max': 1.0 # Max Value Settable
},
'max_v': { # Max Voltage Across Tec Terminals
'value': 4.990857, # Value Set
'max': 5.0 # Max Value Settable
},
'max_i_pos': { # Max Cooling Current Across Tec Terminals
'value': 0.99628574, # Value Set
'max': 1.0 # Max Value Settable
},
'max_i_neg': { # Max Heating Current Across Tec Terminals
'value': 0.99628574, # Value Set
'max': 1.0 # Max Value Settable
}
},
'pid_params': { # PID Controller Parameters
'kp': 0.15668282, # Proportional Gain
'ki': 0.0021359625, # Integral Gain
'kd': 0.8292545, # Derivative Gain
'output_min': -1.0, # Minimum Current Output (A)
'output_max': 1.0 # Maximum Current Output (A)
},
'temp_adc_settings': { # Temperature ADC Settings (Please read AD7172-2 Documentation)
'filter_type': 'Sinc5Sinc1With50hz60HzRejection', # Filter Types
'sinc5sinc1odr': None, # (Unused)
'sinc3odr': None, # (Unused)
'sinc5sinc1postfilter': None, # (Unused)
'sinc3fineodr': None, # (Unused)
'rate': 16.67 # ADC Sampling Rate (Hz)
},
'temp_mon_settings': { # Temperature Monitor Settings
'upper_limit': 40.0, # Temperature Upper Limit (Degree Celsius)
'lower_limit': 10.0 # Temperature Lower Limit (Degree Celsius)
},
'thermistor_params': { # Thermistor Steinhart-Hart equation parameters
't0': 25.0, # t0: Degree Celsius
'r0': 10000.0, # r0: Ohm
'b': 3900.0 # b: (unitless)
}
}
}
"""
response = await self._send_cmd(TARGET_DEVICE, "GetStatusReport")
if response["msg_type"] != "Acknowledge":
return response
return await self._read_response()
async def get_settings_summary(self):
response = await self._send_cmd(TARGET_DEVICE, "GetSettingsSummary")
if response["msg_type"] != "Acknowledge":
return response
return await self._read_response()
async def dfu(self):
"""
Issuing this cmd will HARD RESET the device and
put Kirdy into Dfu mode for flashing firmware.
"""
return await self._send_cmd(TARGET_DEVICE, "Dfu")
async def save_current_settings_to_flash(self):
"""
Save the current device configurations into flash.
"""
return await self._send_cmd(TARGET_DEVICE, "SaveFlashSettings")
async def load_current_settings_to_flash(self):
"""
Restore stored settings in flash
"""
return await self._send_cmd(TARGET_DEVICE, "LoadFlashSettings")
async def hard_reset(self):
"""
Hard Reset Kirdy. The socket connection will be closed by Kirdy.
Laser diode power and Tec power will be turned off.
"""
return await self._send_cmd(TARGET_DEVICE, "HardReset")
class Laser:
def __init__(self, send_cmd_handler):
self._send_cmd = send_cmd_handler
async def set_power_on(self, on):
"""
Power Up or Power Down laser diode. Powering up the Laser Diode resets the pwr_excursion status
- on (True/False)
"""
if on:
return await self._send_cmd(TARGET_LD, "PowerUp", None)
else:
return await self._send_cmd(TARGET_LD, "PowerDown", None)
async def set_default_pwr_on(self, on):
"""
Set whether laser diode is powered up at Startup
- on (True/False)
"""
return await self._send_cmd(TARGET_LD, "SetDefaultPowerOn", on)
async def set_ld_terms_short(self, short):
"""
Open/Short laser diode terminals.
- on (True/False)
"""
if short:
return await self._send_cmd(TARGET_LD, "LdTermsShort", None)
else:
return await self._send_cmd(TARGET_LD, "LdTermsOpen", None)
async def set_i(self, i):
"""
Set laser diode output current: Max(0, Min(i_set, i_soft_limit))
- i: mA
"""
return await self._send_cmd(TARGET_LD, "SetI", i)
async def set_i_soft_limit(self, i_limit):
"""
Set laser diode software output current limit
- i_limit: mA
"""
return await self._send_cmd(TARGET_LD, "SetISoftLimit", i_limit)
async def config_pd_mon(self, responsitivity, dark_current):
"""
Configure the photodiode monitor parameters
- responsitivity: A/W
- dark current: uA
"""
response = await self._send_cmd(TARGET_LD, "SetPdResponsitivity", responsitivity)
if response["msg_type"] != "Acknowledge":
return response
return await self._send_cmd(TARGET_LD, "SetPdDarkCurrent", dark_current)
async def set_ld_pwr_limit(self, pwr_limit):
"""
Set power limit for the power excursion monitor
If the calculated power with the params of pd_mon > pwr_limit,
overpower protection is triggered.
- pwr_limit: mW
"""
return await self._send_cmd(TARGET_LD, "SetLdPwrLimit", pwr_limit)
async def clear_alarm(self):
"""
Clear the power excursion monitor alarm
"""
return await self._send_cmd(TARGET_LD, "ClearAlarm")
class Thermostat:
def __init__(self, send_cmd_handler, send_raw_cmd_handler):
self._send_cmd = send_cmd_handler
self._send_raw_cmd = send_raw_cmd_handler
async def set_power_on(self, on):
"""
Power up or power down thermostat
- Powering up the thermostat resets the pwr_excursion status
"""
if on:
return await self._send_cmd(TARGET_THERMOSTAT, "PowerUp", None)
else:
return await self._send_cmd(TARGET_THERMOSTAT, "PowerDown", None)
async def set_default_pwr_on(self, on):
"""
Set whether thermostat is powered up at Startup
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetDefaultPowerOn", on)
async def set_tec_max_v(self, max_v):
"""
Set Tec Maximum Voltage Across the TEC Terminals
- max_v: V
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxV", max_v)
async def set_tec_max_i_pos(self, max_i_pos):
"""
Set Tec maximum positive output
- max_i_pos: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxIPos", max_i_pos)
async def set_tec_max_i_neg(self, max_i_neg):
"""
Set Tec maximum negative output
- max_i_neg: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTecMaxINeg", max_i_neg)
async def set_tec_i_out(self, i_out):
"""
Set Tec Output Current
This cmd is only effective in constant current control mode
or your newly set value will be overwritten by PID Controller Output
- i_out: A
"""
if isinstance(i_out, float):
return await self._send_raw_cmd({"tec_set_i": i_out})
elif isinstance(i_out, int):
return await self._send_raw_cmd({"tec_set_i": float(i_out)})
else:
raise InvalidDataType
async def set_constant_current_control_mode(self):
"""
Disable PID Controller and output current can be controlled with set_tec_i_out() cmd.
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidDisEngage", None)
async def set_temperature_setpoint(self, temperature):
"""
Set Temperature Setpoint for PID Controller. This parameter is not active in constant current control mode
- temperature: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTemperatureSetpoint", temperature)
async def set_pid_control_mode(self):
"""
Enable PID Controller. Its PID Update Interval is controlled by the Temperature ADC polling rate.
Please refer to config_temp_adc_filter for the possible polling rate options
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidEngage", None)
async def set_pid_kp(self, kp):
"""
Set Kp parameter for PID Controller
kp: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidKp", kp)
async def set_pid_ki(self, ki):
"""
Set Ki parameter for PID Controller
ki: (unitless)
"""
await self._send_cmd(TARGET_THERMOSTAT, "SetPidKi", ki)
async def set_pid_kd(self, kd):
"""
Set Kd parameter for PID Controller
kd: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidKd", kd)
async def set_pid_output_max(self, out_max):
"""
Set max output limit at the PID Output
- out_max: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidOutMax", out_max)
async def set_pid_output_min(self, out_min):
"""
Set min output limit at the PID Output
- out_min: A
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetPidOutMin", out_min)
async def set_temp_mon_upper_limit(self, upper_limit):
"""
Set Temperature Monitor Upper Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- upper_limit: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTempMonUpperLimit", upper_limit)
async def set_temp_mon_lower_limit(self, lower_limit):
"""
Set Temperature Monitor Lower Limit Threshold. Exceeding the limit for too long
will force the TEC Controller, PID Controller and Laser Diode Power to Shutdown
- lower_limit: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetTempMonLowerLimit", lower_limit)
async def clear_alarm(self):
"""
Clear the temperature monitor alarm
"""
return await self._send_cmd(TARGET_THERMOSTAT, "ClearAlarm")
async def set_sh_t0(self, t0):
"""
Set t0 Steinhart-Hart parameter for the laser diode NTC
- t0: Degree Celsius
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShT0", t0)
async def set_sh_r0(self, r0):
"""
Set r0 Steinhart-Hart parameter for the laser diode NTC
- r0: Ohm
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShR0", r0)
async def set_sh_beta(self, beta):
"""
Set beta Steinhart-Hart parameter for the laser diode NTC
- beta: (unitless)
"""
return await self._send_cmd(TARGET_THERMOSTAT, "SetShBeta", beta)
async def config_temp_adc_filter(self, filter_type, sampling_rate):
"""
Configure the temperature adc filter type and sampling rate.
Please refer to AD7172 datasheet for the usage of various types of filter.
The actual temperature polling rate is bottlenecked by the processing speed of the MCU and
performs differently under different kinds of workload. Please verify the polling rate with
the timestamp.
"""
if not(filter_type in Filter_Config.keys()):
raise InvalidDataType
if Filter_Config[filter_type][1] != DATA_F32:
if not(sampling_rate in Filter_Config[filter_type][1]):
raise InvalidDataType
else:
if not(isinstance(sampling_rate, float)):
raise InvalidDataType
cmd = {}
cmd["thermostat_cmd"] = "ConfigTempAdcFilter"
cmd["temp_adc_filter"] = {
"filter_type": filter_type,
Filter_Config[filter_type][0]: sampling_rate,
}
return await self._send_raw_cmd(cmd)
class Kirdy:
def __init__(self):
self._reader = None
self._writer = None
self._connecting_task = None
self._cmd_lock = asyncio.Lock()
self._report_mode_on = False
self.timeout = None
self.device = Device(self._send_cmd_handler, self._send_raw_cmd_handler, self._read_response)
self.laser = Laser(self._send_cmd_handler)
self.thermostat = Thermostat(self._send_cmd_handler, self._send_raw_cmd_handler)
self._cmd_list = {
TARGET_DEVICE: {
"SetIPSettings": IP_SETTINGS,
"SetActiveReportMode": DATA_BOOL,
"GetStatusReport": None,
"GetSettingsSummary": None,
"Dfu": None,
"SaveFlashSettings": None,
"LoadFlashSettings": None,
"HardReset": None,
},
TARGET_LD: {
# LD Drive Related
"SetDefaultPowerOn": DATA_BOOL,
"PowerUp": None,
"PowerDown": None,
"LdTermsShort": None,
"LdTermsOpen": None,
"SetI": DATA_F32,
"SetISoftLimit": DATA_F32,
# PD Mon Related
"SetPdResponsitivity": DATA_F32,
"SetPdDarkCurrent": DATA_F32,
"SetLdPwrLimit": DATA_F32,
"ClearAlarm": None,
},
TARGET_THERMOSTAT: {
"SetDefaultPowerOn": DATA_BOOL,
"PowerUp": DATA_F32,
"PowerDown": DATA_F32,
# TEC Controller Settings
"SetTecMaxV": DATA_F32,
"SetTecMaxIPos": DATA_F32,
"SetTecMaxINeg": DATA_F32,
"SetTecIOut": DATA_F32,
"SetTemperatureSetpoint": DATA_F32,
# PID Controller Settings
"SetPidEngage": None,
"SetPidDisEngage": None,
"SetPidKp": DATA_F32,
"SetPidKi": DATA_F32,
"SetPidKd": DATA_F32,
"SetPidOutMin": DATA_F32,
"SetPidOutMax": DATA_F32,
# Temperature Adc Internal Filters Settings
"ConfigTempAdcFilter": TEMP_ADC_FILTER,
# Temperature Monitor Settings
"SetTempMonUpperLimit": DATA_F32,
"SetTempMonLowerLimit": DATA_F32,
"ClearAlarm": None,
# Thermistor Parameter Settings
"SetShT0": DATA_F32,
"SetShR0": DATA_F32,
"SetShBeta": DATA_F32,
}
}
async def start_session(self, host='192.168.1.128', port=1337, timeout=None):
self._connecting_task = asyncio.create_task(
asyncio.wait_for(asyncio.open_connection(host, port), timeout)
)
self.timeout = timeout
try:
self._reader, self._writer = await self._connecting_task
writer_sock = self._writer.get_extra_info("socket")
writer_sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except asyncio.CancelledError:
raise StoppedConnecting
finally:
self._connecting_task = None
def connecting(self):
"""Returns True if client is connecting"""
return self._connecting_task is not None
def connected(self):
"""Returns True if client is connected"""
return self._writer is not None
async def _cmd(self, *cmd):
async with self._cmd_lock:
# protect the read-write process from being cancelled midway
line = await asyncio.shield(self._read_write(cmd))
async def end_session(self):
"""End session to Thermostat if connected, cancel connection if connecting"""
if self._connecting_task is not None:
self._connecting_task.cancel()
if self._writer is None:
return
# Reader needn't be closed
self._writer.close()
await self._writer.wait_closed()
self._reader = None
self._writer = None
async def _read_response(self):
try:
response = await asyncio.wait_for(self._reader.read(1024), self.timeout)
except TimeoutError:
return {
"msg_type": "Internal Timeout"
}
response = response.decode('utf-8', errors='ignore')
try:
return json.loads(response)
except json.decoder.JSONDecodeError:
return {
"msg_type": "Internal Invalid"
}
# If the cmd involves a cmd specific data type,
# checking is done separately within the functions being called
async def _send_raw_cmd_handler(self, cmd):
retry = 0
while retry < 10:
self._writer.write(bytes(json.dumps(cmd), "UTF-8"))
await self._writer.drain()
response = await self._read_response()
try:
if response["msg_type"] == "Acknowledge":
return response
except:
retry += 1
await asyncio.sleep(0.25)
raise NoAckRecv
async def _send_cmd_handler(self, target, cmd, data=None):
cmd_dict = {}
if not(target in self._cmd_list.keys()) or not(cmd in self._cmd_list[target].keys()):
raise CmdDoesNotExist
cmd_dict[target] = cmd
if self._cmd_list[target][cmd] == DATA_F32:
if isinstance(data, float):
cmd_dict[DATA_F32] = data
elif isinstance(data, int):
cmd_dict[DATA_F32] = float(data)
elif self._cmd_list[target][cmd] == DATA_BOOL:
if isinstance(data, bool):
cmd_dict[DATA_BOOL] = data
else:
raise InvalidDataType
elif self._cmd_list[target][cmd] == None:
pass
else:
# Undefined Data Type
raise CmdDoesNotExist
retry = 0
while retry < 10:
self._writer.write(bytes(json.dumps(cmd_dict), "UTF-8"))
await self._writer.drain()
response = await self._read_response()
try:
if response["msg_type"] == "Acknowledge":
return response
except:
retry += 1
await asyncio.sleep(0.1)
raise NoAckRecv
async def report_mode(self):
"""Start reporting device status in json object
Example of yielded data::
{
'ts': 227657, # Relative Timestamp (ms)
'laser': {
'pwr_on': False, # Laser Power is On (True/False)
'pwr_excursion': False, # Was Laser experienced an Overpowered Condition? (True/False)
'ld_i_set': 0.0, # Laser Diode Output Current (A)
'pd_i': 2.0000002e-06, # Internal Photodiode Monitor current (A)
'pd_pwr': None, # Power Readings from Internal Photodiode (W). Return None if pd_mon parameter(s) are not defined.
'term_status': 'Is50Ohm' # Is the Low Frequency Modulation Input's Impedance 50 Ohm? (Is50Ohm/Not50Ohm)
},
'tec': {
'pwr_on': False, # Tec Power is On (True/False)
'pid_engaged': False, # Is Pid_Engaged. If False, it is in Constant Current Mode (True/False)
'temp_mon_status': { # Temperature Monitor:
'status': 'Off', # (To be revised)
'over_temp_alarm': False # Was Laser Diode experienced an Overtemperature condition (True/False)
},
'temperature': 298.18344, # Temperature Readings (K)
'i_set': 0.0, # Tec Current Set by User/PID Controller(A)
'tec_i': 0.0024998188, # Tec Current Readings (A)
'tec_v': -0.00399971 # Tec Voltage Readings (V)
}
}
"""
await self.device.set_active_report_mode(True)
self._report_mode_on = True
while self._report_mode_on:
async with self._cmd_lock:
yield await self._read_response()
await self.device.set_active_report_mode(False)
def stop_report_mode(self):
self._report_mode_on = False