# Forked from M-Labs/thermostat
import socket
|
|
import json
|
|
|
|
# Number of channels; Client._get_conf reads one JSON reply line per channel.
CHANNELS = 2
|
|
|
|
class Client:
    """Line-oriented TCP client for a thermostat's text command interface.

    Commands are sent as space-separated words terminated by a newline.
    Replies are read back as newline-terminated lines, most of which are
    parsed as JSON.

    The client can be used as a context manager so that the socket is
    closed deterministically::

        with Client("192.168.1.26") as tec:
            print(tec.get_pid())
    """

    def __init__(self, host="192.168.1.26", port=23, timeout=None):
        """Open the TCP connection.

        Args:
            host: Address to connect to.
            port: TCP port to connect to.
            timeout: Forwarded to ``socket.create_connection``.
        """
        self._socket = socket.create_connection((host, port), timeout)
        # Raw receive buffer split on newlines. The last element is always
        # the (possibly empty) incomplete tail. Bytes are kept undecoded so
        # that a multi-byte UTF-8 character split across two recv() calls
        # is not lost.
        self._lines = [b""]

    def close(self):
        """Close the underlying socket."""
        self._socket.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _command(self, *command):
        """Send the given words joined by single spaces, plus a newline."""
        self._socket.sendall((" ".join(command) + "\n").encode('utf-8'))

    def _read_line(self):
        """Return the next received line without its newline.

        Returns:
            The decoded line, or ``None`` if the peer closed the connection
            before another complete line arrived (a trailing partial line
            is not returned).
        """
        # Receive until at least one complete line precedes the partial tail.
        while len(self._lines) <= 1:
            chunk = self._socket.recv(4096)
            if not chunk:
                return None
            self._lines = (self._lines[-1] + chunk).split(b"\n")

        line = self._lines.pop(0)
        # Decode only complete lines; undecodable bytes are dropped.
        return line.decode('utf-8', errors='ignore')

    def _get_conf(self, topic):
        """Send ``topic`` and parse one JSON reply line per channel.

        Returns:
            A list with ``CHANNELS`` parsed JSON values, in arrival order.

        Raises:
            ConnectionError: If the connection is closed before every
                channel's reply has been received.
            json.JSONDecodeError: If a reply line is not valid JSON.
        """
        self._command(topic)
        result = []
        for _ in range(CHANNELS):
            line = self._read_line()
            if line is None:
                raise ConnectionError(
                    "connection closed while reading reply to {!r}".format(topic)
                )
            result.append(json.loads(line))
        return result

    def get_pwm(self):
        """Return the per-channel replies to the ``pwm`` command."""
        return self._get_conf("pwm")

    def get_pid(self):
        """Return the per-channel replies to the ``pid`` command."""
        return self._get_conf("pid")

    def get_steinhart_hart(self):
        """Return the per-channel replies to the ``s-h`` command."""
        return self._get_conf("s-h")

    def get_postfilter(self):
        """Return the per-channel replies to the ``postfilter`` command."""
        return self._get_conf("postfilter")

    def report_mode(self):
        """Start reporting measurement values

        This is a generator: the ``report mode on`` command is only sent
        once iteration starts. The first reply line is discarded, then
        every following line that parses as JSON is yielded. Lines that
        are not valid JSON are skipped. Iteration ends when the
        connection is closed or an empty line is received.
        """
        self._command("report mode", "on")
        # Discard the first reply line.
        self._read_line()

        while True:
            line = self._read_line()
            if not line:
                break
            # Keep the yield outside the try so that only parse errors are
            # suppressed, not exceptions thrown in by the consumer.
            try:
                report = json.loads(line)
            except json.JSONDecodeError:
                continue
            yield report

    def set_param(self, topic, channel, field="", value=""):
        """Send ``topic channel field value`` and consume one reply line.

        Args:
            topic: Command word, e.g. ``"pid"`` or ``"pwm"``.
            channel: Channel identifier; converted with ``str()``.
            field: Optional parameter name.
            value: Optional value. Floats are sent in fixed-point notation
                (``"{:f}"``), strings are sent as-is, and anything else is
                converted with ``str()``.
        """
        if isinstance(value, float):
            # Fixed-point formatting avoids exponent notation on the wire.
            value = "{:f}".format(value)
        elif not isinstance(value, str):
            value = str(value)
        self._command(topic, str(channel), field, value)

        # read response line
        self._read_line()

    def power_up(self, channel, target):
        """Send ``pid <channel> target <target>`` then ``pwm <channel> pid``.

        NOTE(review): presumably this sets the PID target and then hands
        PWM control to the PID loop -- confirm against the firmware docs.
        """
        self.set_param("pid", channel, "target", value=target)
        self.set_param("pwm", channel, "pid")