forked from M-Labs/thermostat
37 lines
903 B
Python
37 lines
903 B
Python
|
import asyncio
|
||
|
from contextlib import suppress
|
||
|
from pythermostat.aioclient import AsyncioClient
|
||
|
|
||
|
|
||
|
async def poll_for_info(tec):
|
||
|
while True:
|
||
|
print(tec.get_output())
|
||
|
print(tec.get_b_parameter())
|
||
|
print(tec.get_pid())
|
||
|
print(tec.get_postfilter())
|
||
|
print(tec.get_fan())
|
||
|
|
||
|
await asyncio.sleep(1)
|
||
|
|
||
|
|
||
|
async def main():
|
||
|
tec = AsyncioClient()
|
||
|
await tec.connect() # (host="192.168.1.26", port=23)
|
||
|
await tec.set_param("b-p", 1, "t0", 20)
|
||
|
print(await tec.get_output())
|
||
|
print(await tec.get_pid())
|
||
|
print(await tec.get_output())
|
||
|
print(await tec.get_postfilter())
|
||
|
print(await tec.get_b_parameter())
|
||
|
|
||
|
polling_task = asyncio.create_task(poll_for_info(tec))
|
||
|
|
||
|
async for data in tec.report_mode():
|
||
|
print(data)
|
||
|
|
||
|
polling_task.cancel()
|
||
|
with suppress(asyncio.CancelledError):
|
||
|
await polling_task
|
||
|
|
||
|
asyncio.run(main())
|