Merge branch 'feature/mqtt-utility-script' into rs/dev

This commit is contained in:
Ryan Summers 2021-03-01 14:46:32 +01:00
commit 7e959d4828
1 changed files with 53 additions and 56 deletions

View File

@ -2,31 +2,22 @@
""" """
Author: Vertigo Designs, Ryan Summers Author: Vertigo Designs, Ryan Summers
Description: Provides an API for controlling Booster NGFW over MQTT. Description: Provides an API for controlling Stabilizer over Miniconf (MQTT).
""" """
import argparse import argparse
import asyncio import asyncio
import json import json
import logging
from gmqtt import Client as MqttClient from gmqtt import Client as MqttClient
def parse_value(value):
""" Parse a command-line value into the most appropriate associated python datatype. """
if value.isnumeric():
return int(value)
try:
return float(value)
except ValueError:
return value
class MiniconfApi: class MiniconfApi:
""" An asynchronous API for controlling Miniconf devices using the MQTT control interface. """ """ An asynchronous API for controlling Miniconf devices using the MQTT control interface. """
@classmethod @classmethod
async def create(cls, identifier, broker): async def create(cls, identifier, broker):
""" Create a connection to MQTT for communication with booster. """ """ Create a connection to MQTT for communication with the device. """
client = MqttClient(client_id='') client = MqttClient(client_id='')
await client.connect(broker) await client.connect(broker)
return cls(client, identifier) return cls(client, identifier)
@ -39,67 +30,62 @@ class MiniconfApi:
client: A connected MQTT5 client. client: A connected MQTT5 client.
identifier: The ID of the device to control. identifier: The ID of the device to control.
""" """
self.response_topic = f'{identifier}/feedback'
self.client = client self.client = client
self.identifier = identifier self.identifier = identifier
self.command_complete = asyncio.Event()
self.client.on_message = self._handle_response self.client.on_message = self._handle_response
self.response = None self.inflight_settings = dict()
self.logger = logging.getLogger('stabilizer.miniconf')
self.client.subscribe(self.response_topic) self.client.subscribe(f'{identifier}/feedback/#')
def _handle_response(self, client, topic, payload, *_args, **_kwargs): def _handle_response(self, _client, topic, payload, *_args, **_kwargs):
""" Callback function for when messages are received over MQTT. """ Callback function for when messages are received over MQTT.
Args: Args:
client: The MQTT client. _client: The MQTT client.
topic: The topic that the message was received on. topic: The topic that the message was received on.
payload: The payload of the message. payload: The payload of the message.
""" """
if topic != self.response_topic: if topic not in self.inflight_settings:
raise Exception(f'Unknown topic: {topic}') self.logger.warning('Unknown response topic: %s', topic)
return
# Indicate a response was received. # Indicate a response was received for the provided topic.
self.response = payload.decode('ascii') self.inflight_settings[topic].set_result(payload.decode('ascii'))
self.command_complete.set()
async def _command(self, topic, message): async def command(self, path, value):
""" Send a command to a booster control topic. """ Write the provided data to the specified path.
Args: Args:
topic: The topic to send the message to. setting: The path to write the message to.
message: The message to send to the provided topic. value: The value to write to the path.
Returns: Returns:
The received response to the command. The received response to the command.
""" """
self.command_complete.clear() setting_topic = f'{self.identifier}/{path}'
self.client.publish(topic, payload=message, qos=0, retain=False, response_topic = f'{self.identifier}/feedback/{path}'
response_topic=self.response_topic) assert response_topic not in self.inflight_settings, \
await self.command_complete.wait() 'Only one in-flight message per topic is supported'
response = self.response self.logger.debug('Sending %s to "%s"', value, setting_topic)
self.response = None self.inflight_settings[response_topic] = asyncio.get_running_loop().create_future()
self.client.publish(setting_topic, payload=value, qos=0, retain=False,
response_topic=response_topic)
response = await self.inflight_settings[response_topic]
del self.inflight_settings[response_topic]
return response return response
async def set_setting(self, setting, message):
""" Change the provided setting with the provided data. """
return await self._command(f'{self.identifier}/settings/{setting}', message)
async def commit(self):
""" Commit staged settings to become active. """
return await self._command(f'{self.identifier}/commit', 'commit')
async def configure_settings(args): async def configure_settings(args):
""" Configure an RF channel. """ """ Configure an RF channel. """
logger = logging.getLogger('stabilizer')
# Establish a communication interface with Booster. # Establish a communication interface with stabilizer.
interface = await MiniconfApi.create(args.stabilizer, args.broker) interface = await MiniconfApi.create(args.stabilizer, args.broker)
request = None request = None
@ -107,29 +93,29 @@ async def configure_settings(args):
# In the exceptional case that this is a terminal value, there is no key available and only a # In the exceptional case that this is a terminal value, there is no key available and only a
# single value. # single value.
if len(args.values) == 1 and '=' not in args.values[0]: if len(args.values) == 1 and '=' not in args.values[0]:
request = parse_value(args.values[0]) if args.values[0][0].isalpha():
request = args.values[0]
else:
request = json.loads(args.values[0])
else: else:
# Convert all of the values into a key-value list. # Convert all of the values into a key-value list.
request = dict() request = dict()
for pair in args.values: for pair in args.values:
key, value = pair.split('=') key, value = pair.split('=')
request[str(key)] = parse_value(value) request[str(key)] = json.loads(value)
response = json.dumps(request) logger.debug('Parsed request: %s', request)
# TODO: Should we escape quotes in the dumped request? response = await interface.command(f'settings/{args.setting}', json.dumps(request))
response = await interface.set_setting(args.setting, request) logger.info(response)
print(f'+ {response}')
if args.commit: if args.commit:
response = await interface.commit() response = await interface.command('commit', 'commit')
print(f'+ {response}') logger.info(response)
def main(): def main():
""" Main program entry point. """ """ Main program entry point. """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(description='Stabilizer settings modification utility')
formatter_class=argparse.RawTextHelpFormatter,
description='Modify booster RF channel configuration')
parser.add_argument('--stabilizer', type=str, default='stabilizer', parser.add_argument('--stabilizer', type=str, default='stabilizer',
help='The identifier of the stabilizer to configure') help='The identifier of the stabilizer to configure')
parser.add_argument('--setting', required=True, type=str, help='The setting path to configure') parser.add_argument('--setting', required=True, type=str, help='The setting path to configure')
@ -138,9 +124,20 @@ def main():
help='The value of settings. key=value list or a single value is accepted.') help='The value of settings. key=value list or a single value is accepted.')
parser.add_argument('--commit', action='store_true', parser.add_argument('--commit', action='store_true',
help='Specified true to commit after updating settings.') help='Specified true to commit after updating settings.')
parser.add_argument('-v', '--verbose', action='store_true', help='Enable verbose logging')
args = parser.parse_args()
logger = logging.getLogger('stabilizer')
logger.setLevel(logging.INFO)
if args.verbose:
logger.setLevel(logging.DEBUG)
logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s')
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
loop.run_until_complete(configure_settings(parser.parse_args())) loop.run_until_complete(configure_settings(args))
if __name__ == '__main__': if __name__ == '__main__':