diff --git a/Cargo.lock b/Cargo.lock index 13c7828..9007083 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -203,6 +203,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "proc-macro2", "quote", @@ -415,10 +416,10 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" +source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031" dependencies = [ "derive_miniconf", "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -426,7 +427,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" +source = "git+https://github.com/quartiq/minimq.git?branch=rs/issue-40/copyable-properties#c95c758b620ee98752852bb643df8557a7200f3f" dependencies = [ "bit_field", "embedded-nal", @@ -656,6 +657,12 @@ dependencies = [ "semver", ] +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "semver" version = "0.9.0" @@ -683,9 +690,10 @@ dependencies = [ [[package]] name = "serde-json-core" version = "0.2.0" -source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" +source = "git+https://github.com/quartiq/serde-json-core.git?branch=feature/dependency-update#a304506a1efb4a90a6ef3faf71ec3ef5f8433fb4" dependencies = [ - "heapless 0.5.6", + "heapless 0.6.1", + "ryu", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 00ba5c6..29480d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -44,7 +44,7 @@ paste = "1" dsp = { path = "dsp" } ad9959 = { path = "ad9959" } generic-array = "0.14" -miniconf = { version = "0.1.0", features = ["minimq-support"] } +miniconf = "0.1.0" [dependencies.mcp23017] git = "https://github.com/mrd0ll4r/mcp23017.git" @@ -55,13 +55,8 @@ features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] version = "0.9.0" [patch.crates-io.miniconf] -path = "../miniconf" -# git = "https://github.com/quartiq/miniconf.git" -# branch = "feature/mqtt-removal" - -[patch.crates-io.minimq] -git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" +git = "https://github.com/quartiq/miniconf.git" +rev = "c8d819c" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" @@ -69,11 +64,11 @@ rev = "8468f11" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" +branch = "rs/issue-40/copyable-properties" [patch.crates-io.serde-json-core] -git = "https://github.com/rust-embedded-community/serde-json-core.git" -rev = "ee06ac91bc" +git = "https://github.com/quartiq/serde-json-core.git" +branch = "feature/dependency-update" [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] diff --git a/src/net/mod.rs b/src/net/mod.rs index de55d1a..a1e1399 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,11 +1,11 @@ -use crate::hardware::{ - design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, -}; +use heapless::{consts, String}; -use core::{cell::RefCell, fmt::Write}; +use core::fmt::Write; -use heapless::{consts, String, Vec}; -use serde::Serialize; +mod mqtt_interface; +mod router; +pub use mqtt_interface::MqttInterface; +use router::{RouteResult, SettingsResponse}; /// Potential actions for firmware to take. pub enum Action { @@ -16,178 +16,6 @@ pub enum Action { UpdateSettings, } -/// MQTT settings interface. -pub struct MqttInterface -where - S: miniconf::Miniconf + Default + Clone, -{ - telemetry_topic: String, - mqtt: RefCell>, - miniconf: RefCell>, - clock: CycleCounter, - phy: EthernetPhy, - network_was_reset: bool, - subscribed: bool, -} - -impl MqttInterface -where - S: miniconf::Miniconf + Default + Clone, -{ - /// Construct a new MQTT settings interface. - /// - /// # Args - /// * `stack` - The network stack to use for communication. - /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. - /// * `prefix` - The MQTT device prefix to use for this device. - /// * `phy` - The PHY driver for querying the link state. - /// * `clock` - The clock to utilize for querying the current system time. - pub fn new( - stack: NetworkStack, - client_id: &str, - prefix: &str, - phy: EthernetPhy, - clock: CycleCounter, - ) -> Self { - let mqtt_client = - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap(); - let config = - miniconf::MiniconfInterface::new(prefix, S::default()).unwrap(); - - let mut telemetry_topic: String = String::new(); - write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); - - Self { - mqtt: RefCell::new(mqtt_client), - miniconf: RefCell::new(config), - clock, - phy, - telemetry_topic, - network_was_reset: false, - subscribed: false, - } - } - - /// Update the MQTT interface and service the network - /// - /// # Returns - /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> Option { - let now = self.clock.current_ms(); - - // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.borrow_mut().network_stack.poll(now) { - Ok(updated) => !updated, - Err(err) => { - log::info!("Network error: {:?}", err); - false - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - if self.phy.poll_link() == false { - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - if !self.network_was_reset { - self.network_was_reset = true; - self.mqtt.borrow_mut().network_stack.handle_link_reset(); - } - } else { - self.network_was_reset = false; - } - - // If we're no longer subscribed to the settings topic, but we are connected to the broker, - // resubscribe. - if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { - self.mqtt - .borrow_mut() - .subscribe( - self.miniconf.borrow_mut().get_listening_topic(), - &[], - ) - .unwrap(); - self.subscribed = true; - } - - let mut update = false; - - // Handle any MQTT traffic. - match self.mqtt.borrow_mut().poll( - |client, topic, message, properties| { - if let Some(response) = self.miniconf.borrow_mut().process( - topic, - miniconf::Message::from(message, properties), - ) { - let mut response_properties: Vec< - minimq::Property, - consts::U1, - > = Vec::new(); - if let Some(data) = response.correlation_data { - response_properties - .push(minimq::Property::CorrelationData(data)) - .unwrap(); - } - - // Make a best-effort attempt to send the response. - client - .publish( - response.topic, - &response.data.into_bytes(), - minimq::QoS::AtMostOnce, - &response_properties, - ) - .ok(); - update = true; - } - }, - ) { - // If settings updated, - Ok(_) => { - if update { - Some(Action::UpdateSettings) - } else if sleep { - Some(Action::Sleep) - } else { - None - } - } - Err(minimq::Error::Disconnected) => { - self.subscribed = false; - None - } - Err(minimq::Error::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => None, - - Err(error) => { - log::info!("Unexpected error: {:?}", error); - None - } - } - } - - pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { - let telemetry = - miniconf::serde_json_core::to_string::(telemetry) - .unwrap(); - self.mqtt - .borrow_mut() - .publish( - &self.telemetry_topic, - telemetry.as_bytes(), - minimq::QoS::AtMostOnce, - &[], - ) - .ok(); - } - - pub fn settings(&self) -> S { - self.miniconf.borrow().settings.clone() - } -} - /// Get the MQTT prefix of a device. /// /// # Args diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs new file mode 100644 index 0000000..bcc6ae7 --- /dev/null +++ b/src/net/mqtt_interface.rs @@ -0,0 +1,212 @@ +use crate::hardware::{ + design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, +}; + +use core::{cell::RefCell, fmt::Write}; + +use heapless::{consts, String}; +use serde::Serialize; + +use super::{Action, RouteResult, SettingsResponse}; + +/// MQTT settings interface. +pub struct MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + telemetry_topic: String, + default_response_topic: String, + mqtt: RefCell>, + settings: RefCell, + clock: CycleCounter, + phy: EthernetPhy, + network_was_reset: bool, + subscribed: bool, + id: String, +} + +impl MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + /// Construct a new MQTT settings interface. + /// + /// # Args + /// * `stack` - The network stack to use for communication. + /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. + /// * `prefix` - The MQTT device prefix to use for this device. + /// * `phy` - The PHY driver for querying the link state. + /// * `clock` - The clock to utilize for querying the current system time. + pub fn new( + stack: NetworkStack, + client_id: &str, + prefix: &str, + phy: EthernetPhy, + clock: CycleCounter, + ) -> Self { + let mqtt_client = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + + let mut telemetry_topic: String = String::new(); + write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap(); + + let mut response_topic: String = String::new(); + write!(&mut response_topic, "{}/log", prefix).unwrap(); + + Self { + mqtt: RefCell::new(mqtt_client), + settings: RefCell::new(S::default()), + id: String::from(prefix), + clock, + phy, + telemetry_topic, + default_response_topic: response_topic, + network_was_reset: false, + subscribed: false, + } + } + + /// Update the MQTT interface and service the network + /// + /// # Returns + /// An option containing an action that should be completed as a result of network servicing. + pub fn update(&mut self) -> Option { + // First, service the network stack to process any inbound and outbound traffic. + let sleep = match self + .mqtt + .borrow_mut() + .network_stack + .poll(self.clock.current_ms()) + { + Ok(updated) => !updated, + Err(err) => { + log::info!("Network error: {:?}", err); + false + } + }; + + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + if self.phy.poll_link() == false { + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + if !self.network_was_reset { + self.network_was_reset = true; + self.mqtt.borrow_mut().network_stack.handle_link_reset(); + } + } else { + self.network_was_reset = false; + } + + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() { + let mut settings_topic: String = String::new(); + write!(&mut settings_topic, "{}/settings/#", self.id.as_str()) + .unwrap(); + + self.mqtt + .borrow_mut() + .subscribe(&settings_topic, &[]) + .unwrap(); + self.subscribed = true; + } + + // Handle any MQTT traffic. + let mut update = false; + match self.mqtt.borrow_mut().poll( + |client, topic, message, properties| { + let (response, settings_update) = + self.route_message(topic, message, properties); + client + .publish( + response.response_topic, + &response.message, + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + update = settings_update; + }, + ) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => None, + + Err(error) => { + log::info!("Unexpected error: {:?}", error); + None + } + } + } + + fn route_message<'a, 'me: 'a>( + &'me self, + topic: &str, + message: &[u8], + properties: &[minimq::Property<'a>], + ) -> (RouteResult<'a>, bool) { + let mut response = + RouteResult::new(properties, &self.default_response_topic); + let mut update = false; + + if let Some(path) = topic.strip_prefix(self.id.as_str()) { + let mut parts = path[1..].split('/'); + match parts.next() { + Some("settings") => { + let result = self + .settings + .borrow_mut() + .string_set(parts.peekable(), message); + update = result.is_ok(); + response.set_message(SettingsResponse::new(result, topic)); + } + Some(_) => response.set_message(SettingsResponse::custom( + "Unknown topic", + 255, + )), + _ => response + .set_message(SettingsResponse::custom("No topic", 254)), + } + } else { + response + .set_message(SettingsResponse::custom("Invalid prefix", 253)); + } + + (response, update) + } + + pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) { + let telemetry = + miniconf::serde_json_core::to_string::(telemetry) + .unwrap(); + self.mqtt + .borrow_mut() + .publish( + &self.telemetry_topic, + telemetry.as_bytes(), + minimq::QoS::AtMostOnce, + &[], + ) + .ok(); + } + + pub fn settings(&self) -> S { + self.settings.borrow().clone() + } +} diff --git a/src/net/router.rs b/src/net/router.rs new file mode 100644 index 0000000..c8b7776 --- /dev/null +++ b/src/net/router.rs @@ -0,0 +1,91 @@ +use heapless::{consts, String, Vec}; +use serde::Serialize; + +use core::fmt::Write; + +pub struct RouteResult<'a> { + pub response_topic: &'a str, + pub message: Vec, + pub properties: Vec, consts::U1>, +} + +#[derive(Serialize)] +pub struct SettingsResponse { + code: u8, + msg: String, +} + +impl<'a> RouteResult<'a> { + pub fn new<'b: 'a>( + properties: &[minimq::Property<'a>], + default_response: &'b str, + ) -> Self { + // Extract the MQTT response topic. + let response_topic = properties + .iter() + .find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(topic) + } else { + None + } + }) + .unwrap_or(&default_response); + + // Associate any provided correlation data with the response. + let mut correlation_data: Vec, consts::U1> = + Vec::new(); + if let Some(data) = properties + .iter() + .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) + { + // Note(unwrap): Unwrap can not fail, as we only ever push one value. + correlation_data.push(*data).unwrap(); + } + + RouteResult { + response_topic, + message: Vec::new(), + properties: correlation_data, + } + } + + pub fn set_message(&mut self, response: impl Serialize) { + self.message = miniconf::serde_json_core::to_vec(&response).unwrap(); + } +} + +impl SettingsResponse { + pub fn new(result: Result<(), miniconf::Error>, path: &str) -> Self { + match result { + Ok(_) => { + let mut msg: String = String::new(); + if write!(&mut msg, "{} updated", path).is_err() { + msg = String::from("Latest update succeeded"); + } + + Self { msg, code: 0 } + } + Err(error) => { + let mut msg: String = String::new(); + if write!(&mut msg, "{} update failed: {:?}", path, error) + .is_err() + { + if write!(&mut msg, "Latest update failed: {:?}", error) + .is_err() + { + msg = String::from("Latest update failed"); + } + } + Self { msg, code: 5 } + } + } + } + + pub fn custom(msg: &str, code: u8) -> Self { + Self { + code, + msg: String::from(msg), + } + } +}