149 lines
4.7 KiB
Python
149 lines
4.7 KiB
Python
|
#!/usr/bin/python3
|
||
|
"""
|
||
|
Author: Vertigo Designs, Ryan Summers
|
||
|
|
||
|
Description: Loop-back integration tests for Stabilizer hardware
|
||
|
"""
|
||
|
import argparse
|
||
|
import asyncio
|
||
|
import json
|
||
|
import sys
|
||
|
|
||
|
from gmqtt import Client as MqttClient
|
||
|
from miniconf import Miniconf
|
||
|
|
||
|
# The minimum allowably loopback voltage error (difference between output set point and input
|
||
|
# measured value).
|
||
|
MINIMUM_VOLTAGE_ERROR = 0.010
|
||
|
|
||
|
def _voltage_to_machine_units(voltage):
|
||
|
""" Convert a voltage to IIR machine units.
|
||
|
|
||
|
Args:
|
||
|
voltage: The voltage to convert
|
||
|
|
||
|
Returns:
|
||
|
The IIR machine-units associated with the voltage.
|
||
|
"""
|
||
|
dac_range = 4.096 * 2.5
|
||
|
assert abs(voltage) <= dac_range, 'Voltage out-of-range'
|
||
|
return voltage / dac_range * 0x7FFF
|
||
|
|
||
|
|
||
|
def static_iir_output(output_voltage):
|
||
|
""" Generate IIR configuration for a static output voltage.
|
||
|
|
||
|
Args:
|
||
|
output_voltage: The desired static IIR output voltage.
|
||
|
|
||
|
Returns
|
||
|
The IIR configuration to send over Miniconf.
|
||
|
"""
|
||
|
machine_units = _voltage_to_machine_units(output_voltage)
|
||
|
return {
|
||
|
'y_min': machine_units,
|
||
|
'y_max': machine_units,
|
||
|
'y_offset': 0,
|
||
|
'ba': [1, 0, 0, 0, 0],
|
||
|
}
|
||
|
|
||
|
|
||
|
class TelemetryReader:
|
||
|
""" Helper utility to read Stabilizer telemetry. """
|
||
|
|
||
|
@classmethod
|
||
|
async def create(cls, prefix, broker):
|
||
|
"""Create a connection to the broker and an MQTT device using it."""
|
||
|
client = MqttClient(client_id='')
|
||
|
await client.connect(broker)
|
||
|
return cls(client, prefix)
|
||
|
|
||
|
|
||
|
def __init__(self, client, prefix):
|
||
|
""" Constructor. """
|
||
|
self.client = client
|
||
|
self._telemetry = []
|
||
|
self.client.on_message = self.handle_telemetry
|
||
|
self._telemetry_topic = f'{prefix}/telemetry'
|
||
|
self.client.subscribe(self._telemetry_topic)
|
||
|
|
||
|
|
||
|
def handle_telemetry(self, _client, topic, payload, _qos, _properties):
|
||
|
""" Handle incoming telemetry messages over MQTT. """
|
||
|
assert topic == self._telemetry_topic
|
||
|
self._telemetry.append(json.loads(payload))
|
||
|
|
||
|
|
||
|
def get_latest(self):
|
||
|
""" Get the latest telemetry message received. """
|
||
|
return self._telemetry[-1]
|
||
|
|
||
|
|
||
|
async def test_loopback(miniconf, telemetry, set_point):
|
||
|
""" Test loopback operation of Stabilizer.
|
||
|
|
||
|
Note:
|
||
|
Loopback is tested by configuring DACs for static output and verifying telemetry reports the
|
||
|
ADCs are measuring those values. Output OUTx should be connected in a loopback configuration
|
||
|
to INx on the device.
|
||
|
|
||
|
Args:
|
||
|
miniconf: The miniconf configuration interface.
|
||
|
telemetry: a helper utility to read inbound telemetry.
|
||
|
set_point: The desired output voltage to test.
|
||
|
"""
|
||
|
print(f'Testing loopback for Vout = {set_point:.2f}')
|
||
|
print('---------------------------------')
|
||
|
# Configure the IIRs to output at the set point
|
||
|
assert (await miniconf.command('iir_ch/0/0', static_iir_output(set_point)))['code'] == 0
|
||
|
assert (await miniconf.command('iir_ch/1/0', static_iir_output(set_point)))['code'] == 0
|
||
|
assert (await miniconf.command('telemetry_period', 1))['code'] == 0
|
||
|
|
||
|
# Wait for telemetry values to update.
|
||
|
await asyncio.sleep(2.0)
|
||
|
|
||
|
# Verify the ADCs are receiving the setpoint voltage.
|
||
|
tolerance = max(0.05 * set_point, MINIMUM_VOLTAGE_ERROR)
|
||
|
latest_values = telemetry.get_latest()
|
||
|
print(f'Latest telemtry: {latest_values}')
|
||
|
|
||
|
assert abs(latest_values['adcs'][0] - set_point) < tolerance
|
||
|
assert abs(latest_values['adcs'][1] - set_point) < tolerance
|
||
|
print('PASS')
|
||
|
print('')
|
||
|
|
||
|
|
||
|
def main():
|
||
|
""" Main program entry point. """
|
||
|
parser = argparse.ArgumentParser(description='Loopback tests for Stabilizer HITL testing',)
|
||
|
parser.add_argument('prefix', type=str,
|
||
|
help='The MQTT topic prefix of the target')
|
||
|
parser.add_argument('--broker', '-b', default='mqtt', type=str,
|
||
|
help='The MQTT broker address')
|
||
|
|
||
|
args = parser.parse_args()
|
||
|
|
||
|
async def test():
|
||
|
""" The actual testing being completed. """
|
||
|
interface = await Miniconf.create(args.prefix, args.broker)
|
||
|
telemetry = await TelemetryReader.create(args.prefix, args.broker)
|
||
|
|
||
|
# Test loopback with a static 1V output of the DACs.
|
||
|
await test_loopback(interface, telemetry, 1.0)
|
||
|
|
||
|
# Repeat test with AFE = 2x
|
||
|
print('Configuring AFEs to 2x input')
|
||
|
assert (await interface.command('afe/0', "G2"))['code'] == 0
|
||
|
assert (await interface.command('afe/1', "G2"))['code'] == 0
|
||
|
await test_loopback(interface, telemetry, 1.0)
|
||
|
|
||
|
# Test with 0V output
|
||
|
await test_loopback(interface, telemetry, 0.0)
|
||
|
|
||
|
loop = asyncio.get_event_loop()
|
||
|
sys.exit(loop.run_until_complete(test()))
|
||
|
|
||
|
|
||
|
if __name__ == '__main__':
|
||
|
main()
|