pytec: __name__ check, examples does not run when pytec module is imported, change autotune state management to use enum

This commit is contained in:
topquark12 2021-01-04 10:54:04 +08:00
parent d1f8a4761b
commit ab305d5fa5
3 changed files with 175 additions and 164 deletions

View File

@ -3,35 +3,41 @@ import logging
from time import time from time import time
from collections import deque, namedtuple from collections import deque, namedtuple
from pytec.client import Client from pytec.client import Client
from enum import Enum
# Based on hirshmann pid-autotune libiary # Based on hirshmann pid-autotune libiary
# See https://github.com/hirschmann/pid-autotune # See https://github.com/hirschmann/pid-autotune
# Which is in turn based on a fork of Arduino PID AutoTune Library # Which is in turn based on a fork of Arduino PID AutoTune Library
# See https://github.com/t0mpr1c3/Arduino-PID-AutoTune-Library # See https://github.com/t0mpr1c3/Arduino-PID-AutoTune-Library
# Auto tune parameters if __name__ == "__main__":
# Thermostat channel
channel = 0
# Target temperature of the autotune routine, celcius
target_temperature = 30
# Value by which output will be increased/decreased from zero, amps
output_step = 1
# Reference period for local minima/maxima, seconds
lookback = 3
# Determines by how much the input value must overshoot/undershoot the setpoint, celcius
noiseband = 1.5
class PIDAutotune(object):
PIDParams = namedtuple('PIDParams', ['Kp', 'Ki', 'Kd']) # Auto tune parameters
# Thermostat channel
channel = 0
# Target temperature of the autotune routine, celcius
target_temperature = 30
# Value by which output will be increased/decreased from zero, amps
output_step = 1
# Reference period for local minima/maxima, seconds
lookback = 3
# Determines by how much the input value must overshoot/undershoot the setpoint, celcius
noiseband = 1.5
PEAK_AMPLITUDE_TOLERANCE = 0.05 class PIDAutotuneState(Enum):
STATE_OFF = 'off' STATE_OFF = 'off'
STATE_RELAY_STEP_UP = 'relay step up' STATE_RELAY_STEP_UP = 'relay step up'
STATE_RELAY_STEP_DOWN = 'relay step down' STATE_RELAY_STEP_DOWN = 'relay step down'
STATE_SUCCEEDED = 'succeeded' STATE_SUCCEEDED = 'succeeded'
STATE_FAILED = 'failed' STATE_FAILED = 'failed'
class PIDAutotune():
PIDParams = namedtuple('PIDParams', ['Kp', 'Ki', 'Kd'])
PEAK_AMPLITUDE_TOLERANCE = 0.05
_tuning_rules = { _tuning_rules = {
"ziegler-nichols": [0.6, 1.2, 0.075], "ziegler-nichols": [0.6, 1.2, 0.075],
"tyreus-luyben": [0.4545, 0.2066, 0.07214], "tyreus-luyben": [0.4545, 0.2066, 0.07214],
@ -62,7 +68,7 @@ class PIDAutotune(object):
self._noiseband = noiseband self._noiseband = noiseband
self._out_min = -out_step self._out_min = -out_step
self._out_max = out_step self._out_max = out_step
self._state = PIDAutotune.STATE_OFF self._state = PIDAutotuneState.STATE_OFF
self._peak_timestamps = deque(maxlen=5) self._peak_timestamps = deque(maxlen=5)
self._peaks = deque(maxlen=5) self._peaks = deque(maxlen=5)
self._output = 0 self._output = 0
@ -115,30 +121,30 @@ class PIDAutotune(object):
""" """
now = time_input * 1000 now = time_input * 1000
if (self._state == PIDAutotune.STATE_OFF if (self._state == PIDAutotuneState.STATE_OFF
or self._state == PIDAutotune.STATE_SUCCEEDED or self._state == PIDAutotuneState.STATE_SUCCEEDED
or self._state == PIDAutotune.STATE_FAILED): or self._state == PIDAutotuneState.STATE_FAILED):
self._initTuner(input_val, now) self._initTuner(input_val, now)
self._last_run_timestamp = now self._last_run_timestamp = now
# print("temp : ", input_val) # print("temp : ", input_val)
# check input and change relay state if necessary # check input and change relay state if necessary
if (self._state == PIDAutotune.STATE_RELAY_STEP_UP if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP
and input_val > self._setpoint + self._noiseband): and input_val > self._setpoint + self._noiseband):
self._state = PIDAutotune.STATE_RELAY_STEP_DOWN self._state = PIDAutotuneState.STATE_RELAY_STEP_DOWN
logging.debug('switched state: {0}'.format(self._state)) logging.debug('switched state: {0}'.format(self._state))
logging.debug('input: {0}'.format(input_val)) logging.debug('input: {0}'.format(input_val))
elif (self._state == PIDAutotune.STATE_RELAY_STEP_DOWN elif (self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN
and input_val < self._setpoint - self._noiseband): and input_val < self._setpoint - self._noiseband):
self._state = PIDAutotune.STATE_RELAY_STEP_UP self._state = PIDAutotuneState.STATE_RELAY_STEP_UP
logging.debug('switched state: {0}'.format(self._state)) logging.debug('switched state: {0}'.format(self._state))
logging.debug('input: {0}'.format(input_val)) logging.debug('input: {0}'.format(input_val))
# set output # set output
if (self._state == PIDAutotune.STATE_RELAY_STEP_UP): if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP):
self._output = self._initial_output + self._outputstep self._output = self._initial_output + self._outputstep
elif self._state == PIDAutotune.STATE_RELAY_STEP_DOWN: elif self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN:
self._output = self._initial_output - self._outputstep self._output = self._initial_output - self._outputstep
# respect output limits # respect output limits
@ -204,17 +210,17 @@ class PIDAutotune(object):
logging.debug('amplitude deviation: {0}'.format(amplitude_dev)) logging.debug('amplitude deviation: {0}'.format(amplitude_dev))
if amplitude_dev < PIDAutotune.PEAK_AMPLITUDE_TOLERANCE: if amplitude_dev < PIDAutotune.PEAK_AMPLITUDE_TOLERANCE:
self._state = PIDAutotune.STATE_SUCCEEDED self._state = PIDAutotuneState.STATE_SUCCEEDED
# logging.debug('peak finding succeeded') # logging.debug('peak finding succeeded')
# if the autotune has not already converged # if the autotune has not already converged
# terminate after 10 cycles # terminate after 10 cycles
if self._peak_count >= 20: if self._peak_count >= 20:
self._output = 0 self._output = 0
self._state = PIDAutotune.STATE_FAILED self._state = PIDAutotuneState.STATE_FAILED
return True return True
if self._state == PIDAutotune.STATE_SUCCEEDED: if self._state == PIDAutotuneState.STATE_SUCCEEDED:
self._output = 0 self._output = 0
logging.debug('peak finding successful') logging.debug('peak finding successful')
@ -250,33 +256,34 @@ class PIDAutotune(object):
self._peaks.clear() self._peaks.clear()
self._peak_timestamps.clear() self._peak_timestamps.clear()
self._peak_timestamps.append(timestamp) self._peak_timestamps.append(timestamp)
self._state = PIDAutotune.STATE_RELAY_STEP_UP self._state = PIDAutotuneState.STATE_RELAY_STEP_UP
# logging.basicConfig(level=logging.DEBUG) if __name__ == "__main__":
tec = Client() #(host="localhost", port=6667) # logging.basicConfig(level=logging.DEBUG)
data = next(tec.report_mode()) tec = Client() #(host="localhost", port=6667)
ch = data[channel]
tuner = PIDAutotune(target_temperature, output_step, lookback, noiseband, ch['interval']) data = next(tec.report_mode())
ch = data[channel]
for data in tec.report_mode(): tuner = PIDAutotune(target_temperature, output_step, lookback, noiseband, ch['interval'])
try: for data in tec.report_mode():
ch = data[channel]
temperature = ch['temperature'] try:
ch = data[channel]
if (tuner.run(temperature, ch['time'])): temperature = ch['temperature']
# logging.debug('true')
break
tunerOut = tuner.output() if (tuner.run(temperature, ch['time'])):
break
tec.set_param("pwm", channel, "i_set" , tunerOut) tunerOut = tuner.output()
except: tec.set_param("pwm", channel, "i_set" , tunerOut)
pass
tec.set_param("pwm", channel, "i_set" , channel) except:
pass
tec.set_param("pwm", channel, "i_set" , channel)

