Formatting

This commit is contained in:
atse 2024-07-03 14:40:13 +08:00
parent c804c7bedb
commit f19809c852
13 changed files with 160 additions and 95 deletions

3
pytec/.flake8 Normal file
View File

@ -0,0 +1,3 @@
[flake8]
max-line-length = 88
extend-ignore = E203,E701

View File

@ -12,32 +12,33 @@ from pytec.client import Client
class PIDAutotuneState(Enum):
STATE_OFF = 'off'
STATE_RELAY_STEP_UP = 'relay step up'
STATE_RELAY_STEP_DOWN = 'relay step down'
STATE_SUCCEEDED = 'succeeded'
STATE_FAILED = 'failed'
STATE_READY = 'ready'
STATE_OFF = "off"
STATE_RELAY_STEP_UP = "relay step up"
STATE_RELAY_STEP_DOWN = "relay step down"
STATE_SUCCEEDED = "succeeded"
STATE_FAILED = "failed"
STATE_READY = "ready"
class PIDAutotune:
PIDParams = namedtuple('PIDParams', ['Kp', 'Ki', 'Kd'])
PIDParams = namedtuple("PIDParams", ["Kp", "Ki", "Kd"])
PEAK_AMPLITUDE_TOLERANCE = 0.05
_tuning_rules = {
"ziegler-nichols": [0.6, 1.2, 0.075],
"tyreus-luyben": [0.4545, 0.2066, 0.07214],
"tyreus-luyben": [0.4545, 0.2066, 0.07214],
"ciancone-marlin": [0.303, 0.1364, 0.0481],
"pessen-integral": [0.7, 1.75, 0.105],
"some-overshoot": [0.333, 0.667, 0.111],
"no-overshoot": [0.2, 0.4, 0.0667]
"some-overshoot": [0.333, 0.667, 0.111],
"no-overshoot": [0.2, 0.4, 0.0667],
}
def __init__(self, setpoint, out_step=10, lookback=60,
noiseband=0.5, sampletime=1.2):
def __init__(
self, setpoint, out_step=10, lookback=60, noiseband=0.5, sampletime=1.2
):
if setpoint is None:
raise ValueError('setpoint must be specified')
raise ValueError("setpoint must be specified")
self._inputs = deque(maxlen=round(lookback / sampletime))
self._setpoint = setpoint
@ -84,7 +85,7 @@ class PIDAutotune:
"""Get a list of all available tuning rules."""
return self._tuning_rules.keys()
def get_pid_parameters(self, tuning_rule='ziegler-nichols'):
def get_pid_parameters(self, tuning_rule="ziegler-nichols"):
"""Get PID parameters.
Args:
@ -97,7 +98,7 @@ class PIDAutotune:
kd = divisors[2] * self._Ku * self._Pu
return PIDAutotune.PIDParams(kp, ki, kd)
def get_tec_pid (self):
def get_tec_pid(self):
divisors = self._tuning_rules["tyreus-luyben"]
kp = self._Ku * divisors[0]
ki = divisors[1] * self._Ku / self._Pu
@ -116,28 +117,34 @@ class PIDAutotune:
"""
now = time_input * 1000
if (self._state == PIDAutotuneState.STATE_OFF
or self._state == PIDAutotuneState.STATE_SUCCEEDED
or self._state == PIDAutotuneState.STATE_FAILED
or self._state == PIDAutotuneState.STATE_READY):
if (
self._state == PIDAutotuneState.STATE_OFF
or self._state == PIDAutotuneState.STATE_SUCCEEDED
or self._state == PIDAutotuneState.STATE_FAILED
or self._state == PIDAutotuneState.STATE_READY
):
self._state = PIDAutotuneState.STATE_RELAY_STEP_UP
self._last_run_timestamp = now
# check input and change relay state if necessary
if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP
and input_val > self._setpoint + self._noiseband):
if (
self._state == PIDAutotuneState.STATE_RELAY_STEP_UP
and input_val > self._setpoint + self._noiseband
):
self._state = PIDAutotuneState.STATE_RELAY_STEP_DOWN
logging.debug('switched state: {0}'.format(self._state))
logging.debug('input: {0}'.format(input_val))
elif (self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN
and input_val < self._setpoint - self._noiseband):
logging.debug("switched state: {0}".format(self._state))
logging.debug("input: {0}".format(input_val))
elif (
self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN
and input_val < self._setpoint - self._noiseband
):
self._state = PIDAutotuneState.STATE_RELAY_STEP_UP
logging.debug('switched state: {0}'.format(self._state))
logging.debug('input: {0}'.format(input_val))
logging.debug("switched state: {0}".format(self._state))
logging.debug("input: {0}".format(input_val))
# set output
if (self._state == PIDAutotuneState.STATE_RELAY_STEP_UP):
if self._state == PIDAutotuneState.STATE_RELAY_STEP_UP:
self._output = self._initial_output - self._outputstep
elif self._state == PIDAutotuneState.STATE_RELAY_STEP_DOWN:
self._output = self._initial_output + self._outputstep
@ -180,8 +187,8 @@ class PIDAutotune:
self._peak_count += 1
self._peaks.append(input_val)
self._peak_timestamps.append(now)
logging.debug('found peak: {0}'.format(input_val))
logging.debug('peak count: {0}'.format(self._peak_count))
logging.debug("found peak: {0}".format(input_val))
logging.debug("peak count: {0}".format(self._peak_count))
# check for convergence of induced oscillation
# convergence of amplitude assessed on last 4 peaks (1.5 cycles)
@ -191,20 +198,19 @@ class PIDAutotune:
abs_max = self._peaks[-2]
abs_min = self._peaks[-2]
for i in range(0, len(self._peaks) - 2):
self._induced_amplitude += abs(self._peaks[i]
- self._peaks[i+1])
self._induced_amplitude += abs(self._peaks[i] - self._peaks[i + 1])
abs_max = max(self._peaks[i], abs_max)
abs_min = min(self._peaks[i], abs_min)
self._induced_amplitude /= 6.0
# check convergence criterion for amplitude of induced oscillation
amplitude_dev = ((0.5 * (abs_max - abs_min)
- self._induced_amplitude)
/ self._induced_amplitude)
amplitude_dev = (
0.5 * (abs_max - abs_min) - self._induced_amplitude
) / self._induced_amplitude
logging.debug('amplitude: {0}'.format(self._induced_amplitude))
logging.debug('amplitude deviation: {0}'.format(amplitude_dev))
logging.debug("amplitude: {0}".format(self._induced_amplitude))
logging.debug("amplitude deviation: {0}".format(amplitude_dev))
if amplitude_dev < PIDAutotune.PEAK_AMPLITUDE_TOLERANCE:
self._state = PIDAutotuneState.STATE_SUCCEEDED
@ -218,25 +224,24 @@ class PIDAutotune:
if self._state == PIDAutotuneState.STATE_SUCCEEDED:
self._output = 0
logging.debug('peak finding successful')
logging.debug("peak finding successful")
# calculate ultimate gain
self._Ku = 4.0 * self._outputstep / \
(self._induced_amplitude * math.pi)
logging.debug('Ku: {0}'.format(self._Ku))
self._Ku = 4.0 * self._outputstep / (self._induced_amplitude * math.pi)
logging.debug("Ku: {0}".format(self._Ku))
# calculate ultimate period in seconds
period1 = self._peak_timestamps[3] - self._peak_timestamps[1]
period2 = self._peak_timestamps[4] - self._peak_timestamps[2]
self._Pu = 0.5 * (period1 + period2) / 1000.0
logging.debug('Pu: {0}'.format(self._Pu))
logging.debug("Pu: {0}".format(self._Pu))
for rule in self._tuning_rules:
params = self.get_pid_parameters(rule)
logging.debug('rule: {0}'.format(rule))
logging.debug('Kp: {0}'.format(params.Kp))
logging.debug('Ki: {0}'.format(params.Ki))
logging.debug('Kd: {0}'.format(params.Kd))
logging.debug("rule: {0}".format(rule))
logging.debug("Kp: {0}".format(params.Kp))
logging.debug("Ki: {0}".format(params.Ki))
logging.debug("Kd: {0}".format(params.Kd))
return True
return False
@ -263,16 +268,17 @@ def main():
data = next(tec.report_mode())
ch = data[channel]
tuner = PIDAutotune(target_temperature, output_step,
lookback, noiseband, ch['interval'])
tuner = PIDAutotune(
target_temperature, output_step, lookback, noiseband, ch["interval"]
)
for data in tec.report_mode():
ch = data[channel]
temperature = ch['temperature']
temperature = ch["temperature"]
if (tuner.run(temperature, ch['time'])):
if tuner.run(temperature, ch["time"]):
break
tuner_out = tuner.output()

