kirdy/eth_cmd_test.py

150 lines
3.3 KiB
Python

# Python Test Scripts for Controlling Kirdy
# Kirdy is written to be controlled via a json object based on miniconf rust crate
# Json Field:
# "laser_diode_cmd / thermostat_cmd": Check cmd_handler.rs for the list of cmds
# "data_f32": Optional f32 Data field depending on cmd
# "data_f64": Optional f64 Data field depending on cmd
import socket
import json
import time
import signal
# Kirdy IP and Port Number
HOST = "192.168.1.132"
PORT = 1337
dfu_cmd = {
"device_cmd": "Dfu",
}
ld_cmd = {
"laser_diode_cmd": "SetI",
"data_f32": 0.0,
}
tec_clear_alarm = {
"thermostat_cmd": "ClearAlarm",
}
tec_power_down = {
"thermostat_cmd": "PowerDown",
}
tec_set_sh_t0_cmd = {
"thermostat_cmd": "SetShT0",
"data_f32": 25.0,
}
tec_set_sh_r0_cmd = {
"thermostat_cmd": "SetShR0",
"data_f32": 10.0 * 1000,
}
tec_set_sh_beta_cmd = {
"thermostat_cmd": "SetShBeta",
"data_f32": 3900.0,
}
tec_set_temperature_setpoint_cmd = {
"thermostat_cmd": "SetTemperatureSetpoint",
"data_f32": 30.0,
}
tec_set_pid_kp_cmd = {
"thermostat_cmd": "SetPidKp",
"data_f32": 0.14398316474562461
}
tec_set_pid_ki_cmd = {
"thermostat_cmd": "SetPidKi",
"data_f32": 0.004117762308816449
}
tec_set_pid_kd_cmd = {
"thermostat_cmd": "SetPidKd",
"data_f32": 0.36324599631515664
}
tec_set_pid_out_min_cmd = {
"thermostat_cmd": "SetPidOutMin",
"data_f32": -1.0,
}
tec_set_pid_out_max_cmd = {
"thermostat_cmd": "SetPidOutMax",
"data_f32": 1.0,
}
tec_power_up = {
"thermostat_cmd": "PowerUp",
}
tec_pid_engage = {
"thermostat_cmd": "SetPidEngage",
}
kirdy_get_status_report = {
"device_cmd": "GetStatusReport",
}
kirdy_report_mode_cmd = {
"device_cmd": "SetActiveReportMode",
"data_bool": True,
}
temp_adc_config = {
"thermostat_cmd": "ConfigTempAdcFilter",
"temp_adc_filter": {
"filter_type": "Sinc5Sinc1With50hz60HzRejection",
"sinc5sinc1postfilter": "F16SPS"
}
}
def send_cmd(input, socket):
socket.send(bytes(json.dumps(input), "UTF-8"))
# Give some time for Kirdy to process the cmd
time.sleep(0.5)
def poll_status_report(socket):
while True:
data = socket.recv(1024).decode('utf8')
try:
json_data = json.loads(data)
break
except json.decoder.JSONDecodeError:
pass
return json_data
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def signal_handler(sig, frame):
ld_cmd["data_f32"] = 0.0
send_cmd(ld_cmd, s)
send_cmd(tec_power_down, s)
s.close()
exit()
signal.signal(signal.SIGINT, signal_handler)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
send_cmd(ld_cmd, s)
send_cmd(tec_power_down, s)
send_cmd(tec_set_sh_t0_cmd, s)
send_cmd(tec_set_sh_r0_cmd, s)
send_cmd(tec_set_sh_beta_cmd, s)
send_cmd(tec_set_temperature_setpoint_cmd, s)
send_cmd(tec_set_pid_kp_cmd, s)
send_cmd(tec_set_pid_ki_cmd, s)
send_cmd(tec_set_pid_kd_cmd, s)
send_cmd(tec_clear_alarm, s)
send_cmd(temp_adc_config, s)
send_cmd(tec_pid_engage, s)
send_cmd(tec_power_up, s)
send_cmd(kirdy_report_mode_cmd, s)
while True:
status_report = poll_status_report(s)
print(f"Ts: {status_report['ts']} | Temperature: {status_report['tec']['temperature'] - 273.15}")