@ -25,15 +25,22 @@ to another. Disambiguation of devices is done by using Stabilizer's MAC address.
Settings are specific to an application. If two identical settings exist for two different
applications, each application maintains its own independent value.
### Setup
### Installation
Install the Miniconf configuration utilities:
python -m pip install git+https://github.com/quartiq/miniconf#subdirectory=miniconf-py
In order to use `miniconf.py`, run the following command:
To use `miniconf`, execute it as follows:
python -m pip install gmqtt
python -m miniconf --help
Miniconf also exposes a programmatic Python API, so it's possible to write automation scripting of
Stabilizer as well.
### Usage
The `miniconf.py` script utilizes a unique "device prefix". The device prefix is always of the
The Miniconf Python utility utilizes a unique "device prefix". The device prefix is always of the
form `dt/sinara/<app>/<mac-address>`, where `<app>` is the name of the application and
`<mac-address>` is the MAC address of the device, formatted with delimiting dashes.
@ -46,7 +53,7 @@ used:
* `value` = `{"ip": [192, 168, 0, 1], "port": 4000}`
python miniconf.py --broker dt/sinara/dual-iir/00-11-22-33-44-55 stream_target='{"ip": [10, 34, 16, 123], "port": 4000}'
python -m miniconf --broker dt/sinara/dual-iir/00-11-22-33-44-55 stream_target='{"ip": [10, 34, 16, 123], "port": 4000}'
Where `` is the MQTT broker address that matches the one configured in the source code and `` and `4000` are the desire stream target IP and port.
@ -61,7 +68,7 @@ The rules for constructing `path` values are documented in [`miniconf`'s
Refer to the documentation for [Miniconf]({{site.baseurl}}/firmware/miniconf/enum.Error.html) for a
description of the possible error codes that `miniconf.py` may return if the settings update was
description of the possible error codes that Miniconf may return if the settings update was
## Telemetry

hitl/loopback.py Normal file
View File

@ -0,0 +1,163 @@
Author: Vertigo Designs, Ryan Summers
Description: Loop-back integration tests for Stabilizer hardware
import argparse
import asyncio
import json
import sys
from gmqtt import Client as MqttClient
from miniconf import Miniconf
# The minimum allowable loopback voltage error (difference between output set point and input
# measured value).
def _voltage_to_machine_units(voltage):
""" Convert a voltage to IIR machine units.
voltage: The voltage to convert
The IIR machine-units associated with the voltage.
dac_range = 4.096 * 2.5
assert abs(voltage) <= dac_range, 'Voltage out-of-range'
return voltage / dac_range * 0x7FFF
def static_iir_output(output_voltage):
""" Generate IIR configuration for a static output voltage.
output_voltage: The desired static IIR output voltage.
The IIR configuration to send over Miniconf.
machine_units = _voltage_to_machine_units(output_voltage)
return {
'y_min': machine_units,
'y_max': machine_units,
'y_offset': 0,
'ba': [1, 0, 0, 0, 0],
class TelemetryReader:
""" Helper utility to read Stabilizer telemetry. """
async def create(cls, prefix, broker, queue):
"""Create a connection to the broker and an MQTT device using it."""
client = MqttClient(client_id='')
await client.connect(broker)
return cls(client, prefix, queue)
def __init__(self, client, prefix, queue):
""" Constructor. """
self.client = client
self._telemetry = []
self.client.on_message = self.handle_telemetry
self._telemetry_topic = f'{prefix}/telemetry'
self.queue = queue
def handle_telemetry(self, _client, topic, payload, _qos, _properties):
""" Handle incoming telemetry messages over MQTT. """
assert topic == self._telemetry_topic
async def test_loopback(miniconf, telemetry_queue, set_point, gain=1, channel=0):
""" Test loopback operation of Stabilizer.
Loopback is tested by configuring DACs for static output and verifying telemetry reports the
ADCs are measuring those values. Output OUTx should be connected in a loopback configuration
to INx on the device.
miniconf: The miniconf configuration interface.
telemetry: a helper utility to read inbound telemetry.
set_point: The desired output voltage to test.
channel: The loopback channel to test on. Either 0 or 1.
gain: The desired AFE gain.
print(f'Testing loopback for Vout = {set_point:.2f}, Gain = x{gain}')
# Configure the AFE and IIRs to output at the set point
await miniconf.command(f'afe/{channel}', f'G{gain}', retain=False)
await miniconf.command(f'iir_ch/{channel}/0', static_iir_output(set_point), retain=False)
# Configure signal generators to not affect the test.
await miniconf.command('signal_generator/0/amplitude', 0, retain=False)
# Wait for telemetry to update.
await asyncio.sleep(5.0)
# Verify the ADCs are receiving the setpoint voltage.
tolerance = max(0.05 * set_point, MINIMUM_VOLTAGE_ERROR)
latest_values = await telemetry_queue.get()
print(f'Latest telemtry: {latest_values}')
assert abs(latest_values['adcs'][channel] - set_point) < tolerance
def main():
""" Main program entry point. """
parser = argparse.ArgumentParser(description='Loopback tests for Stabilizer HITL testing',)
parser.add_argument('prefix', type=str,
help='The MQTT topic prefix of the target')
parser.add_argument('--broker', '-b', default='mqtt', type=str,
help='The MQTT broker address')
args = parser.parse_args()
telemetry_queue = asyncio.LifoQueue()
async def telemetry():
await TelemetryReader.create(args.prefix, args.broker, telemetry_queue)
while True:
await asyncio.sleep(1)
except asyncio.CancelledError:
telemetry_task = asyncio.Task(telemetry())
async def test():
""" The actual testing being completed. """
interface = await Miniconf.create(args.prefix, args.broker)
# Disable IIR holds and configure the telemetry rate.
await interface.command('allow_hold', False, retain=False)
await interface.command('force_hold', False, retain=False)
await interface.command('telemetry_period', 1, retain=False)
# Test loopback with a static 1V output of the DACs.
await test_loopback(interface, telemetry_queue, 1.0)
# Repeat test with AFE = 2x
await test_loopback(interface, telemetry_queue, 1.0, gain=2)
# Test with 0V output
await test_loopback(interface, telemetry_queue, 0.0)
loop = asyncio.get_event_loop()
if __name__ == '__main__':

View File

@ -13,7 +13,10 @@ set -eux
# Set up python for testing
python3 -m venv --system-site-packages py
. py/bin/activate
python3 -m pip install -r requirements.txt
# Install Miniconf utilities for configuring stabilizer.
python3 -m pip install git+https://github.com/quartiq/miniconf#subdirectory=py/miniconf-mqtt
python3 -m pip install gmqtt
cargo flash --chip STM32H743ZITx --elf target/thumbv7em-none-eabihf/release/dual-iir
@ -27,6 +30,9 @@ sleep 30
ping -c 5 -w 20 stabilizer-hitl
# Test the MQTT interface.
python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G2"'
python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G1"' iir_ch/0/0=\
python3 -m miniconf dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G2"'
python3 -m miniconf dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G1"' iir_ch/0/0=\
'{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}'
# Test the ADC/DACs connected via loopback.
python3 hitl/loopback.py dt/sinara/dual-iir/04-91-62-d9-7e-5f

View File

View File

@ -1 +0,0 @@