View File

@ -1,9 +1,10 @@
import asyncio
from pytec.aioclient import Client
async def main():
tec = Client()
await tec.start_session() #(host="192.168.1.26", port=23)
await tec.start_session() # (host="192.168.1.26", port=23)
await tec.set_param("s-h", 1, "t0", 20)
print(await tec.get_pwm())
print(await tec.get_pid())
@ -13,4 +14,5 @@ async def main():
async for data in tec.report_mode():
print(data)
asyncio.run(main())

View File

@ -1,6 +1,6 @@
from pytec.client import Client
tec = Client() #(host="localhost", port=6667)
tec = Client() # (host="localhost", port=6667)
tec.set_param("s-h", 1, "t0", 20)
print(tec.get_pwm())
print(tec.get_pid())

View File

@ -7,9 +7,10 @@ from pytec.client import Client
TIME_WINDOW = 300.0
tec = Client()
target_temperature = tec.get_pid()[0]['target']
target_temperature = tec.get_pid()[0]["target"]
print("Channel 0 target temperature: {:.3f}".format(target_temperature))
class Series:
def __init__(self, conv=lambda x: x):
self.conv = conv
@ -26,25 +27,27 @@ class Series:
drop += 1
self.x_data = self.x_data[drop:]
self.y_data = self.y_data[drop:]
series = {
# 'adc': Series(),
# 'sens': Series(lambda x: x * 0.0001),
'temperature': Series(),
"temperature": Series(),
# 'i_set': Series(),
'pid_output': Series(),
"pid_output": Series(),
# 'vref': Series(),
# 'dac_value': Series(),
# 'dac_feedback': Series(),
# 'i_tec': Series(),
'tec_i': Series(),
'tec_u_meas': Series(),
"tec_i": Series(),
"tec_u_meas": Series(),
# 'interval': Series(),
}
series_lock = Lock()
quit = False
def recv_data(tec):
global last_packet_time
for data in tec.report_mode():
@ -55,25 +58,27 @@ def recv_data(tec):
if k in ch0:
v = ch0[k]
if type(v) is float:
s.append(ch0['time'], v)
s.append(ch0["time"], v)
finally:
series_lock.release()
if quit:
break
thread = Thread(target=recv_data, args=(tec,))
thread.start()
fig, ax = plt.subplots()
for k, s in series.items():
s.plot, = ax.plot([], [], label=k)
(s.plot,) = ax.plot([], [], label=k)
legend = ax.legend()
def animate(i):
min_x, max_x, min_y, max_y = None, None, None, None
series_lock.acquire()
try:
for k, s in series.items():
@ -120,8 +125,8 @@ def animate(i):
legend.remove()
legend = ax.legend()
ani = animation.FuncAnimation(
fig, animate, interval=1, blit=False, save_count=50)
ani = animation.FuncAnimation(fig, animate, interval=1, blit=False, save_count=50)
plt.show()
quit = True

