From ca6596c7b8c9b8da1ef75baa8df086b3f9e52176 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Tue, 2 Mar 2021 19:04:39 +0100 Subject: [PATCH] cli: rework * use plain json for argument format * rename feedback -> response to match code * reorganize args * tweak variable names to be more in line with miniconf and mqtt * use `mqtt` as broker * remove traces of stabilizer from script, rename to miniconf.py * add usage example * dual-iir/lockin-external: update broker address (for HITL), change mqtt path to convention --- miniconf.py | 115 +++++++++++++++++++++++++++++++ src/bin/dual-iir.rs | 10 ++- src/bin/lockin-external.rs | 8 ++- stabilizer.py | 138 ------------------------------------- 4 files changed, 128 insertions(+), 143 deletions(-) create mode 100644 miniconf.py delete mode 100644 stabilizer.py diff --git a/miniconf.py b/miniconf.py new file mode 100644 index 0000000..a5e48dc --- /dev/null +++ b/miniconf.py @@ -0,0 +1,115 @@ +#!/usr/bin/python +""" +Author: Vertigo Designs, Ryan Summers + Robert Jördens + +Description: Provides an API for controlling Miniconf devices over MQTT. +""" +import argparse +import asyncio +import logging + +from gmqtt import Client as MqttClient + +logger = logging.getLogger(__name__) + + +class Miniconf: + """An asynchronous API for controlling Miniconf devices using MQTT.""" + + @classmethod + async def create(cls, prefix, broker): + """Create a connection to the broker and a Miniconf device using it.""" + client = MqttClient(client_id='') + await client.connect(broker) + return cls(client, prefix) + + def __init__(self, client, prefix): + """Constructor. + + Args: + client: A connected MQTT5 client. + prefix: The MQTT toptic prefix of the device to control. + """ + self.client = client + self.prefix = prefix + self.inflight = {} + self.client.on_message = self._handle_response + self.client.subscribe(f'{prefix}/response/#') + + def _handle_response(self, _client, topic, payload, *_args, **_kwargs): + """Callback function for when messages are received over MQTT. + + Args: + _client: The MQTT client. + topic: The topic that the message was received on. + payload: The payload of the message. + """ + if topic not in self.inflight: + # TODO use correlation_data to distinguish clients and requests + logger.warning('Unexpected response on topic: %s', topic) + return + + self.inflight[topic].set_result(payload.decode('ascii')) + del self.inflight[topic] + + async def command(self, path, value): + """Write the provided data to the specified path. + + Args: + path: The path to write the message to. + value: The value to write to the path. + + Returns: + The received response to the command. + """ + setting_topic = f'{self.prefix}/settings/{path}' + response_topic = f'{self.prefix}/response/{path}' + if response_topic in self.inflight: + # TODO use correlation_data to distinguish clients and requests + raise NotImplementedError( + 'Only one in-flight message per topic is supported') + + logger.debug('Sending %s to "%s"', value, setting_topic) + fut = asyncio.get_running_loop().create_future() + self.inflight[response_topic] = fut + self.client.publish(setting_topic, payload=value, qos=0, retain=True, + response_topic=response_topic) + return await fut + + +def main(): + parser = argparse.ArgumentParser( + description='Miniconf command line interface.', + epilog='''Example: + miniconf.py -v -b mqtt dt/sinara/stabilizer afe/0 '"G10"' + ''') + parser.add_argument('-v', '--verbose', action='count', + help='Increase logging verbosity') + parser.add_argument('--broker', '-b', default='mqtt', type=str, + help='The MQTT broker address') + parser.add_argument('prefix', type=str, + help='The MQTT topic prefix of the target') + parser.add_argument('path', type=str, + help='The setting path to configure') + parser.add_argument('value', type=str, + help='The value of setting in JSON format') + + args = parser.parse_args() + + logging.basicConfig( + format='%(asctime)s [%(levelname)s] %(message)s', + level=logging.WARN - 10*args.verbose) + + loop = asyncio.get_event_loop() + + async def configure_settings(): + interface = await Miniconf.create(args.prefix, args.broker) + response = await interface.command(args.path, args.value) + logger.info(response) + + loop.run_until_complete(configure_settings()) + + +if __name__ == '__main__': + main() diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 66c2a11..fb128af 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -62,13 +62,17 @@ const APP: () = { let mqtt_interface = { let mqtt_client = { - let broker = IpAddr::V4(Ipv4Addr::new(10, 34, 16, 1)); + let broker = IpAddr::V4(Ipv4Addr::new(10, 34, 16, 10)); minimq::MqttClient::new(broker, "", stabilizer.net.stack) .unwrap() }; - MqttInterface::new(mqtt_client, "stabilizer", Settings::default()) - .unwrap() + MqttInterface::new( + mqtt_client, + "dt/sinara/stabilizer", + Settings::default(), + ) + .unwrap() }; // Enable ADC/DAC events diff --git a/src/bin/lockin-external.rs b/src/bin/lockin-external.rs index 5b85489..ebfddd8 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin-external.rs @@ -83,8 +83,12 @@ const APP: () = { .unwrap() }; - MqttInterface::new(mqtt_client, "lockin", Settings::default()) - .unwrap() + MqttInterface::new( + mqtt_client, + "dt/sinara/lockin", + Settings::default(), + ) + .unwrap() }; let settings = Settings::default(); diff --git a/stabilizer.py b/stabilizer.py deleted file mode 100644 index 1b3ac65..0000000 --- a/stabilizer.py +++ /dev/null @@ -1,138 +0,0 @@ -#!/usr/bin/python -""" -Author: Vertigo Designs, Ryan Summers - -Description: Provides an API for controlling Stabilizer over Miniconf (MQTT). -""" -import argparse -import asyncio -import json -import logging - -from gmqtt import Client as MqttClient - - -class MiniconfApi: - """ An asynchronous API for controlling Miniconf devices using the MQTT control interface. """ - - @classmethod - async def create(cls, identifier, broker): - """ Create a connection to MQTT for communication with the device. """ - client = MqttClient(client_id='') - await client.connect(broker) - return cls(client, identifier) - - - def __init__(self, client, identifier): - """ Consructor. - - Args: - client: A connected MQTT5 client. - identifier: The ID of the device to control. - """ - self.client = client - self.identifier = identifier - self.client.on_message = self._handle_response - self.inflight_settings = dict() - self.logger = logging.getLogger('stabilizer.miniconf') - - self.client.subscribe(f'{identifier}/feedback/#') - - - def _handle_response(self, _client, topic, payload, *_args, **_kwargs): - """ Callback function for when messages are received over MQTT. - - Args: - _client: The MQTT client. - topic: The topic that the message was received on. - payload: The payload of the message. - """ - if topic not in self.inflight_settings: - self.logger.warning('Unknown response topic: %s', topic) - return - - # Indicate a response was received for the provided topic. - self.inflight_settings[topic].set_result(payload.decode('ascii')) - - - async def command(self, path, value): - """ Write the provided data to the specified path. - - Args: - setting: The path to write the message to. - value: The value to write to the path. - - Returns: - The received response to the command. - """ - setting_topic = f'{self.identifier}/{path}' - response_topic = f'{self.identifier}/feedback/{path}' - assert response_topic not in self.inflight_settings, \ - 'Only one in-flight message per topic is supported' - - self.logger.debug('Sending %s to "%s"', value, setting_topic) - self.inflight_settings[response_topic] = asyncio.get_running_loop().create_future() - self.client.publish(setting_topic, payload=value, qos=0, retain=True, - response_topic=response_topic) - - response = await self.inflight_settings[response_topic] - del self.inflight_settings[response_topic] - - return response - - -async def configure_settings(args): - """ Configure an RF channel. """ - logger = logging.getLogger('stabilizer') - - # Establish a communication interface with stabilizer. - interface = await MiniconfApi.create(args.stabilizer, args.broker) - - request = None - - # In the exceptional case that this is a terminal value, there is no key available and only a - # single value. - if len(args.values) == 1 and '=' not in args.values[0]: - if args.values[0][0].isalpha(): - request = args.values[0] - else: - request = json.loads(args.values[0]) - else: - # Convert all of the values into a key-value list. - request = dict() - for pair in args.values: - key, value = pair.split('=') - request[str(key)] = json.loads(value) - logger.debug('Parsed request: %s', request) - - response = await interface.command(f'settings/{args.setting}', json.dumps(request)) - logger.info(response) - - -def main(): - """ Main program entry point. """ - parser = argparse.ArgumentParser(description='Stabilizer settings modification utility') - parser.add_argument('--stabilizer', type=str, default='stabilizer', - help='The identifier of the stabilizer to configure') - parser.add_argument('--setting', required=True, type=str, help='The setting path to configure') - parser.add_argument('--broker', default='10.34.16.1', type=str, help='The MQTT broker address') - parser.add_argument('values', nargs='+', type=str, - help='The value of settings. key=value list or a single value is accepted.') - parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging') - - args = parser.parse_args() - - logger = logging.getLogger('stabilizer') - logger.setLevel(logging.INFO) - - if args.verbose: - logger.setLevel(logging.DEBUG) - - logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s') - - loop = asyncio.get_event_loop() - loop.run_until_complete(configure_settings(args)) - - -if __name__ == '__main__': - main()