Moving miniconf client out of Stabilizer

This commit is contained in:
Ryan Summers 2021-08-06 13:57:53 +02:00
parent f7c77bd860
commit 3caa946b17
5 changed files with 17 additions and 279 deletions

12
Cargo.lock generated
View File

@ -200,7 +200,7 @@ dependencies = [
[[package]] [[package]]
name = "derive_miniconf" name = "derive_miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46" source = "git+https://github.com/quartiq/miniconf.git?branch=feature/client-integration#46aa4791e2874dbce6f75a0db15f636a4e7a134c"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -413,17 +413,21 @@ dependencies = [
[[package]] [[package]]
name = "miniconf" name = "miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46" source = "git+https://github.com/quartiq/miniconf.git?branch=feature/client-integration#46aa4791e2874dbce6f75a0db15f636a4e7a134c"
dependencies = [ dependencies = [
"derive_miniconf", "derive_miniconf",
"heapless 0.7.3",
"log",
"minimq",
"serde", "serde",
"serde-json-core", "serde-json-core",
] ]
[[package]] [[package]]
name = "minimq" name = "minimq"
version = "0.2.0" version = "0.3.0"
source = "git+https://github.com/quartiq/minimq.git?rev=93813e3#93813e37e013b04ffb5e02aca7eca1859b1b2443" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff318a5cbde4315f61fb86145ed2c00a49dc613a71aff920130b47d0a12490f"
dependencies = [ dependencies = [
"bit_field", "bit_field",
"embedded-nal", "embedded-nal",

View File

@ -48,6 +48,7 @@ shared-bus = {version = "0.2.2", features = ["cortex-m"] }
serde-json-core = "0.4" serde-json-core = "0.4"
mcp23017 = "1.0" mcp23017 = "1.0"
mutex-trait = "0.2" mutex-trait = "0.2"
minimq = "0.3"
# rtt-target bump # rtt-target bump
[dependencies.rtt-logger] [dependencies.rtt-logger]
@ -75,16 +76,12 @@ branch = "feature/assume-init"
[patch.crates-io.miniconf] [patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git" git = "https://github.com/quartiq/miniconf.git"
rev = "9c826f8" branch = "feature/client-integration"
[dependencies.smoltcp-nal] [dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git" git = "https://github.com/quartiq/smoltcp-nal.git"
rev = "0634188" rev = "0634188"
[dependencies.minimq]
git = "https://github.com/quartiq/minimq.git"
rev = "93813e3"
[features] [features]
nightly = ["cortex-m/inline-asm", "dsp/nightly"] nightly = ["cortex-m/inline-asm", "dsp/nightly"]
pounder_v1_1 = [ ] pounder_v1_1 = [ ]

View File

@ -1,90 +0,0 @@
use heapless::{String, Vec};
use serde::Serialize;
use core::fmt::Write;
#[derive(Debug, Copy, Clone)]
pub enum SettingsResponseCode {
NoError = 0,
MiniconfError = 1,
}
/// Represents a generic MQTT message.
pub struct MqttMessage<'a> {
pub topic: &'a str,
pub message: Vec<u8, 128>,
pub properties: Vec<minimq::Property<'a>, 1>,
}
/// The payload of the MQTT response message to a settings update request.
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<64>,
}
impl<'a> MqttMessage<'a> {
/// Construct a new MQTT message from an incoming message.
///
/// # Args
/// * `properties` - A list of properties associated with the inbound message.
/// * `default_response` - The default response topic for the message
/// * `msg` - The response associated with the message. Must fit within 128 bytes.
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
msg: &impl Serialize,
) -> Self {
// Extract the MQTT response topic.
let topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, 1> = Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
Self {
topic,
// Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector.
message: miniconf::serde_json_core::to_vec(msg).unwrap(),
properties: correlation_data,
}
}
}
impl From<Result<(), miniconf::Error>> for SettingsResponse {
fn from(result: Result<(), miniconf::Error>) -> Self {
match result {
Ok(_) => Self {
msg: String::from("OK"),
code: SettingsResponseCode::NoError as u8,
},
Err(error) => {
let mut msg = String::new();
if write!(&mut msg, "{:?}", error).is_err() {
msg = String::from("Miniconf Error");
}
Self {
code: SettingsResponseCode::MiniconfError as u8,
msg,
}
}
}
}
}

View File

@ -1,169 +0,0 @@
///! Stabilizer Run-time Settings Client
///!
///! # Design
///! Stabilizer allows for settings to be configured at run-time via MQTT using miniconf.
///! Settings are written in serialized JSON form to the settings path associated with the setting.
///!
///! # Limitations
///! The MQTT client logs failures to subscribe to the settings topic, but does not re-attempt to
///connect to it when errors occur.
///!
///! Respones to settings updates are sent without quality-of-service guarantees, so there's no
///! guarantee that the requestee will be informed that settings have been applied.
use heapless::String;
use log::info;
use super::{MqttMessage, NetworkReference, SettingsResponse, UpdateState};
use minimq::embedded_nal::IpAddr;
/// MQTT settings interface.
pub struct MiniconfClient<S>
where
S: miniconf::Miniconf + Default + Clone,
{
default_response_topic: String<128>,
mqtt: minimq::Minimq<NetworkReference, 256>,
settings: S,
subscribed: bool,
settings_prefix: String<64>,
}
impl<S> MiniconfClient<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `broker` - The IP address of the MQTT broker to use.
pub fn new(
stack: NetworkReference,
client_id: &str,
prefix: &str,
broker: IpAddr,
) -> Self {
let mqtt = minimq::Minimq::new(broker, client_id, stack).unwrap();
let mut response_topic: String<128> = String::from(prefix);
response_topic.push_str("/log").unwrap();
let mut settings_prefix: String<64> = String::from(prefix);
settings_prefix.push_str("/settings").unwrap();
Self {
mqtt,
settings: S::default(),
settings_prefix,
default_response_topic: response_topic,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> UpdateState {
let mqtt_connected = match self.mqtt.client.is_connected() {
Ok(connected) => connected,
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => false,
Err(minimq::Error::Network(error)) => {
log::info!("Unexpected network error: {:?}", error);
false
}
Err(error) => {
log::warn!("Unexpected MQTT error: {:?}", error);
false
}
};
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && mqtt_connected {
log::info!("MQTT connected, subscribing to settings");
// Note(unwrap): We construct a string with two more characters than the prefix
// strucutre, so we are guaranteed to have space for storage.
let mut settings_topic: String<66> =
String::from(self.settings_prefix.as_str());
settings_topic.push_str("/#").unwrap();
// We do not currently handle or process potential subscription failures. Instead, this
// failure will be logged through the stabilizer logging interface.
self.mqtt.client.subscribe(&settings_topic, &[]).unwrap();
self.subscribed = true;
}
// Handle any MQTT traffic.
let settings = &mut self.settings;
let mqtt = &mut self.mqtt;
let prefix = self.settings_prefix.as_str();
let default_response_topic = self.default_response_topic.as_str();
let mut update = false;
match mqtt.poll(|client, topic, message, properties| {
let path = match topic.strip_prefix(prefix) {
// For paths, we do not want to include the leading slash.
Some(path) => {
if !path.is_empty() {
&path[1..]
} else {
path
}
}
None => {
info!("Unexpected MQTT topic: {}", topic);
return;
}
};
log::info!("Settings update: `{}`", path);
let message: SettingsResponse = settings
.string_set(path.split('/').peekable(), message)
.map(|_| {
update = true;
})
.into();
let response =
MqttMessage::new(properties, default_response_topic, &message);
client
.publish(
response.topic,
&response.message,
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
}) {
// If settings updated,
Ok(_) if update => UpdateState::Updated,
Ok(_) => UpdateState::NoChange,
Err(minimq::Error::SessionReset) => {
log::warn!("Settings MQTT session reset");
self.subscribed = false;
UpdateState::NoChange
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => UpdateState::NoChange,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
UpdateState::NoChange
}
}
}
/// Get the current settings from miniconf.
pub fn settings(&self) -> &S {
&self.settings
}
}

View File

@ -10,16 +10,12 @@ pub use miniconf;
pub use serde; pub use serde;
pub mod data_stream; pub mod data_stream;
pub mod messages;
pub mod miniconf_client;
pub mod network_processor; pub mod network_processor;
pub mod shared; pub mod shared;
pub mod telemetry; pub mod telemetry;
use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
use data_stream::{DataStream, FrameGenerator}; use data_stream::{DataStream, FrameGenerator};
use messages::{MqttMessage, SettingsResponse};
use miniconf_client::MiniconfClient;
use minimq::embedded_nal::IpAddr; use minimq::embedded_nal::IpAddr;
use network_processor::NetworkProcessor; use network_processor::NetworkProcessor;
use shared::NetworkManager; use shared::NetworkManager;
@ -27,7 +23,7 @@ use telemetry::TelemetryClient;
use core::fmt::Write; use core::fmt::Write;
use heapless::String; use heapless::String;
use miniconf::Miniconf; use miniconf::{Miniconf, MiniconfClient};
use serde::Serialize; use serde::Serialize;
use smoltcp_nal::embedded_nal::SocketAddr; use smoltcp_nal::embedded_nal::SocketAddr;
@ -49,8 +45,8 @@ pub enum NetworkState {
NoChange, NoChange,
} }
/// A structure of Stabilizer's default network users. /// A structure of Stabilizer's default network users.
pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> { pub struct NetworkUsers<S: Default + Miniconf, T: Serialize> {
pub miniconf: MiniconfClient<S>, pub miniconf: MiniconfClient<S, NetworkReference>,
pub processor: NetworkProcessor, pub processor: NetworkProcessor,
stream: DataStream, stream: DataStream,
generator: Option<FrameGenerator>, generator: Option<FrameGenerator>,
@ -59,7 +55,7 @@ pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
impl<S, T> NetworkUsers<S, T> impl<S, T> NetworkUsers<S, T>
where where
S: Default + Clone + Miniconf, S: Default + Miniconf,
T: Serialize, T: Serialize,
{ {
/// Construct Stabilizer's default network users. /// Construct Stabilizer's default network users.
@ -99,7 +95,7 @@ where
&get_client_id(app, "settings", mac), &get_client_id(app, "settings", mac),
&prefix, &prefix,
broker, broker,
); ).unwrap();
let telemetry = TelemetryClient::new( let telemetry = TelemetryClient::new(
stack_manager.acquire_stack(), stack_manager.acquire_stack(),
@ -164,8 +160,8 @@ where
}; };
match self.miniconf.update() { match self.miniconf.update() {
UpdateState::Updated => NetworkState::SettingsChanged, Ok(true) => NetworkState::SettingsChanged,
UpdateState::NoChange => poll_result, _ => poll_result,
} }
} }
} }