From 5c4ba78dd112834d380bbb0e397a03cca0c648b7 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 13:13:44 +0200 Subject: [PATCH] Refactoring MQTT architecture --- src/bin/dual-iir.rs | 2 +- src/bin/lockin-external.rs | 2 +- src/net/messages.rs | 67 ++++----------- src/net/mod.rs | 2 +- src/net/mqtt_interface.rs | 171 ++++++++++++++----------------------- 5 files changed, 86 insertions(+), 158 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 65d3f2e..3b451f9 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -168,7 +168,7 @@ const APP: () = { let settings = c.resources.mqtt.settings(); // Update the IIR channels. - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); diff --git a/src/bin/lockin-external.rs b/src/bin/lockin-external.rs index f9e9ae4..26634c0 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin-external.rs @@ -215,7 +215,7 @@ const APP: () = { c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = settings); + c.resources.settings.lock(|current| *current = *settings); } #[task(binds = ETH, priority = 1)] diff --git a/src/net/messages.rs b/src/net/messages.rs index 2f54f52..167e440 100644 --- a/src/net/messages.rs +++ b/src/net/messages.rs @@ -6,10 +6,7 @@ use core::fmt::Write; #[derive(Debug, Copy, Clone)] pub enum SettingsResponseCode { NoError = 0, - NoTopic = 1, - InvalidPrefix = 2, - UnknownTopic = 3, - UpdateFailure = 4, + MiniconfError = 1, } /// Represents a generic MQTT message. @@ -70,55 +67,25 @@ impl<'a> MqttMessage<'a> { } } -impl SettingsResponse { - /// Construct a settings response upon successful settings update. - /// - /// # Args - /// * `path` - The path of the setting that was updated. - pub fn update_success(path: &str) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} updated", path).is_err() { - msg = String::from("Latest update succeeded"); - } +impl From> for SettingsResponse { + fn from(result: Result<(), miniconf::Error>) -> Self { + match result { + Ok(_) => Self { + msg: String::from("OK"), + code: SettingsResponseCode::NoError as u8, + }, - Self { - msg, - code: SettingsResponseCode::NoError as u8, - } - } + Err(error) => { + let mut msg = String::new(); + if write!(&mut msg, "{:?}", error).is_err() { + msg = String::from("Miniconf Error"); + } - /// Construct a response when a settings update failed. - /// - /// # Args - /// * `path` - The settings path that configuration failed for. - /// * `err` - The settings update error that occurred. - pub fn update_failure(path: &str, err: miniconf::Error) -> Self { - let mut msg: String = String::new(); - if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() { - if write!(&mut msg, "Latest update failed: {:?}", err).is_err() { - msg = String::from("Latest update failed"); + Self { + code: SettingsResponseCode::MiniconfError as u8, + msg, + } } } - - Self { - msg, - code: SettingsResponseCode::UpdateFailure as u8, - } - } - - /// Construct a response from a custom response code. - /// - /// # Args - /// * `code` - The response code to provide. - pub fn code(code: SettingsResponseCode) -> Self { - let mut msg: String = String::new(); - - // Note(unwrap): All code debug names shall fit in the 64 byte string. - write!(&mut msg, "{:?}", code).unwrap(); - - Self { - code: code as u8, - msg, - } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 9a86b85..75f92d6 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,7 +11,7 @@ use core::fmt::Write; mod messages; mod mqtt_interface; -use messages::{MqttMessage, SettingsResponse, SettingsResponseCode}; +use messages::{MqttMessage, SettingsResponse}; pub use mqtt_interface::MqttInterface; /// Potential actions for firmware to take. diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index 209a67d..dabcbe8 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -2,27 +2,25 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::{cell::RefCell, fmt::Write}; +use core::fmt::Write; use heapless::{consts, String}; -use serde::Serialize; -use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode}; +use super::{Action, MqttMessage, SettingsResponse}; /// MQTT settings interface. pub struct MqttInterface where S: miniconf::Miniconf + Default + Clone, { - telemetry_topic: String, default_response_topic: String, - mqtt: RefCell>, - settings: RefCell, + mqtt: minimq::MqttClient, + settings: S, clock: CycleCounter, phy: EthernetPhy, network_was_reset: bool, subscribed: bool, - id: String, + settings_prefix: String, } impl MqttInterface @@ -44,23 +42,24 @@ where phy: EthernetPhy, clock: CycleCounter, ) -> Self { - let mqtt_client = + let mqtt = minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) .unwrap(); - let mut telemetry_topic: String = String::new(); - write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); - let mut response_topic: String = String::new(); write!(&mut response_topic, "{}/log", prefix).unwrap(); + let mut settings_prefix: String = String::new(); + write!(&mut settings_prefix, "{}/settings", prefix).unwrap(); + + // Ensure we have two remaining spaces + Self { - mqtt: RefCell::new(mqtt_client), - settings: RefCell::new(S::default()), - id: String::from(prefix), + mqtt, + settings: S::default(), + settings_prefix, clock, phy, - telemetry_topic, default_response_topic: response_topic, network_was_reset: false, subscribed: false, @@ -73,11 +72,7 @@ where /// An option containing an action that should be completed as a result of network servicing. pub fn update(&mut self) -> Option { // First, service the network stack to process any inbound and outbound traffic. - let sleep = match self - .mqtt - .borrow_mut() - .network_stack - .poll(self.clock.current_ms()) + let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms()) { Ok(updated) => !updated, Err(err) => { @@ -93,13 +88,13 @@ where // sending an excessive number of DHCP requests. if !self.network_was_reset { self.network_was_reset = true; - self.mqtt.borrow_mut().network_stack.handle_link_reset(); + self.mqtt.network_stack.handle_link_reset(); } } else { self.network_was_reset = false; } - let mqtt_connected = match self.mqtt.borrow_mut().is_connected() { + let mqtt_connected = match self.mqtt.is_connected() { Ok(connected) => connected, Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, @@ -117,34 +112,59 @@ where // If we're no longer subscribed to the settings topic, but we are connected to the broker, // resubscribe. if !self.subscribed && mqtt_connected { - let mut settings_topic: String = String::new(); - write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) - .unwrap(); + // Note(unwrap): We construct a string with two more characters than the prefix + // strucutre, so we are guaranteed to have space for storage. + let mut settings_topic: String = + String::from(self.settings_prefix.as_str()); + settings_topic.push_str("/#").unwrap(); - self.mqtt - .borrow_mut() - .subscribe(&settings_topic, &[]) - .unwrap(); + self.mqtt.subscribe(&settings_topic, &[]).unwrap(); self.subscribed = true; } // Handle any MQTT traffic. + let settings = &mut self.settings; + let mqtt = &mut self.mqtt; + let prefix = self.settings_prefix.as_str(); + let default_response_topic = self.default_response_topic.as_str(); + let mut update = false; - match self.mqtt.borrow_mut().poll( - |client, topic, message, properties| { - let (response, settings_update) = - self.route_message(topic, message, properties); - client - .publish( - response.topic, - &response.message, - minimq::QoS::AtMostOnce, - &response.properties, - ) - .ok(); - update = settings_update; - }, - ) { + match mqtt.poll(|client, topic, message, properties| { + let path = match topic.strip_prefix(prefix) { + // For paths, we do not want to include the leading slash. + Some(path) => { + if path.len() > 0 { + &path[1..] + } else { + path + } + } + None => { + info!("Unexpected MQTT topic: {}", topic); + return; + } + }; + + let message: SettingsResponse = settings + .string_set(path.split('/').peekable(), message) + .and_then(|_| { + update = true; + Ok(()) + }) + .into(); + + let response = + MqttMessage::new(properties, default_response_topic, &message); + + client + .publish( + response.topic, + &response.message, + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + }) { // If settings updated, Ok(_) => { if update { @@ -170,66 +190,7 @@ where } } - fn route_message<'a, 'me: 'a>( - &'me self, - topic: &str, - message: &[u8], - properties: &[minimq::Property<'a>], - ) -> (MqttMessage<'a>, bool) { - let mut update = false; - let response_msg = - if let Some(path) = topic.strip_prefix(self.id.as_str()) { - let mut parts = path[1..].split('/'); - match parts.next() { - Some("settings") => { - match self - .settings - .borrow_mut() - .string_set(parts.peekable(), message) - { - Ok(_) => { - update = true; - SettingsResponse::update_success(path) - } - Err(error) => { - SettingsResponse::update_failure(path, error) - } - } - } - Some(_) => SettingsResponse::code( - SettingsResponseCode::UnknownTopic, - ), - _ => SettingsResponse::code(SettingsResponseCode::NoTopic), - } - } else { - SettingsResponse::code(SettingsResponseCode::InvalidPrefix) - }; - - let response = MqttMessage::new( - properties, - &self.default_response_topic, - &response_msg, - ); - - (response, update) - } - - pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { - let telemetry = - miniconf::serde_json_core::to_string::(telemetry) - .unwrap(); - self.mqtt - .borrow_mut() - .publish( - &self.telemetry_topic, - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok(); - } - - pub fn settings(&self) -> S { - self.settings.borrow().clone() + pub fn settings(&self) -> &S { + &self.settings } }