pounder_test/src/net/miniconf_client.rs

162 lines
5.7 KiB
Rust
Raw Normal View History

2021-05-06 18:33:07 +08:00
///! Stabilizer Run-time Settings Client
///!
///! # Design
///! Stabilizer allows for settings to be configured at run-time via MQTT using miniconf.
///! Settings are written in serialized JSON form to the settings path associated with the setting.
///!
///! # Limitations
///! The MQTT client logs failures to subscribe to the settings topic, but does not re-attempt to
///connect to it when errors occur.
///!
///! Respones to settings updates are sent without quality-of-service guarantees, so there's no
///! guarantee that the requestee will be informed that settings have been applied.
use heapless::String;
2021-06-01 19:11:16 +08:00
use log::info;
2021-04-29 03:03:38 +08:00
2021-05-05 21:39:33 +08:00
use super::{MqttMessage, NetworkReference, SettingsResponse, UpdateState};
2021-05-06 18:35:04 +08:00
use crate::hardware::design_parameters::MQTT_BROKER;
2021-04-29 03:03:38 +08:00
/// MQTT settings interface.
2021-05-05 01:52:41 +08:00
pub struct MiniconfClient<S>
2021-04-29 03:03:38 +08:00
where
S: miniconf::Miniconf + Default + Clone,
{
default_response_topic: String<128>,
2021-05-27 19:42:52 +08:00
mqtt: minimq::Minimq<NetworkReference, 256>,
2021-05-04 19:13:44 +08:00
settings: S,
2021-04-29 03:03:38 +08:00
subscribed: bool,
settings_prefix: String<64>,
2021-04-29 03:03:38 +08:00
}
2021-05-05 01:52:41 +08:00
impl<S> MiniconfClient<S>
2021-04-29 03:03:38 +08:00
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
2021-05-05 21:39:33 +08:00
pub fn new(stack: NetworkReference, client_id: &str, prefix: &str) -> Self {
2021-05-04 19:13:44 +08:00
let mqtt =
2021-05-27 19:42:52 +08:00
minimq::Minimq::new(MQTT_BROKER.into(), client_id, stack).unwrap();
2021-04-29 03:03:38 +08:00
let mut response_topic: String<128> = String::from(prefix);
2021-05-04 19:50:17 +08:00
response_topic.push_str("/log").unwrap();
2021-05-04 19:13:44 +08:00
let mut settings_prefix: String<64> = String::from(prefix);
2021-05-04 19:50:17 +08:00
settings_prefix.push_str("/settings").unwrap();
2021-05-04 19:13:44 +08:00
2021-04-29 03:03:38 +08:00
Self {
2021-05-04 19:13:44 +08:00
mqtt,
settings: S::default(),
settings_prefix,
2021-04-29 03:03:38 +08:00
default_response_topic: response_topic,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
2021-05-05 01:52:41 +08:00
pub fn update(&mut self) -> UpdateState {
2021-05-27 19:42:52 +08:00
let mqtt_connected = match self.mqtt.client.is_connected() {
2021-04-29 17:28:35 +08:00
Ok(connected) => connected,
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => false,
Err(minimq::Error::Network(error)) => {
log::info!("Unexpected network error: {:?}", error);
false
}
Err(error) => {
log::warn!("Unexpected MQTT error: {:?}", error);
false
}
};
2021-04-29 03:03:38 +08:00
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
2021-04-29 17:28:35 +08:00
if !self.subscribed && mqtt_connected {
2021-05-04 19:13:44 +08:00
// Note(unwrap): We construct a string with two more characters than the prefix
// strucutre, so we are guaranteed to have space for storage.
let mut settings_topic: String<66> =
2021-05-04 19:13:44 +08:00
String::from(self.settings_prefix.as_str());
settings_topic.push_str("/#").unwrap();
2021-04-29 03:03:38 +08:00
// We do not currently handle or process potential subscription failures. Instead, this
// failure will be logged through the stabilizer logging interface.
2021-05-27 19:42:52 +08:00
self.mqtt.client.subscribe(&settings_topic, &[]).unwrap();
2021-04-29 03:03:38 +08:00
self.subscribed = true;
}
// Handle any MQTT traffic.
2021-05-04 19:13:44 +08:00
let settings = &mut self.settings;
let mqtt = &mut self.mqtt;
let prefix = self.settings_prefix.as_str();
let default_response_topic = self.default_response_topic.as_str();
2021-04-29 03:03:38 +08:00
let mut update = false;
2021-05-04 19:13:44 +08:00
match mqtt.poll(|client, topic, message, properties| {
let path = match topic.strip_prefix(prefix) {
// For paths, we do not want to include the leading slash.
Some(path) => {
if !path.is_empty() {
2021-05-04 19:13:44 +08:00
&path[1..]
} else {
path
}
}
None => {
info!("Unexpected MQTT topic: {}", topic);
return;
}
};
let message: SettingsResponse = settings
.string_set(path.split('/').peekable(), message)
.map(|_| {
2021-05-04 19:13:44 +08:00
update = true;
})
.into();
let response =
MqttMessage::new(properties, default_response_topic, &message);
client
.publish(
response.topic,
&response.message,
2021-05-04 19:50:17 +08:00
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
2021-05-04 19:13:44 +08:00
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
}) {
2021-04-29 03:03:38 +08:00
// If settings updated,
2021-05-05 01:52:41 +08:00
Ok(_) if update => UpdateState::Updated,
Ok(_) => UpdateState::NoChange,
2021-05-06 22:32:57 +08:00
Err(minimq::Error::SessionReset) => {
2021-04-29 03:03:38 +08:00
self.subscribed = false;
2021-05-05 01:52:41 +08:00
UpdateState::NoChange
2021-04-29 03:03:38 +08:00
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
2021-05-05 01:52:41 +08:00
)) => UpdateState::NoChange,
2021-04-29 03:03:38 +08:00
Err(error) => {
log::info!("Unexpected error: {:?}", error);
2021-05-05 01:52:41 +08:00
UpdateState::NoChange
2021-04-29 03:03:38 +08:00
}
}
}
2021-05-06 18:33:07 +08:00
/// Get the current settings from miniconf.
2021-05-04 19:13:44 +08:00
pub fn settings(&self) -> &S {
&self.settings
2021-04-29 03:03:38 +08:00
}
}