diff --git a/.github/bors.toml b/.github/bors.toml index 722246d..5d1bf89 100644 --- a/.github/bors.toml +++ b/.github/bors.toml @@ -4,4 +4,5 @@ status = [ "style", "test (stable)", "compile (stable)", + "HITL Run Status" ] diff --git a/.github/workflows/hitl_trigger.yml b/.github/workflows/hitl_trigger.yml index 62abb96..b275421 100644 --- a/.github/workflows/hitl_trigger.yml +++ b/.github/workflows/hitl_trigger.yml @@ -2,8 +2,10 @@ name: HITL Trigger on: workflow_dispatch: - pull_request: - branches: [ master ] + push: + branches: + - staging + - trying jobs: hitl-trigger: diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a42efa..04aa2fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,73 @@ -# Changelog +# Change Log -## [v0.2.0] 2019-05-28 +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + +## [Unreleased] + +### Added + +### Changed + +### Fixed + +## [v0.5.0] - 2021-04-21 + +### Added + +* Batch sample processing +* DMA for ADC and DAC batches +* Pounder profile streaming +* DSP library with lots of optimized algorithms +* Digital input support +* Hardware in the loop continuous integration testing +* Dependency updates +* MQTT settings interface through miniconf/minimq +* Multi-binary support +* DHCP support + +### Changed + +* Removed JSON-over-TCP interface + +### Fixed + +* Robust EEPROM MAC address reading slow supply start + +## [v0.4.1] - 2020-06-23 + +### Fixed + +* Fix DAC clr/ldac, SPI speed + +## [v0.4.0] - 2020-06-22 + +### Added + +* Hardware v1.1 only +* AD9959/Pounder support + +### Changed + +* HAL port + +## [v0.3.0] - 2020-01-20 + +### Added + +* Red LED handling +* EEPROM MAC address reading + +### Changed + +* Panic handler cleanup +* Dependency updates (smoltcp, rtfm) + +## [v0.2.0] - 2019-05-28 + +### Added * Initial basic release * Ethernet support @@ -9,6 +76,15 @@ * ADC/DAC timing and interrupts * Board configuration, bootstrap -## [v0.1.0] 2019-03-10 +## [v0.1.0] - 2019-03-10 + +### Added * First bits of code published + +[Unreleased]: https://github.com/quartiq/stabilizer/compare/v0.5.0...HEAD +[v0.5.0]: https://github.com/quartiq/stabilizer/compare/v0.4.1...v0.5.0 +[v0.4.1]: https://github.com/quartiq/stabilizer/compare/v0.4.0...v0.4.1 +[v0.4.0]: https://github.com/quartiq/stabilizer/compare/v0.3.0...v0.4.0 +[v0.3.0]: https://github.com/quartiq/stabilizer/compare/v0.2.0...v0.3.0 +[v0.2.0]: https://github.com/quartiq/stabilizer/compare/v0.1.0...v0.2.0 diff --git a/Cargo.lock b/Cargo.lock index 970506d..aef5a92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,7 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "proc-macro2", "quote", @@ -416,11 +416,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "derive_miniconf", "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -428,7 +427,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" +source = "git+https://github.com/quartiq/minimq.git?rev=b3f364d#b3f364d55dea35da6572f78ddb91c87bfbb453bf" dependencies = [ "bit_field", "embedded-nal", @@ -658,6 +657,12 @@ dependencies = [ "semver", ] +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "semver" version = "0.9.0" @@ -685,9 +690,10 @@ dependencies = [ [[package]] name = "serde-json-core" version = "0.2.0" -source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" +source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=da460d1#da460d123e217f0e822a3977eb2170ed5d279d5e" dependencies = [ - "heapless 0.5.6", + "heapless 0.6.1", + "ryu", "serde", ] @@ -726,7 +732,7 @@ dependencies = [ [[package]] name = "stabilizer" -version = "0.4.1" +version = "0.5.0" dependencies = [ "ad9959", "asm-delay", @@ -742,6 +748,7 @@ dependencies = [ "log", "mcp23017", "miniconf", + "minimq", "nb 1.0.0", "panic-semihosting", "paste", diff --git a/Cargo.toml b/Cargo.toml index f4db4d8..4ab885e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "stabilizer" -version = "0.4.1" +version = "0.5.0" authors = ["Robert Jördens "] description = "Firmware for the Sinara Stabilizer board (stm32h743, eth, poe, 2 adc, 2 dac)" categories = ["embedded", "no-std", "hardware-support", "science"] @@ -56,19 +56,19 @@ version = "0.9.0" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "314fa5587d" +rev = "c8d819c" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" rev = "8468f11" -[patch.crates-io.minimq] +[dependencies.minimq] git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" +rev = "b3f364d" [patch.crates-io.serde-json-core] git = "https://github.com/rust-embedded-community/serde-json-core.git" -rev = "ee06ac91bc" +rev = "da460d1" [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] diff --git a/miniconf.py b/miniconf.py index b059f78..77e491c 100644 --- a/miniconf.py +++ b/miniconf.py @@ -9,6 +9,7 @@ import argparse import asyncio import json import logging +import sys from gmqtt import Client as MqttClient @@ -51,7 +52,8 @@ class Miniconf: logger.warning('Unexpected response on topic: %s', topic) return - self.inflight[topic].set_result(payload.decode('ascii')) + response = json.loads(payload) + self.inflight[topic].set_result((response['code'], response['msg'])) del self.inflight[topic] async def command(self, path, value): @@ -62,7 +64,8 @@ class Miniconf: value: The value to write to the path. Returns: - The received response to the command. + (code, msg) tuple as a response to the command. `code` is zero for success and `msg` is + a use-readable message indicating further information. """ setting_topic = f'{self.prefix}/settings/{path}' response_topic = f'{self.prefix}/response/{path}' @@ -107,12 +110,17 @@ def main(): async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) + failures = 0 for kv in args.settings: path, value = kv.split("=", 1) - response = await interface.command(path, json.loads(value)) + code, response = await interface.command(path, json.loads(value)) print(response) + if code != 0: + failures += 1 - loop.run_until_complete(configure_settings()) + return failures + + sys.exit(loop.run_until_complete(configure_settings())) if __name__ == '__main__': diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 9f48989..4ceb04c 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -4,7 +4,7 @@ use stabilizer::{hardware, net}; -use miniconf::{minimq, Miniconf}; +use miniconf::Miniconf; use serde::Deserialize; use dsp::iir; @@ -13,7 +13,7 @@ use hardware::{ DigitalInput1, InputPin, SystemTimer, AFE0, AFE1, }; -use net::{Action, MiniconfInterface}; +use net::{Action, MqttInterface}; const SCALE: f32 = i16::MAX as _; @@ -32,9 +32,17 @@ pub struct Settings { impl Default for Settings { fn default() -> Self { Self { + // Analog frontend programmable gain amplifier gains (G1, G2, G5, G10) afe: [AfeGain::G1, AfeGain::G1], + // IIR filter tap gains are an array `[b0, b1, b2, a1, a2]` such that the + // new output is computed as `y0 = a1*y1 + a2*y2 + b0*x0 + b1*x1 + b2*x2`. + // The array is `iir_state[channel-index][cascade-index][coeff-index]`. + // The IIR coefficients can be mapped to other transfer function + // representations, for example as described in https://arxiv.org/abs/1508.06319 iir_ch: [[iir::IIR::new(1., -SCALE, SCALE); IIR_CASCADE_LENGTH]; 2], + // Permit the DI1 digital input to suppress filter output updates. allow_hold: false, + // Force suppress filter output updates. force_hold: false, telemetry_period_secs: 10, } @@ -48,11 +56,10 @@ const APP: () = { digital_inputs: (DigitalInput0, DigitalInput1), adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, telemetry: net::Telemetry, settings: Settings, - // Format: iir_state[ch][cascade-no][coeff] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], } @@ -62,7 +69,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -90,9 +97,9 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - telemetry: net::Telemetry::default(), + mqtt, digital_inputs: stabilizer.digital_inputs, - mqtt_config, + telemetry: net::Telemetry::default(), settings: Settings::default(), } } @@ -161,14 +168,10 @@ const APP: () = { ]; } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -178,42 +181,23 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, afes, settings])] + #[task(priority = 1, resources=[mqtt, afes, settings])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; - // Update the IIR channels. - c.resources.settings.lock(|current| *current = *settings); + let settings = c.resources.mqtt.settings(); + c.resources.settings.lock(|current| *current = settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); } - #[task(priority = 1, resources=[mqtt_config, settings, telemetry], schedule=[telemetry])] + #[task(priority = 1, resources=[mqtt, settings, telemetry], schedule=[telemetry])] fn telemetry(mut c: telemetry::Context) { let telemetry = c.resources.telemetry.lock(|telemetry| telemetry.clone()); - // Serialize telemetry outside of a critical section to prevent blocking the processing - // task. - let telemetry = miniconf::serde_json_core::to_string::< - heapless::consts::U256, - _, - >(&telemetry) - .unwrap(); - - c.resources.mqtt_config.mqtt.client(|client| { - // TODO: Incorporate current MQTT prefix instead of hard-coded value. - client - .publish( - "dt/sinara/dual-iir/telemetry", - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok() - }); + c.resources.mqtt.publish_telemetry(&telemetry); let telemetry_period = c .resources diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index f2f7181..6e0b4c9 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -17,8 +17,8 @@ use stabilizer::hardware::{ AFE1, }; -use miniconf::{minimq, Miniconf}; -use stabilizer::net::{Action, MiniconfInterface}; +use miniconf::Miniconf; +use stabilizer::net::{Action, MqttInterface}; #[derive(Copy, Clone, Debug, Deserialize, Miniconf)] enum Conf { @@ -64,7 +64,7 @@ const APP: () = { afes: (AFE0, AFE1), adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, settings: Settings, telemetry: net::Telemetry, digital_inputs: (DigitalInput0, DigitalInput1), @@ -79,7 +79,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -120,8 +120,8 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, + mqtt, digital_inputs: stabilizer.digital_inputs, - mqtt_config, timestamper: stabilizer.timestamper, telemetry: net::Telemetry::default(), @@ -212,14 +212,10 @@ const APP: () = { [dac_samples[0][0] as i16, dac_samples[1][0] as i16]; } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -229,17 +225,17 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, settings, afes])] + #[task(priority = 1, resources=[mqtt, settings, afes])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); - c.resources.settings.lock(|current| *current = *settings); + c.resources.settings.lock(|current| *current = settings); } - #[task(priority = 1, resources=[mqtt_config, digital_inputs, settings, telemetry], schedule=[telemetry])] + #[task(priority = 1, resources=[mqtt, digital_inputs, settings, telemetry], schedule=[telemetry])] fn telemetry(mut c: telemetry::Context) { let mut telemetry = c.resources.telemetry.lock(|telemetry| telemetry.clone()); @@ -249,25 +245,7 @@ const APP: () = { c.resources.digital_inputs.1.is_high().unwrap(), ]; - // Serialize telemetry outside of a critical section to prevent blocking the processing - // task. - let telemetry = miniconf::serde_json_core::to_string::< - heapless::consts::U256, - _, - >(&telemetry) - .unwrap(); - - c.resources.mqtt_config.mqtt.client(|client| { - // TODO: Incorporate current MQTT prefix instead of hard-coded value. - client - .publish( - "dt/sinara/dual-iir/telemetry", - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok() - }); + c.resources.mqtt.publish_telemetry(&telemetry); let telemetry_period = c .resources diff --git a/src/net/messages.rs b/src/net/messages.rs new file mode 100644 index 0000000..2f54f52 --- /dev/null +++ b/src/net/messages.rs @@ -0,0 +1,124 @@ +use heapless::{consts, String, Vec}; +use serde::Serialize; + +use core::fmt::Write; + +#[derive(Debug, Copy, Clone)] +pub enum SettingsResponseCode { + NoError = 0, + NoTopic = 1, + InvalidPrefix = 2, + UnknownTopic = 3, + UpdateFailure = 4, +} + +/// Represents a generic MQTT message. +pub struct MqttMessage<'a> { + pub topic: &'a str, + pub message: Vec, + pub properties: Vec, consts::U1>, +} + +/// The payload of the MQTT response message to a settings update request. +#[derive(Serialize)] +pub struct SettingsResponse { + code: u8, + msg: String, +} + +impl<'a> MqttMessage<'a> { + /// Construct a new MQTT message from an incoming message. + /// + /// # Args + /// * `properties` - A list of properties associated with the inbound message. + /// * `default_response` - The default response topic for the message + /// * `msg` - The response associated with the message. Must fit within 128 bytes. + pub fn new<'b: 'a>( + properties: &[minimq::Property<'a>], + default_response: &'b str, + msg: &impl Serialize, + ) -> Self { + // Extract the MQTT response topic. + let topic = properties + .iter() + .find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(topic) + } else { + None + } + }) + .unwrap_or(&default_response); + + // Associate any provided correlation data with the response. + let mut correlation_data: Vec, consts::U1> = + Vec::new(); + if let Some(data) = properties + .iter() + .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) + { + // Note(unwrap): Unwrap can not fail, as we only ever push one value. + correlation_data.push(*data).unwrap(); + } + + Self { + topic, + // Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector. + message: miniconf::serde_json_core::to_vec(msg).unwrap(), + properties: correlation_data, + } + } +} + +impl SettingsResponse { + /// Construct a settings response upon successful settings update. + /// + /// # Args + /// * `path` - The path of the setting that was updated. + pub fn update_success(path: &str) -> Self { + let mut msg: String = String::new(); + if write!(&mut msg, "{} updated", path).is_err() { + msg = String::from("Latest update succeeded"); + } + + Self { + msg, + code: SettingsResponseCode::NoError as u8, + } + } + + /// Construct a response when a settings update failed. + /// + /// # Args + /// * `path` - The settings path that configuration failed for. + /// * `err` - The settings update error that occurred. + pub fn update_failure(path: &str, err: miniconf::Error) -> Self { + let mut msg: String = String::new(); + if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() { + if write!(&mut msg, "Latest update failed: {:?}", err).is_err() { + msg = String::from("Latest update failed"); + } + } + + Self { + msg, + code: SettingsResponseCode::UpdateFailure as u8, + } + } + + /// Construct a response from a custom response code. + /// + /// # Args + /// * `code` - The response code to provide. + pub fn code(code: SettingsResponseCode) -> Self { + let mut msg: String = String::new(); + + // Note(unwrap): All code debug names shall fit in the 64 byte string. + write!(&mut msg, "{:?}", code).unwrap(); + + Self { + code: code as u8, + msg, + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index 2a4ba63..6a36652 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,11 +1,18 @@ -use crate::hardware::{ - design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, -}; +///! Stabilizer network management module +///! +///! # Design +///! The stabilizer network architecture supports numerous layers to permit transmission of +///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data +///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines +///! related to Stabilizer networking operations. +use heapless::{consts, String}; use core::fmt::Write; -use heapless::{consts, String}; -use miniconf::minimq; +mod messages; +mod mqtt_interface; +use messages::{MqttMessage, SettingsResponse, SettingsResponseCode}; +pub use mqtt_interface::MqttInterface; mod telemetry; pub use telemetry::Telemetry; @@ -19,101 +26,6 @@ pub enum Action { UpdateSettings, } -/// MQTT settings interface. -pub struct MiniconfInterface -where - S: miniconf::Miniconf + Default, -{ - pub mqtt: miniconf::MqttInterface, - clock: CycleCounter, - phy: EthernetPhy, - network_was_reset: bool, -} - -impl MiniconfInterface -where - S: miniconf::Miniconf + Default, -{ - /// Construct a new MQTT settings interface. - /// - /// # Args - /// * `stack` - The network stack to use for communication. - /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. - /// * `prefix` - The MQTT device prefix to use for this device. - /// * `phy` - The PHY driver for querying the link state. - /// * `clock` - The clock to utilize for querying the current system time. - pub fn new( - stack: NetworkStack, - client_id: &str, - prefix: &str, - phy: EthernetPhy, - clock: CycleCounter, - ) -> Self { - let mqtt = { - let mqtt_client = { - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap() - }; - - miniconf::MqttInterface::new(mqtt_client, prefix, S::default()) - .unwrap() - }; - - Self { - mqtt, - clock, - phy, - network_was_reset: false, - } - } - - /// Update the MQTT interface and service the network - /// - /// # Returns - /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> Option { - let now = self.clock.current_ms(); - - // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.network_stack().poll(now) { - Ok(updated) => !updated, - Err(err) => { - log::info!("Network error: {:?}", err); - false - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - if self.phy.poll_link() == false { - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - if !self.network_was_reset { - self.network_was_reset = true; - self.mqtt.network_stack().handle_link_reset(); - } - } else { - self.network_was_reset = false; - } - - // Finally, service the MQTT interface and handle any necessary messages. - match self.mqtt.update() { - Ok(true) => Some(Action::UpdateSettings), - Ok(false) if sleep => Some(Action::Sleep), - Ok(_) => None, - - Err(miniconf::MqttError::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => None, - - Err(error) => { - log::info!("Unexpected error: {:?}", error); - None - } - } - } -} - /// Get the MQTT prefix of a device. /// /// # Args diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs new file mode 100644 index 0000000..9b73793 --- /dev/null +++ b/src/net/mqtt_interface.rs @@ -0,0 +1,244 @@ +use crate::hardware::{ + design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, +}; + +use core::{cell::RefCell, fmt::Write}; + +use heapless::{consts, String}; +use serde::Serialize; + +use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode}; + +/// MQTT settings interface. +pub struct MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + telemetry_topic: String, + default_response_topic: String, + mqtt: RefCell>, + settings: RefCell, + clock: CycleCounter, + phy: EthernetPhy, + network_was_reset: bool, + subscribed: bool, + id: String, +} + +impl MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + /// Construct a new MQTT settings interface. + /// + /// # Args + /// * `stack` - The network stack to use for communication. + /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. + /// * `prefix` - The MQTT device prefix to use for this device. + /// * `phy` - The PHY driver for querying the link state. + /// * `clock` - The clock to utilize for querying the current system time. + pub fn new( + stack: NetworkStack, + client_id: &str, + prefix: &str, + phy: EthernetPhy, + clock: CycleCounter, + ) -> Self { + let mqtt_client = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + + let mut telemetry_topic: String = String::new(); + write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); + + let mut response_topic: String = String::new(); + write!(&mut response_topic, "{}/log", prefix).unwrap(); + + Self { + mqtt: RefCell::new(mqtt_client), + settings: RefCell::new(S::default()), + id: String::from(prefix), + clock, + phy, + telemetry_topic, + default_response_topic: response_topic, + network_was_reset: false, + subscribed: false, + } + } + + /// Update the MQTT interface and service the network + /// + /// # Returns + /// An option containing an action that should be completed as a result of network servicing. + pub fn update(&mut self) -> Option { + // First, service the network stack to process any inbound and outbound traffic. + let sleep = match self + .mqtt + .borrow_mut() + .network_stack + .poll(self.clock.current_ms()) + { + Ok(updated) => !updated, + Err(err) => { + log::info!("Network error: {:?}", err); + false + } + }; + + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + if self.phy.poll_link() == false { + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + if !self.network_was_reset { + self.network_was_reset = true; + self.mqtt.borrow_mut().network_stack.handle_link_reset(); + } + } else { + self.network_was_reset = false; + } + + let mqtt_connected = match self.mqtt.borrow_mut().is_connected() { + Ok(connected) => connected, + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => false, + Err(minimq::Error::Network(error)) => { + log::info!("Unexpected network error: {:?}", error); + false + } + Err(error) => { + log::warn!("Unexpected MQTT error: {:?}", error); + false + } + }; + + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && mqtt_connected { + let mut settings_topic: String = String::new(); + write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) + .unwrap(); + + self.mqtt + .borrow_mut() + .subscribe(&settings_topic, &[]) + .unwrap(); + self.subscribed = true; + } + + // Handle any MQTT traffic. + let mut update = false; + match self.mqtt.borrow_mut().poll( + |client, topic, message, properties| { + let (response, settings_update) = + self.route_message(topic, message, properties); + client + .publish( + response.topic, + &response.message, + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + update = settings_update; + }, + ) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => None, + + Err(error) => { + log::info!("Unexpected error: {:?}", error); + None + } + } + } + + fn route_message<'a, 'me: 'a>( + &'me self, + topic: &str, + message: &[u8], + properties: &[minimq::Property<'a>], + ) -> (MqttMessage<'a>, bool) { + let mut update = false; + let response_msg = + if let Some(path) = topic.strip_prefix(self.id.as_str()) { + let mut parts = path[1..].split('/'); + match parts.next() { + Some("settings") => { + match self + .settings + .borrow_mut() + .string_set(parts.peekable(), message) + { + Ok(_) => { + update = true; + SettingsResponse::update_success(path) + } + Err(error) => { + SettingsResponse::update_failure(path, error) + } + } + } + Some(_) => SettingsResponse::code( + SettingsResponseCode::UnknownTopic, + ), + _ => SettingsResponse::code(SettingsResponseCode::NoTopic), + } + } else { + SettingsResponse::code(SettingsResponseCode::InvalidPrefix) + }; + + let response = MqttMessage::new( + properties, + &self.default_response_topic, + &response_msg, + ); + + (response, update) + } + + /// Publish telemetry to the default telemetry topic. + /// + /// # Note + /// Telemetry is transmitted in a "best-effort" manner. There is no guarantee it will be + /// transmitted. + /// + /// # Args + /// * `telemetry` - The telemetry message to transmit. + pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { + let telemetry = + miniconf::serde_json_core::to_string::(telemetry) + .unwrap(); + self.mqtt + .borrow_mut() + .publish( + &self.telemetry_topic, + telemetry.as_bytes(), + minimq::QoS::AtMostOnce, + &[], + ) + .ok(); + } + + /// Get a copy of the current settings. + pub fn settings(&self) -> S { + self.settings.borrow().clone() + } +}