From a3ff8aae752f5140d723162cee8b0c90963a7db0 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 14:21:00 +0200 Subject: [PATCH] Updating miniconf utility to use correlation data --- miniconf.py | 84 ++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 26 deletions(-) diff --git a/miniconf.py b/miniconf.py index 77e491c..40f163e 100644 --- a/miniconf.py +++ b/miniconf.py @@ -10,6 +10,7 @@ import asyncio import json import logging import sys +import uuid from gmqtt import Client as MqttClient @@ -33,28 +34,52 @@ class Miniconf: client: A connected MQTT5 client. prefix: The MQTT toptic prefix of the device to control. """ + self.uuid = uuid.uuid1(prefix) + self.request_id = 0 self.client = client self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response self.client.subscribe(f'{prefix}/response/#') - def _handle_response(self, _client, topic, payload, *_args, **_kwargs): + def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. Args: _client: The MQTT client. - topic: The topic that the message was received on. + _topic: The topic that the message was received on. payload: The payload of the message. + _qos: The quality-of-service level of the received packet + properties: A dictionary of properties associated with the message. """ - if topic not in self.inflight: - # TODO use correlation_data to distinguish clients and requests - logger.warning('Unexpected response on topic: %s', topic) + # Extract corrleation data from the properties + try: + correlation_data = json.loads(properties['correlation_data']) + except (json.decoder.JSONDecodeError, KeyError): + logger.warning('Ignoring message with invalid correlation data') return - response = json.loads(payload) - self.inflight[topic].set_result((response['code'], response['msg'])) - del self.inflight[topic] + # Validate the correlation data. + try: + if correlation_data['id'] != self.uuid.hex: + logger.info('Ignoring correlation data for different ID') + return + pid = correlation_data['pid'] + except KeyError: + logger.warning('Ignoring unknown correlation data: %s', correlation_data) + return + + if pid not in self.inflight: + logger.warning('Unexpected pid: %s', pid) + return + + try: + response = json.loads(payload) + self.inflight[pid].set_result((response['code'], response['msg'])) + del self.inflight[pid] + except json.decoder.JSONDecodeError: + logger.warning('Invalid response format: %s', payload) + async def command(self, path, value): """Write the provided data to the specified path. @@ -69,25 +94,34 @@ class Miniconf: """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{path}' - if response_topic in self.inflight: - # TODO use correlation_data to distinguish clients and requests - raise NotImplementedError( - 'Only one in-flight message per topic is supported') + + # Assign a unique identifier to this update request. + pid = self.request_id + self.request_id += 1 + assert pid not in self.inflight, 'Invalid PID encountered' + + correlation_data = json.dumps({ + 'id': self.uuid.hex, + 'pid': pid, + }) value = json.dumps(value) logger.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() - self.inflight[response_topic] = fut + + self.inflight[pid] = fut self.client.publish(setting_topic, payload=value, qos=0, retain=True, - response_topic=response_topic) + response_topic=response_topic, + correlation_data=correlation_data) return await fut def main(): + """ Main program entry point. """ parser = argparse.ArgumentParser( - description='Miniconf command line interface.', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog='''Examples: + description='Miniconf command line interface.', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: %(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' ''') @@ -103,22 +137,20 @@ def main(): args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - level=logging.WARN - 10*args.verbose) + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + level=logging.WARN - 10*args.verbose) loop = asyncio.get_event_loop() async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) - failures = 0 - for kv in args.settings: - path, value = kv.split("=", 1) + for key_value in args.settings: + path, value = key_value.split("=", 1) code, response = await interface.command(path, json.loads(value)) - print(response) + print(f'{path}: {response}') if code != 0: - failures += 1 - - return failures + return code + return 0 sys.exit(loop.run_until_complete(configure_settings()))