From 36cc423f70081886ae306c5b9fd9486b9d96bbeb Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 22 Apr 2021 15:16:33 +0200 Subject: [PATCH] Refactoring MQTT to support telemetry --- Cargo.lock | 6 +- Cargo.toml | 4 +- src/bin/dual-iir.rs | 22 +++--- src/bin/lockin-external.rs | 22 +++--- src/net/mod.rs | 148 ++++++++++++++++++++++++++++++------- 5 files changed, 146 insertions(+), 56 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 970506d..ff026ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "proc-macro2", "quote", @@ -416,11 +416,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?branch=feature/mqtt-removal#6b6b9d7973ac5b4771c211c775c51f82d4a7727f" dependencies = [ "derive_miniconf", "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -742,6 +741,7 @@ dependencies = [ "log", "mcp23017", "miniconf", + "minimq", "nb 1.0.0", "panic-semihosting", "paste", diff --git a/Cargo.toml b/Cargo.toml index f4db4d8..db89ef5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,13 +56,13 @@ version = "0.9.0" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "314fa5587d" +branch = "feature/mqtt-removal" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" rev = "8468f11" -[patch.crates-io.minimq] +[dependencies.minimq] git = "https://github.com/quartiq/minimq.git" rev = "933687c2e4b" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index eaa82ae..b275de0 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -13,7 +13,7 @@ use hardware::{ InputPin, AFE0, AFE1, }; -use net::{Action, MiniconfInterface}; +use net::{Action, MqttInterface}; const SCALE: f32 = i16::MAX as _; @@ -46,7 +46,7 @@ const APP: () = { digital_input1: DigitalInput1, adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, // Format: iir_state[ch][cascade-no][coeff] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] @@ -59,7 +59,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -86,7 +86,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, digital_input1: stabilizer.digital_inputs.1, settings: Settings::default(), } @@ -143,14 +143,10 @@ const APP: () = { } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -160,12 +156,12 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, afes, settings])] + #[task(priority = 1, resources=[mqtt, afes, settings])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); // Update the IIR channels. - c.resources.settings.lock(|current| *current = *settings); + c.resources.settings.lock(|current| *current = settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); diff --git a/src/bin/lockin-external.rs b/src/bin/lockin-external.rs index 082b0d2..f9e9ae4 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin-external.rs @@ -16,7 +16,7 @@ use stabilizer::hardware::{ }; use miniconf::Miniconf; -use stabilizer::net::{Action, MiniconfInterface}; +use stabilizer::net::{Action, MqttInterface}; #[derive(Copy, Clone, Debug, Deserialize, Miniconf)] enum Conf { @@ -60,7 +60,7 @@ const APP: () = { afes: (AFE0, AFE1), adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, settings: Settings, timestamper: InputStamper, @@ -73,7 +73,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -113,7 +113,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, timestamper: stabilizer.timestamper, settings, @@ -195,14 +195,10 @@ const APP: () = { } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -212,14 +208,14 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, settings, afes])] + #[task(priority = 1, resources=[mqtt, settings, afes])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = *settings); + c.resources.settings.lock(|current| *current = settings); } #[task(binds = ETH, priority = 1)] diff --git a/src/net/mod.rs b/src/net/mod.rs index 13b514c..e2398b8 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -2,10 +2,10 @@ use crate::hardware::{ design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, }; -use core::fmt::Write; +use core::{cell::RefCell, fmt::Write}; -use heapless::{consts, String}; -use miniconf::minimq; +use heapless::{consts, String, Vec}; +use serde::Serialize; /// Potential actions for firmware to take. pub enum Action { @@ -17,19 +17,22 @@ pub enum Action { } /// MQTT settings interface. -pub struct MiniconfInterface +pub struct MqttInterface where - S: miniconf::Miniconf + Default, + S: miniconf::Miniconf + Default + Clone, { - pub mqtt: miniconf::MqttInterface, + telemetry_topic: String, + mqtt: RefCell>, + miniconf: RefCell>, clock: CycleCounter, phy: EthernetPhy, network_was_reset: bool, + subscribed: bool, } -impl MiniconfInterface +impl MqttInterface where - S: miniconf::Miniconf + Default, + S: miniconf::Miniconf + Default + Clone, { /// Construct a new MQTT settings interface. /// @@ -46,21 +49,23 @@ where phy: EthernetPhy, clock: CycleCounter, ) -> Self { - let mqtt = { - let mqtt_client = { - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap() - }; + let mqtt_client = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + let config = + miniconf::MiniconfInterface::new(prefix, S::default()).unwrap(); - miniconf::MqttInterface::new(mqtt_client, prefix, S::default()) - .unwrap() - }; + let mut telemetry_topic: String = String::new(); + write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); Self { - mqtt, + mqtt: RefCell::new(mqtt_client), + miniconf: RefCell::new(config), clock, phy, + telemetry_topic, network_was_reset: false, + subscribed: false, } } @@ -72,7 +77,7 @@ where let now = self.clock.current_ms(); // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.network_stack().poll(now) { + let sleep = match self.mqtt.borrow_mut().network_stack.poll(now) { Ok(updated) => !updated, Err(err) => { log::info!("Network error: {:?}", err); @@ -87,19 +92,93 @@ where // sending an excessive number of DHCP requests. if !self.network_was_reset { self.network_was_reset = true; - self.mqtt.network_stack().handle_link_reset(); + self.mqtt.borrow_mut().network_stack.handle_link_reset(); } } else { self.network_was_reset = false; } - // Finally, service the MQTT interface and handle any necessary messages. - match self.mqtt.update() { - Ok(true) => Some(Action::UpdateSettings), - Ok(false) if sleep => Some(Action::Sleep), - Ok(_) => None, + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { + self.mqtt + .borrow_mut() + .subscribe( + self.miniconf.borrow_mut().get_listening_topic(), + &[], + ) + .unwrap(); + self.subscribed = true; + } - Err(miniconf::MqttError::Network( + let mut update = false; + + // Handle any MQTT traffic. + match self.mqtt.borrow_mut().poll( + |client, topic, message, properties| { + // Find correlation-data and response topics. + let correlation_data = properties.iter().find_map(|prop| { + if let minimq::Property::CorrelationData(data) = prop { + Some(*data) + } else { + None + } + }); + let response_topic = properties.iter().find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(*topic) + } else { + None + } + }); + + let incoming = miniconf::Message { + data: message, + correlation_data, + response_topic, + }; + + if let Some(response) = + self.miniconf.borrow_mut().process(topic, incoming) + { + let mut response_properties: Vec< + minimq::Property, + consts::U1, + > = Vec::new(); + if let Some(data) = response.correlation_data { + response_properties + .push(minimq::Property::CorrelationData(data)) + .unwrap(); + } + + // Make a best-effort attempt to send the response. + client + .publish( + response.topic, + &response.data.into_bytes(), + minimq::QoS::AtMostOnce, + &response_properties, + ) + .ok(); + update = true; + } + }, + ) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, )) => None, @@ -109,6 +188,25 @@ where } } } + + pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { + let telemetry = + miniconf::serde_json_core::to_string::(telemetry) + .unwrap(); + self.mqtt + .borrow_mut() + .publish( + &self.telemetry_topic, + telemetry.as_bytes(), + minimq::QoS::AtMostOnce, + &[], + ) + .ok(); + } + + pub fn settings(&self) -> S { + self.miniconf.borrow().settings.clone() + } } /// Get the MQTT prefix of a device.