View File

@ -1,11 +1,13 @@
from pytec.client import Client if __name__ == "__main__":
tec = Client() #(host="localhost", port=6667) from pytec.client import Client
tec.set_param("s-h", 1, "t0", 20)
print(tec.get_pwm()) tec = Client() #(host="localhost", port=6667)
print(tec.get_pid()) tec.set_param("s-h", 1, "t0", 20)
print(tec.get_pwm()) print(tec.get_pwm())
print(tec.get_postfilter()) print(tec.get_pid())
print(tec.get_steinhart_hart()) print(tec.get_pwm())
for data in tec.report_mode(): print(tec.get_postfilter())
print(data) print(tec.get_steinhart_hart())
for data in tec.report_mode():
print(data)

View File

@ -4,125 +4,127 @@ import matplotlib.animation as animation
from threading import Thread, Lock from threading import Thread, Lock
from pytec.client import Client from pytec.client import Client
TIME_WINDOW = 300.0 if __name__ == "__main__":
tec = Client() TIME_WINDOW = 300.0
target_temperature = tec.get_pid()[0]['target']
print("Channel 0 target temperature: {:.3f}".format(target_temperature))
class Series: tec = Client()
def __init__(self, conv=lambda x: x): target_temperature = tec.get_pid()[0]['target']
self.conv = conv print("Channel 0 target temperature: {:.3f}".format(target_temperature))
self.x_data = []
self.y_data = []
def append(self, x, y): class Series:
self.x_data.append(x) def __init__(self, conv=lambda x: x):
self.y_data.append(self.conv(y)) self.conv = conv
self.x_data = []
self.y_data = []
def clip(self, min_x): def append(self, x, y):
drop = 0 self.x_data.append(x)
while drop < len(self.x_data) and self.x_data[drop] < min_x: self.y_data.append(self.conv(y))
drop += 1
self.x_data = self.x_data[drop:] def clip(self, min_x):
self.y_data = self.y_data[drop:] drop = 0
while drop < len(self.x_data) and self.x_data[drop] < min_x:
drop += 1
self.x_data = self.x_data[drop:]
self.y_data = self.y_data[drop:]
series = {
'adc': Series(),
'sens': Series(lambda x: x * 0.0001),
'temperature': Series(),
'i_set': Series(),
'pid_output': Series(),
'vref': Series(),
'dac_value': Series(),
'dac_feedback': Series(),
'i_tec': Series(),
'tec_i': Series(),
'tec_u_meas': Series(),
'interval': Series(),
}
series_lock = Lock()
quit = False
def recv_data(tec):
global last_packet_time
for data in tec.report_mode():
ch0 = data[0]
series_lock.acquire()
try:
for k, s in series.items():
if k in ch0:
v = ch0[k]
if type(v) is float:
s.append(ch0['time'], v)
finally:
series_lock.release()
if quit:
break
thread = Thread(target=recv_data, args=(tec,))
thread.start()
fig, ax = plt.subplots()
for k, s in series.items():
s.plot, = ax.plot([], [], label=k)
legend = ax.legend()
def animate(i):
min_x, max_x, min_y, max_y = None, None, None, None
series = {
'adc': Series(),
'sens': Series(lambda x: x * 0.0001),
'temperature': Series(lambda t: t - target_temperature),
'i_set': Series(),
'pid_output': Series(),
'vref': Series(),
'dac_value': Series(),
'dac_feedback': Series(),
'i_tec': Series(),
'tec_i': Series(),
'tec_u_meas': Series(),
'interval': Series(),
}
series_lock = Lock()
quit = False
def recv_data(tec):
global last_packet_time
for data in tec.report_mode():
ch0 = data[0]
series_lock.acquire() series_lock.acquire()
try: try:
for k, s in series.items(): for k, s in series.items():
if k in ch0: s.plot.set_data(s.x_data, s.y_data)
v = ch0[k] if len(s.y_data) > 0:
if type(v) is float: s.plot.set_label("{}: {:.3f}".format(k, s.y_data[-1]))
s.append(ch0['time'], v)
if len(s.x_data) > 0:
min_x_ = min(s.x_data)
if min_x is None:
min_x = min_x_
else:
min_x = min(min_x, min_x_)
max_x_ = max(s.x_data)
if max_x is None:
max_x = max_x_
else:
max_x = max(max_x, max_x_)
if len(s.y_data) > 0:
min_y_ = min(s.y_data)
if min_y is None:
min_y = min_y_
else:
min_y = min(min_y, min_y_)
max_y_ = max(s.y_data)
if max_y is None:
max_y = max_y_
else:
max_y = max(max_y, max_y_)
if min_x and max_x - TIME_WINDOW > min_x:
for s in series.values():
s.clip(max_x - TIME_WINDOW)
finally: finally:
series_lock.release() series_lock.release()
if quit: if min_x != max_x:
break ax.set_xlim(min_x, max_x)
if min_y != max_y:
margin_y = 0.01 * (max_y - min_y)
ax.set_ylim(min_y - margin_y, max_y + margin_y)
thread = Thread(target=recv_data, args=(tec,)) global legend
thread.start() legend.remove()
legend = ax.legend()
fig, ax = plt.subplots() ani = animation.FuncAnimation(
fig, animate, interval=1, blit=False, save_count=50)
for k, s in series.items(): plt.show()
s.plot, = ax.plot([], [], label=k) quit = True
legend = ax.legend() thread.join()
def animate(i):
min_x, max_x, min_y, max_y = None, None, None, None
series_lock.acquire()
try:
for k, s in series.items():
s.plot.set_data(s.x_data, s.y_data)
if len(s.y_data) > 0:
s.plot.set_label("{}: {:.3f}".format(k, s.y_data[-1]))
if len(s.x_data) > 0:
min_x_ = min(s.x_data)
if min_x is None:
min_x = min_x_
else:
min_x = min(min_x, min_x_)
max_x_ = max(s.x_data)
if max_x is None:
max_x = max_x_
else:
max_x = max(max_x, max_x_)
if len(s.y_data) > 0:
min_y_ = min(s.y_data)
if min_y is None:
min_y = min_y_
else:
min_y = min(min_y, min_y_)
max_y_ = max(s.y_data)
if max_y is None:
max_y = max_y_
else:
max_y = max(max_y, max_y_)
if min_x and max_x - TIME_WINDOW > min_x:
for s in series.values():
s.clip(max_x - TIME_WINDOW)
finally:
series_lock.release()
if min_x != max_x:
ax.set_xlim(min_x, max_x)
if min_y != max_y:
margin_y = 0.01 * (max_y - min_y)
ax.set_ylim(min_y - margin_y, max_y + margin_y)
global legend
legend.remove()
legend = ax.legend()
ani = animation.FuncAnimation(
fig, animate, interval=1, blit=False, save_count=50)
plt.show()
quit = True
thread.join()