diff --git a/Cargo.lock b/Cargo.lock index 0bf7496..9db369a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,7 +200,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46" +source = "git+https://github.com/quartiq/miniconf.git?rev=6c19ba2#6c19ba208eb426377ff6e09416fcabdf4fd3021d" dependencies = [ "proc-macro2", "quote", @@ -413,17 +413,21 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46" +source = "git+https://github.com/quartiq/miniconf.git?rev=6c19ba2#6c19ba208eb426377ff6e09416fcabdf4fd3021d" dependencies = [ "derive_miniconf", + "heapless 0.7.3", + "log", + "minimq", "serde", "serde-json-core", ] [[package]] name = "minimq" -version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?rev=93813e3#93813e37e013b04ffb5e02aca7eca1859b1b2443" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ff318a5cbde4315f61fb86145ed2c00a49dc613a71aff920130b47d0a12490f" dependencies = [ "bit_field", "embedded-nal", diff --git a/Cargo.toml b/Cargo.toml index 2ade123..09f2031 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ shared-bus = {version = "0.2.2", features = ["cortex-m"] } serde-json-core = "0.4" mcp23017 = "1.0" mutex-trait = "0.2" +minimq = "0.3" # rtt-target bump [dependencies.rtt-logger] @@ -75,16 +76,12 @@ branch = "feature/assume-init" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "9c826f8" +rev = "6c19ba2" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" rev = "0634188" -[dependencies.minimq] -git = "https://github.com/quartiq/minimq.git" -rev = "93813e3" - [features] nightly = ["cortex-m/inline-asm", "dsp/nightly"] pounder_v1_1 = [ ] diff --git a/src/net/messages.rs b/src/net/messages.rs deleted file mode 100644 index 3887fef..0000000 --- a/src/net/messages.rs +++ /dev/null @@ -1,90 +0,0 @@ -use heapless::{String, Vec}; -use serde::Serialize; - -use core::fmt::Write; - -#[derive(Debug, Copy, Clone)] -pub enum SettingsResponseCode { - NoError = 0, - MiniconfError = 1, -} - -/// Represents a generic MQTT message. -pub struct MqttMessage<'a> { - pub topic: &'a str, - pub message: Vec, - pub properties: Vec, 1>, -} - -/// The payload of the MQTT response message to a settings update request. -#[derive(Serialize)] -pub struct SettingsResponse { - code: u8, - msg: String<64>, -} - -impl<'a> MqttMessage<'a> { - /// Construct a new MQTT message from an incoming message. - /// - /// # Args - /// * `properties` - A list of properties associated with the inbound message. - /// * `default_response` - The default response topic for the message - /// * `msg` - The response associated with the message. Must fit within 128 bytes. - pub fn new<'b: 'a>( - properties: &[minimq::Property<'a>], - default_response: &'b str, - msg: &impl Serialize, - ) -> Self { - // Extract the MQTT response topic. - let topic = properties - .iter() - .find_map(|prop| { - if let minimq::Property::ResponseTopic(topic) = prop { - Some(topic) - } else { - None - } - }) - .unwrap_or(&default_response); - - // Associate any provided correlation data with the response. - let mut correlation_data: Vec, 1> = Vec::new(); - if let Some(data) = properties - .iter() - .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) - { - // Note(unwrap): Unwrap can not fail, as we only ever push one value. - correlation_data.push(*data).unwrap(); - } - - Self { - topic, - // Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector. - message: miniconf::serde_json_core::to_vec(msg).unwrap(), - properties: correlation_data, - } - } -} - -impl From> for SettingsResponse { - fn from(result: Result<(), miniconf::Error>) -> Self { - match result { - Ok(_) => Self { - msg: String::from("OK"), - code: SettingsResponseCode::NoError as u8, - }, - - Err(error) => { - let mut msg = String::new(); - if write!(&mut msg, "{:?}", error).is_err() { - msg = String::from("Miniconf Error"); - } - - Self { - code: SettingsResponseCode::MiniconfError as u8, - msg, - } - } - } - } -} diff --git a/src/net/miniconf_client.rs b/src/net/miniconf_client.rs deleted file mode 100644 index 79750a8..0000000 --- a/src/net/miniconf_client.rs +++ /dev/null @@ -1,169 +0,0 @@ -///! Stabilizer Run-time Settings Client -///! -///! # Design -///! Stabilizer allows for settings to be configured at run-time via MQTT using miniconf. -///! Settings are written in serialized JSON form to the settings path associated with the setting. -///! -///! # Limitations -///! The MQTT client logs failures to subscribe to the settings topic, but does not re-attempt to -///connect to it when errors occur. -///! -///! Respones to settings updates are sent without quality-of-service guarantees, so there's no -///! guarantee that the requestee will be informed that settings have been applied. -use heapless::String; -use log::info; - -use super::{MqttMessage, NetworkReference, SettingsResponse, UpdateState}; -use minimq::embedded_nal::IpAddr; - -/// MQTT settings interface. -pub struct MiniconfClient -where - S: miniconf::Miniconf + Default + Clone, -{ - default_response_topic: String<128>, - mqtt: minimq::Minimq, - settings: S, - subscribed: bool, - settings_prefix: String<64>, -} - -impl MiniconfClient -where - S: miniconf::Miniconf + Default + Clone, -{ - /// Construct a new MQTT settings interface. - /// - /// # Args - /// * `stack` - The network stack to use for communication. - /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. - /// * `prefix` - The MQTT device prefix to use for this device. - /// * `broker` - The IP address of the MQTT broker to use. - pub fn new( - stack: NetworkReference, - client_id: &str, - prefix: &str, - broker: IpAddr, - ) -> Self { - let mqtt = minimq::Minimq::new(broker, client_id, stack).unwrap(); - - let mut response_topic: String<128> = String::from(prefix); - response_topic.push_str("/log").unwrap(); - - let mut settings_prefix: String<64> = String::from(prefix); - settings_prefix.push_str("/settings").unwrap(); - - Self { - mqtt, - settings: S::default(), - settings_prefix, - default_response_topic: response_topic, - subscribed: false, - } - } - - /// Update the MQTT interface and service the network - /// - /// # Returns - /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> UpdateState { - let mqtt_connected = match self.mqtt.client.is_connected() { - Ok(connected) => connected, - Err(minimq::Error::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => false, - Err(minimq::Error::Network(error)) => { - log::info!("Unexpected network error: {:?}", error); - false - } - Err(error) => { - log::warn!("Unexpected MQTT error: {:?}", error); - false - } - }; - - // If we're no longer subscribed to the settings topic, but we are connected to the broker, - // resubscribe. - if !self.subscribed && mqtt_connected { - log::info!("MQTT connected, subscribing to settings"); - // Note(unwrap): We construct a string with two more characters than the prefix - // strucutre, so we are guaranteed to have space for storage. - let mut settings_topic: String<66> = - String::from(self.settings_prefix.as_str()); - settings_topic.push_str("/#").unwrap(); - - // We do not currently handle or process potential subscription failures. Instead, this - // failure will be logged through the stabilizer logging interface. - self.mqtt.client.subscribe(&settings_topic, &[]).unwrap(); - self.subscribed = true; - } - - // Handle any MQTT traffic. - let settings = &mut self.settings; - let mqtt = &mut self.mqtt; - let prefix = self.settings_prefix.as_str(); - let default_response_topic = self.default_response_topic.as_str(); - - let mut update = false; - match mqtt.poll(|client, topic, message, properties| { - let path = match topic.strip_prefix(prefix) { - // For paths, we do not want to include the leading slash. - Some(path) => { - if !path.is_empty() { - &path[1..] - } else { - path - } - } - None => { - info!("Unexpected MQTT topic: {}", topic); - return; - } - }; - - log::info!("Settings update: `{}`", path); - - let message: SettingsResponse = settings - .string_set(path.split('/').peekable(), message) - .map(|_| { - update = true; - }) - .into(); - - let response = - MqttMessage::new(properties, default_response_topic, &message); - - client - .publish( - response.topic, - &response.message, - // TODO: When Minimq supports more QoS levels, this should be increased to - // ensure that the client has received it at least once. - minimq::QoS::AtMostOnce, - &response.properties, - ) - .ok(); - }) { - // If settings updated, - Ok(_) if update => UpdateState::Updated, - Ok(_) => UpdateState::NoChange, - Err(minimq::Error::SessionReset) => { - log::warn!("Settings MQTT session reset"); - self.subscribed = false; - UpdateState::NoChange - } - Err(minimq::Error::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => UpdateState::NoChange, - Err(error) => { - log::info!("Unexpected error: {:?}", error); - UpdateState::NoChange - } - } - } - - /// Get the current settings from miniconf. - pub fn settings(&self) -> &S { - &self.settings - } -} diff --git a/src/net/mod.rs b/src/net/mod.rs index f7c88db..24cd36e 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -10,16 +10,12 @@ pub use miniconf; pub use serde; pub mod data_stream; -pub mod messages; -pub mod miniconf_client; pub mod network_processor; pub mod shared; pub mod telemetry; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; use data_stream::{DataStream, FrameGenerator}; -use messages::{MqttMessage, SettingsResponse}; -use miniconf_client::MiniconfClient; use minimq::embedded_nal::IpAddr; use network_processor::NetworkProcessor; use shared::NetworkManager; @@ -49,8 +45,8 @@ pub enum NetworkState { NoChange, } /// A structure of Stabilizer's default network users. -pub struct NetworkUsers { - pub miniconf: MiniconfClient, +pub struct NetworkUsers { + pub miniconf: miniconf::MqttClient, pub processor: NetworkProcessor, stream: DataStream, generator: Option, @@ -59,7 +55,7 @@ pub struct NetworkUsers { impl NetworkUsers where - S: Default + Clone + Miniconf, + S: Default + Miniconf, T: Serialize, { /// Construct Stabilizer's default network users. @@ -94,12 +90,13 @@ where let prefix = get_device_prefix(app, mac); - let settings = MiniconfClient::new( + let settings = miniconf::MqttClient::new( stack_manager.acquire_stack(), &get_client_id(app, "settings", mac), &prefix, broker, - ); + ) + .unwrap(); let telemetry = TelemetryClient::new( stack_manager.acquire_stack(), @@ -164,8 +161,8 @@ where }; match self.miniconf.update() { - UpdateState::Updated => NetworkState::SettingsChanged, - UpdateState::NoChange => poll_result, + Ok(true) => NetworkState::SettingsChanged, + _ => poll_result, } } }