import math import logging from collections import deque, namedtuple from enum import Enum import socket import json import time import signal from driver.kirdy import Kirdy, FilterConfig import asyncio from sipyco.asyncio_tools import SignalHandler # Based on hirshmann pid-autotune libiary # See https://github.com/hirschmann/pid-autotune # Which is in turn based on a fork of Arduino PID AutoTune Library # See https://github.com/t0mpr1c3/Arduino-PID-AutoTune-Library class PIDAutotuneState(Enum): STATE_OFF = 'off' STATE_RELAY_STEP_UP = 'relay step up' STATE_RELAY_STEP_DOWN = 'relay step down' STATE_SUCCEEDED = 'succeeded' STATE_FAILED = 'failed' STATE_READY = 'ready' class PIDAutotune: PIDParams = namedtuple('PIDParams', ['Kp', 'Ki', 'Kd']) PEAK_AMPLITUDE_TOLERANCE = 0.05 _tuning_rules = { "ziegler-nichols": [0.6, 1.2, 0.075], "tyreus-luyben": [0.4545, 0.2066, 0.07214], "ciancone-marlin": [0.303, 0.1364, 0.0481], "pessen-integral": [0.7, 1.75, 0.105], "some-overshoot": [0.333, 0.667, 0.111], "no-overshoot": [0.2, 0.4, 0.0667] } def __init__(self, setpoint, out_step=10, lookback=60, noiseband=0.5, sampletime=1.2): if setpoint is None: raise ValueError('setpoint must be specified') self._inputs = deque(maxlen=round(lookback / sampletime)) self._setpoint = setpoint self._outputstep = out_step self._noiseband = noiseband self._out_min = -out_step self._out_max = out_step self._state = PIDAutotuneState.STATE_OFF self._peak_timestamps = deque(maxlen=5) self._peaks = deque(maxlen=5) self._output = 0 self._last_run_timestamp = 0 self._peak_type = 0 self._peak_count = 0 self._initial_output = 0 self._induced_amplitude = 0 self._Ku = 0 self._Pu = 0 def setParam(self, target, step, noiseband, sampletime, lookback): self._setpoint = target self._outputstep = step self._out_max = step self._out_min = -step self._noiseband = noiseband self._inputs = deque(maxlen=round(lookback / sampletime)) def setReady(self): self._state = PIDAutotuneState.STATE_READY self._peak_count = 0 def setOff(self): self._state = PIDAutotuneState.STATE_OFF def setFailed(self): self._state = PIDAutotuneState.STATE_FAILED self._peak_count = 30 def state(self): """Get the current state.""" return self._state def output(self): """Get the last output value.""" return self._output def tuning_rules(self): """Get a list of all available tuning rules.""" return self._tuning_rules.keys() def get_tec_pid (self): divisors = self._tuning_rules["tyreus-luyben"] kp = self._Ku * divisors[0] ki = divisors[1] * self._Ku / self._Pu kd = divisors[2] * self._Ku * self._Pu return kp, ki, kd def get_pid_parameters(self, tuning_rule='ziegler-nichols'): """Get PID parameters. Args: tuning_rule (str): Sets the rule which should be used to calculate the parameters. """ divisors = self._tuning_rules[tuning_rule] kp = self._Ku * divisors[0] ki = divisors[1] * self._Ku / self._Pu kd = divisors[2] * self._Ku * self._Pu return PIDAutotune.PIDParams(kp, ki, kd) def run(self, input_val, time_input): """To autotune a system, this method must be called periodically. Args: input_val (float): The temperature input value. time_input (float): Current time in seconds. Returns: `true` if tuning is finished, otherwise `false`. """ now = time_input * 1000 if (self._state == PIDAutotuneState.STATE_OFF or self._state == PIDAutotuneState.STATE_SUCCEEDED or self._state == PIDAutotuneState.STATE_FAILED or self._state == PIDAutotuneState.STATE_READY): self._state = PIDAutotuneState.STATE_RELAY_STEP_UP self._last_run_timestamp = now # check input and change relay state if necessary if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP and input_val > self._setpoint + self._noiseband): self._state = PIDAutotuneState.STATE_RELAY_STEP_DOWN logging.debug('switched state: {0}'.format(self._state)) logging.debug('input: {0}'.format(input_val)) elif (self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN and input_val < self._setpoint - self._noiseband): self._state = PIDAutotuneState.STATE_RELAY_STEP_UP logging.debug('switched state: {0}'.format(self._state)) logging.debug('input: {0}'.format(input_val)) # set output if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP): self._output = self._initial_output - self._outputstep elif self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN: self._output = self._initial_output + self._outputstep # respect output limits self._output = min(self._output, self._out_max) self._output = max(self._output, self._out_min) # identify peaks is_max = True is_min = True for val in self._inputs: is_max = is_max and (input_val >= val) is_min = is_min and (input_val <= val) self._inputs.append(input_val) # we don't trust the maxes or mins until the input array is full if len(self._inputs) < self._inputs.maxlen: return False # increment peak count and record peak time for maxima and minima inflection = False # peak types: # -1: minimum # +1: maximum if is_max: if self._peak_type == -1: inflection = True self._peak_type = 1 elif is_min: if self._peak_type == 1: inflection = True self._peak_type = -1 # update peak times and values if inflection: self._peak_count += 1 self._peaks.append(input_val) self._peak_timestamps.append(now) logging.debug('found peak: {0}'.format(input_val)) logging.debug('peak count: {0}'.format(self._peak_count)) # check for convergence of induced oscillation # convergence of amplitude assessed on last 4 peaks (1.5 cycles) self._induced_amplitude = 0 if inflection and (self._peak_count > 4): abs_max = self._peaks[-2] abs_min = self._peaks[-2] for i in range(0, len(self._peaks) - 2): self._induced_amplitude += abs(self._peaks[i] - self._peaks[i+1]) abs_max = max(self._peaks[i], abs_max) abs_min = min(self._peaks[i], abs_min) self._induced_amplitude /= 6.0 # check convergence criterion for amplitude of induced oscillation amplitude_dev = ((0.5 * (abs_max - abs_min) - self._induced_amplitude) / self._induced_amplitude) logging.debug('amplitude: {0}'.format(self._induced_amplitude)) logging.debug('amplitude deviation: {0}'.format(amplitude_dev)) if amplitude_dev < PIDAutotune.PEAK_AMPLITUDE_TOLERANCE: self._state = PIDAutotuneState.STATE_SUCCEEDED # if the autotune has not already converged # terminate after 10 cycles if self._peak_count >= 20: self._output = 0 self._state = PIDAutotuneState.STATE_FAILED return True if self._state == PIDAutotuneState.STATE_SUCCEEDED: self._output = 0 logging.debug('peak finding successful') # calculate ultimate gain self._Ku = 4.0 * self._outputstep / \ (self._induced_amplitude * math.pi) print('Ku: {0}'.format(self._Ku)) # calculate ultimate period in seconds period1 = self._peak_timestamps[3] - self._peak_timestamps[1] period2 = self._peak_timestamps[4] - self._peak_timestamps[2] self._Pu = 0.5 * (period1 + period2) / 1000.0 print('Pu: {0}'.format(self._Pu)) for rule in self._tuning_rules: params = self.get_pid_parameters(rule) print('rule: {0}'.format(rule)) print('Kp: {0}'.format(params.Kp)) print('Ki: {0}'.format(params.Ki)) print('Kd: {0}'.format(params.Kd)) return True return False async def main(): """ PID AutoTune Tools for Kirdy The obtained temperature works best at the target temperature specified. Before running PID AutoTune, please 1. Secure the laser diode onto the LD adapter and copper heat sink 2. Make sure Kirdy has warmed up and reached thermal equilibrium state In case of PID Autotune Failure, you can 1. Run the PID Autotune again 2. Or increase the lookback period 3. Or increase the sampling rate """ # Target temperature of the autotune routine, celsius target_temperature = 20 # Value by which output will be increased/decreased from zero, amps output_step = 1 # Reference period for local minima/maxima, seconds lookback = 2.0 # Determines by how much the input value must # overshoot/undershoot the setpoint, celsius noiseband = 2.0 kirdy = Kirdy() kirdy.start_session(host='192.168.1.126', port=1337) await kirdy.wait_until_connected() while not(kirdy.connected()): pass await kirdy.laser.set_power_on(False) await kirdy.laser.set_i(0) await kirdy.thermostat.set_power_on(False) await kirdy.thermostat.set_constant_current_control_mode() await kirdy.thermostat.set_tec_i_out(0) await kirdy.thermostat.clear_alarm() signal_handler = SignalHandler() signal_handler.setup() async def sig_handling(): await signal_handler.wait_terminate() tuner.setFailed() asyncio.create_task(sig_handling()) await kirdy.device.set_active_report_mode(False) # Configure the Thermistor Parameters await kirdy.thermostat.set_sh_beta(3950) await kirdy.thermostat.set_sh_r0(10.0 * 1000) await kirdy.thermostat.set_sh_t0(25) # Set a large enough temperature range so that it won't trigger overtemperature protection await kirdy.thermostat.set_temp_mon_upper_limit(target_temperature + 20) await kirdy.thermostat.set_temp_mon_lower_limit(target_temperature - 20) await kirdy.thermostat.set_tec_max_cooling_i(output_step) await kirdy.thermostat.set_tec_max_heating_i(output_step) # The Polling Rate of Temperature Adc is equal to the PID Update Interval await kirdy.thermostat.config_temp_adc_filter(FilterConfig.Sinc5Sinc1With50hz60HzRejection.f16sps) settings = await kirdy.device.get_settings_summary() sampling_rate = settings["thermostat"]["temp_adc_settings"]["rate"] print("Settings: {0}".format(settings)) tuner = PIDAutotune(target_temperature, output_step, lookback, noiseband, 1/sampling_rate) await kirdy.thermostat.set_power_on(True) while True: status_report = await kirdy.device.get_status_report() temperature = status_report["thermostat"]["temperature"] ts = status_report['ts'] print("Ts: {0} Current Temperature: {1} degree".format(ts, temperature)) if (tuner.run(temperature, ts / 1000.0)): print(tuner._state) break tuner_out = tuner.output() await kirdy.thermostat.set_tec_i_out(float(tuner_out)) await kirdy.thermostat.set_tec_i_out(0) await kirdy.thermostat.set_power_on(False) if tuner.state() == PIDAutotuneState.STATE_SUCCEEDED: pid_params = tuner.get_pid_parameters(tuning_rule="tyreus-luyben") await kirdy.thermostat.set_pid_kp(pid_params.Kp) await kirdy.thermostat.set_pid_ki(pid_params.Ki) await kirdy.thermostat.set_pid_kd(pid_params.Kd) await kirdy.thermostat.set_pid_output_max(1.0) await kirdy.thermostat.set_pid_output_min(1.0) await kirdy.end_session() if __name__ == "__main__": asyncio.run(main())