diff --git a/miniconf.py b/miniconf.py index 77e491c..40f163e 100644 --- a/miniconf.py +++ b/miniconf.py @@ -10,6 +10,7 @@ import asyncio import json import logging import sys +import uuid from gmqtt import Client as MqttClient @@ -33,28 +34,52 @@ class Miniconf: client: A connected MQTT5 client. prefix: The MQTT toptic prefix of the device to control. """ + self.uuid = uuid.uuid1(prefix) + self.request_id = 0 self.client = client self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response self.client.subscribe(f'{prefix}/response/#') - def _handle_response(self, _client, topic, payload, *_args, **_kwargs): + def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. Args: _client: The MQTT client. - topic: The topic that the message was received on. + _topic: The topic that the message was received on. payload: The payload of the message. + _qos: The quality-of-service level of the received packet + properties: A dictionary of properties associated with the message. """ - if topic not in self.inflight: - # TODO use correlation_data to distinguish clients and requests - logger.warning('Unexpected response on topic: %s', topic) + # Extract corrleation data from the properties + try: + correlation_data = json.loads(properties['correlation_data']) + except (json.decoder.JSONDecodeError, KeyError): + logger.warning('Ignoring message with invalid correlation data') return - response = json.loads(payload) - self.inflight[topic].set_result((response['code'], response['msg'])) - del self.inflight[topic] + # Validate the correlation data. + try: + if correlation_data['id'] != self.uuid.hex: + logger.info('Ignoring correlation data for different ID') + return + pid = correlation_data['pid'] + except KeyError: + logger.warning('Ignoring unknown correlation data: %s', correlation_data) + return + + if pid not in self.inflight: + logger.warning('Unexpected pid: %s', pid) + return + + try: + response = json.loads(payload) + self.inflight[pid].set_result((response['code'], response['msg'])) + del self.inflight[pid] + except json.decoder.JSONDecodeError: + logger.warning('Invalid response format: %s', payload) + async def command(self, path, value): """Write the provided data to the specified path. @@ -69,25 +94,34 @@ class Miniconf: """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{path}' - if response_topic in self.inflight: - # TODO use correlation_data to distinguish clients and requests - raise NotImplementedError( - 'Only one in-flight message per topic is supported') + + # Assign a unique identifier to this update request. + pid = self.request_id + self.request_id += 1 + assert pid not in self.inflight, 'Invalid PID encountered' + + correlation_data = json.dumps({ + 'id': self.uuid.hex, + 'pid': pid, + }) value = json.dumps(value) logger.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() - self.inflight[response_topic] = fut + + self.inflight[pid] = fut self.client.publish(setting_topic, payload=value, qos=0, retain=True, - response_topic=response_topic) + response_topic=response_topic, + correlation_data=correlation_data) return await fut def main(): + """ Main program entry point. """ parser = argparse.ArgumentParser( - description='Miniconf command line interface.', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog='''Examples: + description='Miniconf command line interface.', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: %(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' ''') @@ -103,22 +137,20 @@ def main(): args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - level=logging.WARN - 10*args.verbose) + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + level=logging.WARN - 10*args.verbose) loop = asyncio.get_event_loop() async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) - failures = 0 - for kv in args.settings: - path, value = kv.split("=", 1) + for key_value in args.settings: + path, value = key_value.split("=", 1) code, response = await interface.command(path, json.loads(value)) - print(response) + print(f'{path}: {response}') if code != 0: - failures += 1 - - return failures + return code + return 0 sys.exit(loop.run_until_complete(configure_settings())) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 1416e83..c023236 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -185,7 +185,7 @@ const APP: () = { fn settings_update(mut c: settings_update::Context) { // Update the IIR channels. let settings = c.resources.mqtt.settings(); - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 69df37c..fdeb83d 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -231,7 +231,7 @@ const APP: () = { c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); } #[task(priority = 1, resources=[mqtt, digital_inputs, settings, telemetry], schedule=[telemetry])] diff --git a/src/net/messages.rs b/src/net/messages.rs index 2f54f52..167e440 100644 --- a/src/net/messages.rs +++ b/src/net/messages.rs @@ -6,10 +6,7 @@ use core::fmt::Write; #[derive(Debug, Copy, Clone)] pub enum SettingsResponseCode { NoError = 0, - NoTopic = 1, - InvalidPrefix = 2, - UnknownTopic = 3, - UpdateFailure = 4, + MiniconfError = 1, } /// Represents a generic MQTT message. @@ -70,55 +67,25 @@ impl<'a> MqttMessage<'a> { } } -impl SettingsResponse { - /// Construct a settings response upon successful settings update. - /// - /// # Args - /// * `path` - The path of the setting that was updated. - pub fn update_success(path: &str) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} updated", path).is_err() { - msg = String::from("Latest update succeeded"); - } +impl From> for SettingsResponse { + fn from(result: Result<(), miniconf::Error>) -> Self { + match result { + Ok(_) => Self { + msg: String::from("OK"), + code: SettingsResponseCode::NoError as u8, + }, - Self { - msg, - code: SettingsResponseCode::NoError as u8, - } - } + Err(error) => { + let mut msg = String::new(); + if write!(&mut msg, "{:?}", error).is_err() { + msg = String::from("Miniconf Error"); + } - /// Construct a response when a settings update failed. - /// - /// # Args - /// * `path` - The settings path that configuration failed for. - /// * `err` - The settings update error that occurred. - pub fn update_failure(path: &str, err: miniconf::Error) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() { - if write!(&mut msg, "Latest update failed: {:?}", err).is_err() { - msg = String::from("Latest update failed"); + Self { + code: SettingsResponseCode::MiniconfError as u8, + msg, + } } } - - Self { - msg, - code: SettingsResponseCode::UpdateFailure as u8, - } - } - - /// Construct a response from a custom response code. - /// - /// # Args - /// * `code` - The response code to provide. - pub fn code(code: SettingsResponseCode) -> Self { - let mut msg: String = String::new(); - - // Note(unwrap): All code debug names shall fit in the 64 byte string. - write!(&mut msg, "{:?}", code).unwrap(); - - Self { - code: code as u8, - msg, - } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index f6dbe28..9b4d22d 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,7 +11,7 @@ use core::fmt::Write; mod messages; mod mqtt_interface; -use messages::{MqttMessage, SettingsResponse, SettingsResponseCode}; +use messages::{MqttMessage, SettingsResponse}; pub use mqtt_interface::MqttInterface; mod telemetry; diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index 9b73793..cef26dd 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -2,27 +2,23 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::{cell::RefCell, fmt::Write}; - use heapless::{consts, String}; -use serde::Serialize; -use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode}; +use super::{Action, MqttMessage, SettingsResponse}; /// MQTT settings interface. pub struct MqttInterface where S: miniconf::Miniconf + Default + Clone, { - telemetry_topic: String, default_response_topic: String, - mqtt: RefCell>, - settings: RefCell, + mqtt: minimq::MqttClient, + settings: S, clock: CycleCounter, phy: EthernetPhy, network_was_reset: bool, subscribed: bool, - id: String, + settings_prefix: String, } impl MqttInterface @@ -44,23 +40,22 @@ where phy: EthernetPhy, clock: CycleCounter, ) -> Self { - let mqtt_client = + let mqtt = minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) .unwrap(); - let mut telemetry_topic: String = String::new(); - write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); + let mut response_topic: String = String::from(prefix); + response_topic.push_str("/log").unwrap(); - let mut response_topic: String = String::new(); - write!(&mut response_topic, "{}/log", prefix).unwrap(); + let mut settings_prefix: String = String::from(prefix); + settings_prefix.push_str("/settings").unwrap(); Self { - mqtt: RefCell::new(mqtt_client), - settings: RefCell::new(S::default()), - id: String::from(prefix), + mqtt, + settings: S::default(), + settings_prefix, clock, phy, - telemetry_topic, default_response_topic: response_topic, network_was_reset: false, subscribed: false, @@ -73,11 +68,7 @@ where /// An option containing an action that should be completed as a result of network servicing. pub fn update(&mut self) -> Option { // First, service the network stack to process any inbound and outbound traffic. - let sleep = match self - .mqtt - .borrow_mut() - .network_stack - .poll(self.clock.current_ms()) + let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms()) { Ok(updated) => !updated, Err(err) => { @@ -88,18 +79,19 @@ where // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network // stack. - if self.phy.poll_link() == false { + match self.phy.poll_link() { + true => self.network_was_reset = false, + // Only reset the network stack once per link reconnection. This prevents us from // sending an excessive number of DHCP requests. - if !self.network_was_reset { + false if !self.network_was_reset => { self.network_was_reset = true; - self.mqtt.borrow_mut().network_stack.handle_link_reset(); + self.mqtt.network_stack.handle_link_reset(); } - } else { - self.network_was_reset = false; - } + _ => {}, + }; - let mqtt_connected = match self.mqtt.borrow_mut().is_connected() { + let mqtt_connected = match self.mqtt.is_connected() { Ok(connected) => connected, Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, @@ -117,34 +109,61 @@ where // If we're no longer subscribed to the settings topic, but we are connected to the broker, // resubscribe. if !self.subscribed && mqtt_connected { - let mut settings_topic: String = String::new(); - write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) - .unwrap(); + // Note(unwrap): We construct a string with two more characters than the prefix + // strucutre, so we are guaranteed to have space for storage. + let mut settings_topic: String = + String::from(self.settings_prefix.as_str()); + settings_topic.push_str("/#").unwrap(); - self.mqtt - .borrow_mut() - .subscribe(&settings_topic, &[]) - .unwrap(); + self.mqtt.subscribe(&settings_topic, &[]).unwrap(); self.subscribed = true; } // Handle any MQTT traffic. + let settings = &mut self.settings; + let mqtt = &mut self.mqtt; + let prefix = self.settings_prefix.as_str(); + let default_response_topic = self.default_response_topic.as_str(); + let mut update = false; - match self.mqtt.borrow_mut().poll( - |client, topic, message, properties| { - let (response, settings_update) = - self.route_message(topic, message, properties); - client - .publish( - response.topic, - &response.message, - minimq::QoS::AtMostOnce, - &response.properties, - ) - .ok(); - update = settings_update; - }, - ) { + match mqtt.poll(|client, topic, message, properties| { + let path = match topic.strip_prefix(prefix) { + // For paths, we do not want to include the leading slash. + Some(path) => { + if path.len() > 0 { + &path[1..] + } else { + path + } + } + None => { + info!("Unexpected MQTT topic: {}", topic); + return; + } + }; + + let message: SettingsResponse = settings + .string_set(path.split('/').peekable(), message) + .and_then(|_| { + update = true; + Ok(()) + }) + .into(); + + let response = + MqttMessage::new(properties, default_response_topic, &message); + + client + .publish( + response.topic, + &response.message, + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + }) { // If settings updated, Ok(_) => { if update { @@ -170,75 +189,7 @@ where } } - fn route_message<'a, 'me: 'a>( - &'me self, - topic: &str, - message: &[u8], - properties: &[minimq::Property<'a>], - ) -> (MqttMessage<'a>, bool) { - let mut update = false; - let response_msg = - if let Some(path) = topic.strip_prefix(self.id.as_str()) { - let mut parts = path[1..].split('/'); - match parts.next() { - Some("settings") => { - match self - .settings - .borrow_mut() - .string_set(parts.peekable(), message) - { - Ok(_) => { - update = true; - SettingsResponse::update_success(path) - } - Err(error) => { - SettingsResponse::update_failure(path, error) - } - } - } - Some(_) => SettingsResponse::code( - SettingsResponseCode::UnknownTopic, - ), - _ => SettingsResponse::code(SettingsResponseCode::NoTopic), - } - } else { - SettingsResponse::code(SettingsResponseCode::InvalidPrefix) - }; - - let response = MqttMessage::new( - properties, - &self.default_response_topic, - &response_msg, - ); - - (response, update) - } - - /// Publish telemetry to the default telemetry topic. - /// - /// # Note - /// Telemetry is transmitted in a "best-effort" manner. There is no guarantee it will be - /// transmitted. - /// - /// # Args - /// * `telemetry` - The telemetry message to transmit. - pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { - let telemetry = - miniconf::serde_json_core::to_string::(telemetry) - .unwrap(); - self.mqtt - .borrow_mut() - .publish( - &self.telemetry_topic, - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok(); - } - - /// Get a copy of the current settings. - pub fn settings(&self) -> S { - self.settings.borrow().clone() + pub fn settings(&self) -> &S { + &self.settings } }