View File

@ -16,3 +16,6 @@ tec_qt = "tec_qt:main"
[tool.setuptools]
packages.find = {}
py-modules = ["autotune", "plot", "tec_qt"]
[tool.pylint.format]
max-line-length = "88"

View File

@ -2,12 +2,15 @@ import asyncio
import json
import logging
class CommandError(Exception):
pass
class StoppedConnecting(Exception):
pass
class Client:
def __init__(self):
self._reader = None
@ -17,7 +20,7 @@ class Client:
self._report_mode_on = False
self.timeout = None
async def start_session(self, host='192.168.1.26', port=23, timeout=None):
async def start_session(self, host="192.168.1.26", port=23, timeout=None):
"""Start session to Thermostat at specified host and port.
Throws StoppedConnecting if disconnect was called while connecting.
Throws asyncio.TimeoutError if timeout was exceeded.
@ -69,15 +72,21 @@ class Client:
for pwm_channel in pwm_report:
for limit in ["max_i_neg", "max_i_pos", "max_v"]:
if pwm_channel[limit]["value"] == 0.0:
logging.warning("`{}` limit is set to zero on channel {}".format(limit, pwm_channel["channel"]))
logging.warning(
"`{}` limit is set to zero on channel {}".format(
limit, pwm_channel["channel"]
)
)
async def _read_line(self):
# read 1 line
chunk = await asyncio.wait_for(self._reader.readline(), self.timeout) # Only wait for response until timeout
return chunk.decode('utf-8', errors='ignore')
chunk = await asyncio.wait_for(
self._reader.readline(), self.timeout
) # Only wait for response until timeout
return chunk.decode("utf-8", errors="ignore")
async def _read_write(self, command):
self._writer.write(((" ".join(command)).strip() + "\n").encode('utf-8'))
self._writer.write(((" ".join(command)).strip() + "\n").encode("utf-8"))
await self._writer.drain()
return await self._read_line()
@ -244,7 +253,7 @@ class Client:
"""Load current configuration from EEPROM"""
await self._command("load", str(channel))
if channel == "":
await self._read_line() # Read the extra {}
await self._read_line() # Read the extra {}
async def hw_rev(self):
"""Get Thermostat hardware revision"""
@ -252,28 +261,28 @@ class Client:
async def reset(self):
"""Reset the Thermostat
The client is disconnected as the TCP session is terminated.
"""
async with self._command_lock:
self._writer.write("reset\n".encode('utf-8'))
self._writer.write("reset\n".encode("utf-8"))
await self._writer.drain()
await self.end_session()
async def dfu(self):
"""Put the Thermostat in DFU update mode
The client is disconnected as the Thermostat stops responding to
TCP commands in DFU update mode. The only way to exit it is by
power-cycling.
"""
async with self._command_lock:
self._writer.write("dfu\n".encode('utf-8'))
self._writer.write("dfu\n".encode("utf-8"))
await self._writer.drain()
await self.end_session()
async def ipv4(self):
"""Get the IPv4 settings of the Thermostat"""
return await self._command('ipv4')
return await self._command("ipv4")

View File

@ -2,9 +2,11 @@ import socket
import json
import logging
class CommandError(Exception):
pass
class Client:
def __init__(self, host="192.168.1.26", port=23, timeout=None):
self._socket = socket.create_connection((host, port), timeout)
@ -20,7 +22,11 @@ class Client:
for pwm_channel in pwm_report:
for limit in ["max_i_neg", "max_i_pos", "max_v"]:
if pwm_channel[limit]["value"] == 0.0:
logging.warning("`{}` limit is set to zero on channel {}".format(limit, pwm_channel["channel"]))
logging.warning(
"`{}` limit is set to zero on channel {}".format(
limit, pwm_channel["channel"]
)
)
def _read_line(self):
# read more lines
@ -28,7 +34,7 @@ class Client:
chunk = self._socket.recv(4096)
if not chunk:
return None
buf = self._lines[-1] + chunk.decode('utf-8', errors='ignore')
buf = self._lines[-1] + chunk.decode("utf-8", errors="ignore")
self._lines = buf.split("\n")
line = self._lines[0]
@ -36,7 +42,7 @@ class Client:
return line
def _command(self, *command):
self._socket.sendall((" ".join(command) + "\n").encode('utf-8'))
self._socket.sendall((" ".join(command) + "\n").encode("utf-8"))
line = self._read_line()
response = json.loads(line)

View File

@ -48,7 +48,11 @@ class PIDAutoTuner(QObject):
ch = channel_report["channel"]
match self.autotuners[ch].state():
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.autotuners[ch].run(
channel_report["temperature"], channel_report["time"]
)

View File

@ -6,7 +6,7 @@ class InfoBox(QtWidgets.QMessageBox):
def __init__(self):
super().__init__()
self.setIcon(QtWidgets.QMessageBox.Icon.Information)
@pyqtSlot(str, str)
def display_info_box(self, title, text):
self.setWindowTitle(title)

View File

@ -67,9 +67,7 @@ class _TecGraphs:
self._t_line = self._t_widget.getPlotItem().addLine(label="{value} °C")
self._t_line.setVisible(False)
# Hack for keeping setpoint line in plot range
self._t_setpoint_plot = (
LiveLinePlot()
)
self._t_setpoint_plot = LiveLinePlot()
for graph in t_widget, i_widget:
time_axis = LiveAxis(

View File

@ -67,16 +67,30 @@ class QtWaitingSpinner(QWidget):
painter.setPen(Qt.PenStyle.NoPen)
for i in range(0, self._numberOfLines):
painter.save()
painter.translate(self._innerRadius + self._lineLength, self._innerRadius + self._lineLength)
painter.translate(
self._innerRadius + self._lineLength,
self._innerRadius + self._lineLength,
)
rotateAngle = float(360 * i) / float(self._numberOfLines)
painter.rotate(rotateAngle)
painter.translate(self._innerRadius, 0)
distance = self.lineCountDistanceFromPrimary(i, self._currentCounter, self._numberOfLines)
color = self.currentLineColor(distance, self._numberOfLines, self._trailFadePercentage,
self._minimumTrailOpacity, self._color)
distance = self.lineCountDistanceFromPrimary(
i, self._currentCounter, self._numberOfLines
)
color = self.currentLineColor(
distance,
self._numberOfLines,
self._trailFadePercentage,
self._minimumTrailOpacity,
self._color,
)
painter.setBrush(color)
painter.drawRoundedRect(QRect(0, int(-self._lineWidth / 2), self._lineLength, self._lineWidth), self._roundness,
self._roundness, Qt.SizeMode.RelativeSize)
painter.drawRoundedRect(
QRect(0, int(-self._lineWidth / 2), self._lineLength, self._lineWidth),
self._roundness,
self._roundness,
Qt.SizeMode.RelativeSize,
)
painter.restore()
def start(self):
@ -160,7 +174,9 @@ class QtWaitingSpinner(QWidget):
self.setFixedSize(self.size, self.size)
def updateTimer(self):
self._timer.setInterval(int(1000 / (self._numberOfLines * self._revolutionsPerSecond)))
self._timer.setInterval(
int(1000 / (self._numberOfLines * self._revolutionsPerSecond))
)
def lineCountDistanceFromPrimary(self, current, primary, totalNrOfLines):
distance = primary - current
@ -168,7 +184,9 @@ class QtWaitingSpinner(QWidget):
distance += totalNrOfLines
return distance
def currentLineColor(self, countDistance, totalNrOfLines, trailFadePerc, minOpacity, colorinput):
def currentLineColor(
self, countDistance, totalNrOfLines, trailFadePerc, minOpacity, colorinput
):
color = QColor(colorinput)
if countDistance == 0:
return color
@ -186,7 +204,7 @@ class QtWaitingSpinner(QWidget):
return color
if __name__ == '__main__':
if __name__ == "__main__":
app = QApplication([])
waiting_spinner = QtWaitingSpinner()
waiting_spinner.show()

View File

@ -299,7 +299,11 @@ class MainWindow(QtWidgets.QMainWindow):
case PIDAutotuneState.STATE_OFF | PIDAutotuneState.STATE_FAILED:
self.autotuners.load_params_and_set_ready(ch)
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
await self.autotuners.stop_pid_from_running(ch)
# To Update the UI elements
self.pid_autotune_handler([])
@ -313,7 +317,11 @@ class MainWindow(QtWidgets.QMainWindow):
self.ctrl_panel_view.change_params_title(
ch, ("pid", "pid_autotune", "run_pid"), "Run"
)
case PIDAutotuneState.STATE_READY | PIDAutotuneState.STATE_RELAY_STEP_UP | PIDAutotuneState.STATE_RELAY_STEP_DOWN:
case (
PIDAutotuneState.STATE_READY
| PIDAutotuneState.STATE_RELAY_STEP_UP
| PIDAutotuneState.STATE_RELAY_STEP_DOWN
):
self.ctrl_panel_view.change_params_title(
ch, ("pid", "pid_autotune", "run_pid"), "Stop"
)
@ -328,7 +336,8 @@ class MainWindow(QtWidgets.QMainWindow):
case PIDAutotuneState.STATE_FAILED:
self.info_box.display_info_box(
"PID Autotune Failed", f"Channel {ch} PID Autotune has failed."
"PID Autotune Failed",
f"Channel {ch} PID Autotune has failed.",
)
self.info_box.show()
@ -410,7 +419,9 @@ async def coro_main():
app = QtWidgets.QApplication.instance()
app.aboutToQuit.connect(app_quit_event.set)
app.setWindowIcon(
QtGui.QIcon(str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico")))
QtGui.QIcon(
str(importlib.resources.files("pytec.gui.resources").joinpath("artiq.ico"))
)
)
main_window = MainWindow(args)