347: MQTT/Miniconf refactor r=ryan-summers a=ryan-summers

This PR refactors the MQTT/Miniconf API to make use of the refactored miniconf API. Routing has been moved to the `net` module.

**TODO**:
- [x] Test on hardware
- [x] Document modules
- [x] Merge dependency changes, update dependencies to use `rev`
- [x] Update `miniconf.py` to be `stabilizer.py`, update API

Co-authored-by: Ryan Summers <ryan.summers@vertigo-designs.com>
This commit is contained in:
bors[bot] 2021-05-05 15:39:26 +00:00 committed by GitHub
commit ed34b69823
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 381 additions and 168 deletions

23
Cargo.lock generated
View File

@ -203,7 +203,7 @@ dependencies = [
[[package]] [[package]]
name = "derive_miniconf" name = "derive_miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -416,11 +416,9 @@ dependencies = [
[[package]] [[package]]
name = "miniconf" name = "miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd"
dependencies = [ dependencies = [
"derive_miniconf", "derive_miniconf",
"heapless 0.6.1",
"minimq",
"serde", "serde",
"serde-json-core", "serde-json-core",
] ]
@ -428,7 +426,7 @@ dependencies = [
[[package]] [[package]]
name = "minimq" name = "minimq"
version = "0.2.0" version = "0.2.0"
source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" source = "git+https://github.com/quartiq/minimq.git?rev=b3f364d#b3f364d55dea35da6572f78ddb91c87bfbb453bf"
dependencies = [ dependencies = [
"bit_field", "bit_field",
"embedded-nal", "embedded-nal",
@ -658,6 +656,12 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]] [[package]]
name = "semver" name = "semver"
version = "0.9.0" version = "0.9.0"
@ -684,10 +688,12 @@ dependencies = [
[[package]] [[package]]
name = "serde-json-core" name = "serde-json-core"
version = "0.2.0" version = "0.3.0"
source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39af17f40c2a28d2c9a7918663ddc8a10f54cc6f109ead5c3f010869761df186"
dependencies = [ dependencies = [
"heapless 0.5.6", "heapless 0.6.1",
"ryu",
"serde", "serde",
] ]
@ -742,6 +748,7 @@ dependencies = [
"log", "log",
"mcp23017", "mcp23017",
"miniconf", "miniconf",
"minimq",
"nb 1.0.0", "nb 1.0.0",
"panic-semihosting", "panic-semihosting",
"paste", "paste",

View File

@ -56,19 +56,15 @@ version = "0.9.0"
[patch.crates-io.miniconf] [patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git" git = "https://github.com/quartiq/miniconf.git"
rev = "314fa5587d" rev = "c6f2b28"
[dependencies.smoltcp-nal] [dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git" git = "https://github.com/quartiq/smoltcp-nal.git"
rev = "8468f11" rev = "8468f11"
[patch.crates-io.minimq] [dependencies.minimq]
git = "https://github.com/quartiq/minimq.git" git = "https://github.com/quartiq/minimq.git"
rev = "933687c2e4b" rev = "b3f364d"
[patch.crates-io.serde-json-core]
git = "https://github.com/rust-embedded-community/serde-json-core.git"
rev = "ee06ac91bc"
[features] [features]
semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"]

View File

@ -9,11 +9,12 @@ import argparse
import asyncio import asyncio
import json import json
import logging import logging
import sys
import uuid
from gmqtt import Client as MqttClient from gmqtt import Client as MqttClient
logger = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class Miniconf: class Miniconf:
"""An asynchronous API for controlling Miniconf devices using MQTT.""" """An asynchronous API for controlling Miniconf devices using MQTT."""
@ -32,27 +33,33 @@ class Miniconf:
client: A connected MQTT5 client. client: A connected MQTT5 client.
prefix: The MQTT toptic prefix of the device to control. prefix: The MQTT toptic prefix of the device to control.
""" """
self.uuid = uuid.uuid1()
self.request_id = 0
self.client = client self.client = client
self.prefix = prefix self.prefix = prefix
self.inflight = {} self.inflight = {}
self.client.on_message = self._handle_response self.client.on_message = self._handle_response
self.client.subscribe(f'{prefix}/response/#') self.client.subscribe(f'{prefix}/response/{self.uuid.hex}')
def _handle_response(self, _client, topic, payload, *_args, **_kwargs): def _handle_response(self, _client, _topic, payload, _qos, properties):
"""Callback function for when messages are received over MQTT. """Callback function for when messages are received over MQTT.
Args: Args:
_client: The MQTT client. _client: The MQTT client.
topic: The topic that the message was received on. _topic: The topic that the message was received on.
payload: The payload of the message. payload: The payload of the message.
_qos: The quality-of-service level of the received packet
properties: A dictionary of properties associated with the message.
""" """
if topic not in self.inflight: # Extract corrleation data from the properties
# TODO use correlation_data to distinguish clients and requests correlation_data = json.loads(properties['correlation_data'][0].decode('ascii'))
logger.warning('Unexpected response on topic: %s', topic)
return # Get the request ID from the correlation data
request_id = correlation_data['request_id']
self.inflight[request_id].set_result(json.loads(payload))
del self.inflight[request_id]
self.inflight[topic].set_result(payload.decode('ascii'))
del self.inflight[topic]
async def command(self, path, value): async def command(self, path, value):
"""Write the provided data to the specified path. """Write the provided data to the specified path.
@ -62,25 +69,33 @@ class Miniconf:
value: The value to write to the path. value: The value to write to the path.
Returns: Returns:
The received response to the command. The response to the command as a dictionary.
""" """
setting_topic = f'{self.prefix}/settings/{path}' setting_topic = f'{self.prefix}/settings/{path}'
response_topic = f'{self.prefix}/response/{path}' response_topic = f'{self.prefix}/response/{self.uuid.hex}'
if response_topic in self.inflight:
# TODO use correlation_data to distinguish clients and requests # Assign a unique identifier to this update request.
raise NotImplementedError( request_id = self.request_id
'Only one in-flight message per topic is supported') self.request_id += 1
assert request_id not in self.inflight, 'Invalid ID encountered'
correlation_data = json.dumps({
'request_id': request_id,
}).encode('ascii')
value = json.dumps(value) value = json.dumps(value)
logger.info('Sending %s to "%s"', value, setting_topic) LOGGER.info('Sending %s to "%s"', value, setting_topic)
fut = asyncio.get_running_loop().create_future() fut = asyncio.get_running_loop().create_future()
self.inflight[response_topic] = fut
self.inflight[request_id] = fut
self.client.publish(setting_topic, payload=value, qos=0, retain=True, self.client.publish(setting_topic, payload=value, qos=0, retain=True,
response_topic=response_topic) response_topic=response_topic,
correlation_data=correlation_data)
return await fut return await fut
def main(): def main():
""" Main program entry point. """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Miniconf command line interface.', description='Miniconf command line interface.',
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
@ -107,12 +122,15 @@ def main():
async def configure_settings(): async def configure_settings():
interface = await Miniconf.create(args.prefix, args.broker) interface = await Miniconf.create(args.prefix, args.broker)
for kv in args.settings: for key_value in args.settings:
path, value = kv.split("=", 1) path, value = key_value.split("=", 1)
response = await interface.command(path, json.loads(value)) response = await interface.command(path, json.loads(value))
print(response) print(f'{path}: {response}')
if response['code'] != 0:
return response['code']
return 0
loop.run_until_complete(configure_settings()) sys.exit(loop.run_until_complete(configure_settings()))
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -13,7 +13,7 @@ use hardware::{
InputPin, AFE0, AFE1, InputPin, AFE0, AFE1,
}; };
use net::{Action, MiniconfInterface}; use net::{Action, MqttInterface};
const SCALE: f32 = i16::MAX as _; const SCALE: f32 = i16::MAX as _;
@ -54,7 +54,7 @@ const APP: () = {
digital_input1: DigitalInput1, digital_input1: DigitalInput1,
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>, mqtt: MqttInterface<Settings>,
#[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])]
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
@ -66,7 +66,7 @@ const APP: () = {
// Configure the microcontroller // Configure the microcontroller
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);
let mqtt_config = MiniconfInterface::new( let mqtt = MqttInterface::new(
stabilizer.net.stack, stabilizer.net.stack,
"", "",
&net::get_device_prefix( &net::get_device_prefix(
@ -93,7 +93,7 @@ const APP: () = {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
dacs: stabilizer.dacs, dacs: stabilizer.dacs,
mqtt_config, mqtt,
digital_input1: stabilizer.digital_inputs.1, digital_input1: stabilizer.digital_inputs.1,
settings: Settings::default(), settings: Settings::default(),
} }
@ -150,14 +150,10 @@ const APP: () = {
} }
} }
#[idle(resources=[mqtt_config], spawn=[settings_update])] #[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
match c match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => { Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
@ -167,9 +163,9 @@ const APP: () = {
} }
} }
#[task(priority = 1, resources=[mqtt_config, afes, settings])] #[task(priority = 1, resources=[mqtt, afes, settings])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings; let settings = c.resources.mqtt.settings();
// Update the IIR channels. // Update the IIR channels.
c.resources.settings.lock(|current| *current = *settings); c.resources.settings.lock(|current| *current = *settings);

View File

@ -16,7 +16,7 @@ use stabilizer::hardware::{
}; };
use miniconf::Miniconf; use miniconf::Miniconf;
use stabilizer::net::{Action, MiniconfInterface}; use stabilizer::net::{Action, MqttInterface};
#[derive(Copy, Clone, Debug, Deserialize, Miniconf)] #[derive(Copy, Clone, Debug, Deserialize, Miniconf)]
enum Conf { enum Conf {
@ -60,7 +60,7 @@ const APP: () = {
afes: (AFE0, AFE1), afes: (AFE0, AFE1),
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>, mqtt: MqttInterface<Settings>,
settings: Settings, settings: Settings,
timestamper: InputStamper, timestamper: InputStamper,
@ -73,7 +73,7 @@ const APP: () = {
// Configure the microcontroller // Configure the microcontroller
let (mut stabilizer, _pounder) = setup(c.core, c.device); let (mut stabilizer, _pounder) = setup(c.core, c.device);
let mqtt_config = MiniconfInterface::new( let mqtt = MqttInterface::new(
stabilizer.net.stack, stabilizer.net.stack,
"", "",
&net::get_device_prefix( &net::get_device_prefix(
@ -113,7 +113,7 @@ const APP: () = {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
dacs: stabilizer.dacs, dacs: stabilizer.dacs,
mqtt_config, mqtt,
timestamper: stabilizer.timestamper, timestamper: stabilizer.timestamper,
settings, settings,
@ -195,14 +195,10 @@ const APP: () = {
} }
} }
#[idle(resources=[mqtt_config], spawn=[settings_update])] #[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
match c match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => { Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
@ -212,9 +208,9 @@ const APP: () = {
} }
} }
#[task(priority = 1, resources=[mqtt_config, settings, afes])] #[task(priority = 1, resources=[mqtt, settings, afes])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings; let settings = c.resources.mqtt.settings();
c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.0.set_gain(settings.afe[0]);
c.resources.afes.1.set_gain(settings.afe[1]); c.resources.afes.1.set_gain(settings.afe[1]);

91
src/net/messages.rs Normal file
View File

@ -0,0 +1,91 @@
use heapless::{consts, String, Vec};
use serde::Serialize;
use core::fmt::Write;
#[derive(Debug, Copy, Clone)]
pub enum SettingsResponseCode {
NoError = 0,
MiniconfError = 1,
}
/// Represents a generic MQTT message.
pub struct MqttMessage<'a> {
pub topic: &'a str,
pub message: Vec<u8, consts::U128>,
pub properties: Vec<minimq::Property<'a>, consts::U1>,
}
/// The payload of the MQTT response message to a settings update request.
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<heapless::consts::U64>,
}
impl<'a> MqttMessage<'a> {
/// Construct a new MQTT message from an incoming message.
///
/// # Args
/// * `properties` - A list of properties associated with the inbound message.
/// * `default_response` - The default response topic for the message
/// * `msg` - The response associated with the message. Must fit within 128 bytes.
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
msg: &impl Serialize,
) -> Self {
// Extract the MQTT response topic.
let topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, consts::U1> =
Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
Self {
topic,
// Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector.
message: miniconf::serde_json_core::to_vec(msg).unwrap(),
properties: correlation_data,
}
}
}
impl From<Result<(), miniconf::Error>> for SettingsResponse {
fn from(result: Result<(), miniconf::Error>) -> Self {
match result {
Ok(_) => Self {
msg: String::from("OK"),
code: SettingsResponseCode::NoError as u8,
},
Err(error) => {
let mut msg = String::new();
if write!(&mut msg, "{:?}", error).is_err() {
msg = String::from("Miniconf Error");
}
Self {
code: SettingsResponseCode::MiniconfError as u8,
msg,
}
}
}
}
}

View File

@ -1,11 +1,18 @@
use crate::hardware::{ ///! Stabilizer network management module
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, ///!
}; ///! # Design
///! The stabilizer network architecture supports numerous layers to permit transmission of
///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data
///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines
///! related to Stabilizer networking operations.
use heapless::{consts, String};
use core::fmt::Write; use core::fmt::Write;
use heapless::{consts, String}; mod messages;
use miniconf::minimq; mod mqtt_interface;
use messages::{MqttMessage, SettingsResponse};
pub use mqtt_interface::MqttInterface;
/// Potential actions for firmware to take. /// Potential actions for firmware to take.
pub enum Action { pub enum Action {
@ -16,101 +23,6 @@ pub enum Action {
UpdateSettings, UpdateSettings,
} }
/// MQTT settings interface.
pub struct MiniconfInterface<S>
where
S: miniconf::Miniconf + Default,
{
pub mqtt: miniconf::MqttInterface<S, NetworkStack, minimq::consts::U256>,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
}
impl<S> MiniconfInterface<S>
where
S: miniconf::Miniconf + Default,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt = {
let mqtt_client = {
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap()
};
miniconf::MqttInterface::new(mqtt_client, prefix, S::default())
.unwrap()
};
Self {
mqtt,
clock,
phy,
network_was_reset: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
let now = self.clock.current_ms();
// First, service the network stack to process and inbound and outbound traffic.
let sleep = match self.mqtt.network_stack().poll(now) {
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
if self.phy.poll_link() == false {
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
if !self.network_was_reset {
self.network_was_reset = true;
self.mqtt.network_stack().handle_link_reset();
}
} else {
self.network_was_reset = false;
}
// Finally, service the MQTT interface and handle any necessary messages.
match self.mqtt.update() {
Ok(true) => Some(Action::UpdateSettings),
Ok(false) if sleep => Some(Action::Sleep),
Ok(_) => None,
Err(miniconf::MqttError::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
}
/// Get the MQTT prefix of a device. /// Get the MQTT prefix of a device.
/// ///
/// # Args /// # Args

197
src/net/mqtt_interface.rs Normal file
View File

@ -0,0 +1,197 @@
use crate::hardware::{
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack,
};
use heapless::{consts, String};
use super::{Action, MqttMessage, SettingsResponse};
/// MQTT settings interface.
pub struct MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
default_response_topic: String<consts::U128>,
mqtt: minimq::MqttClient<minimq::consts::U256, NetworkStack>,
settings: S,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
subscribed: bool,
settings_prefix: String<consts::U64>,
}
impl<S> MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap();
let mut response_topic: String<consts::U128> = String::from(prefix);
response_topic.push_str("/log").unwrap();
let mut settings_prefix: String<consts::U64> = String::from(prefix);
settings_prefix.push_str("/settings").unwrap();
Self {
mqtt,
settings: S::default(),
settings_prefix,
clock,
phy,
default_response_topic: response_topic,
network_was_reset: false,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
// First, service the network stack to process any inbound and outbound traffic.
let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms())
{
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
match self.phy.poll_link() {
true => self.network_was_reset = false,
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
false if !self.network_was_reset => {
self.network_was_reset = true;
self.mqtt.network_stack.handle_link_reset();
}
_ => {}
};
let mqtt_connected = match self.mqtt.is_connected() {
Ok(connected) => connected,
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => false,
Err(minimq::Error::Network(error)) => {
log::info!("Unexpected network error: {:?}", error);
false
}
Err(error) => {
log::warn!("Unexpected MQTT error: {:?}", error);
false
}
};
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && mqtt_connected {
// Note(unwrap): We construct a string with two more characters than the prefix
// strucutre, so we are guaranteed to have space for storage.
let mut settings_topic: String<consts::U66> =
String::from(self.settings_prefix.as_str());
settings_topic.push_str("/#").unwrap();
// We do not currently handle or process potential subscription failures. Instead, this
// failure will be logged through the stabilizer logging interface.
self.mqtt.subscribe(&settings_topic, &[]).unwrap();
self.subscribed = true;
}
// Handle any MQTT traffic.
let settings = &mut self.settings;
let mqtt = &mut self.mqtt;
let prefix = self.settings_prefix.as_str();
let default_response_topic = self.default_response_topic.as_str();
let mut update = false;
match mqtt.poll(|client, topic, message, properties| {
let path = match topic.strip_prefix(prefix) {
// For paths, we do not want to include the leading slash.
Some(path) => {
if path.len() > 0 {
&path[1..]
} else {
path
}
}
None => {
info!("Unexpected MQTT topic: {}", topic);
return;
}
};
let message: SettingsResponse = settings
.string_set(path.split('/').peekable(), message)
.and_then(|_| {
update = true;
Ok(())
})
.into();
let response =
MqttMessage::new(properties, default_response_topic, &message);
client
.publish(
response.topic,
&response.message,
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
}) {
// If settings updated,
Ok(_) => {
if update {
Some(Action::UpdateSettings)
} else if sleep {
Some(Action::Sleep)
} else {
None
}
}
Err(minimq::Error::Disconnected) => {
self.subscribed = false;
None
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
pub fn settings(&self) -> &S {
&self.settings
}
}