Merge branch 'master' into feature/streaming-hitl

This commit is contained in:
Ryan Summers 2021-08-09 14:15:40 +02:00
commit bfbe807690
7 changed files with 33 additions and 279 deletions

12
Cargo.lock generated
View File

@ -200,7 +200,7 @@ dependencies = [
[[package]]
name = "derive_miniconf"
version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46"
source = "git+https://github.com/quartiq/miniconf.git?rev=6c19ba2#6c19ba208eb426377ff6e09416fcabdf4fd3021d"
dependencies = [
"proc-macro2",
"quote",
@ -413,17 +413,21 @@ dependencies = [
[[package]]
name = "miniconf"
version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=9c826f8#9c826f8de8d0dd1a59e1ce7bf124ac0311994b46"
source = "git+https://github.com/quartiq/miniconf.git?rev=6c19ba2#6c19ba208eb426377ff6e09416fcabdf4fd3021d"
dependencies = [
"derive_miniconf",
"heapless 0.7.3",
"log",
"minimq",
"serde",
"serde-json-core",
]
[[package]]
name = "minimq"
version = "0.2.0"
source = "git+https://github.com/quartiq/minimq.git?rev=93813e3#93813e37e013b04ffb5e02aca7eca1859b1b2443"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ff318a5cbde4315f61fb86145ed2c00a49dc613a71aff920130b47d0a12490f"
dependencies = [
"bit_field",
"embedded-nal",

View File

@ -48,6 +48,7 @@ shared-bus = {version = "0.2.2", features = ["cortex-m"] }
serde-json-core = "0.4"
mcp23017 = "1.0"
mutex-trait = "0.2"
minimq = "0.3"
# rtt-target bump
[dependencies.rtt-logger]
@ -75,16 +76,12 @@ branch = "feature/assume-init"
[patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git"
rev = "9c826f8"
rev = "6c19ba2"
[dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git"
rev = "0634188"
[dependencies.minimq]
git = "https://github.com/quartiq/minimq.git"
rev = "93813e3"
[features]
nightly = ["cortex-m/inline-asm", "dsp/nightly"]
pounder_v1_1 = [ ]

View File

@ -117,6 +117,19 @@ been used during development, but any MQTTv5 broker is supported.
Stabilizer utilizes a static IP address for broker configuration. Ensure the IP address was
[configured](#building-firmware) properly to point to your broker before continuing.
We recommend running Mosquitto through [Docker](https://docker.com) to easily run Mosquitto on
Windows, Linux, and OSX. After docker has been installed, run the following command from
the `stabilizer` repository:
```
docker run -p 1883:1883 --name mosquitto -v `pwd`/mosquitto.conf:/mosquitto/config/mosquitto.conf -v /mosquitto/data -v /mosquitto/log eclipse-mosquitto:2
```
> _Note_: The above command assumes a bash shell. If using powershell, replace `` `pwd` `` with
> `${pwd}`
This command will create a container named `mosquitto` that can be stopped and started easily via
docker.
## Test the Connection
Once your broker is running, test that Stabilizer is properly connected to it.

2
mosquitto.conf Normal file
View File

@ -0,0 +1,2 @@
allow_anonymous true
listener 1883

View File

@ -1,90 +0,0 @@
use heapless::{String, Vec};
use serde::Serialize;
use core::fmt::Write;
#[derive(Debug, Copy, Clone)]
pub enum SettingsResponseCode {
NoError = 0,
MiniconfError = 1,
}
/// Represents a generic MQTT message.
pub struct MqttMessage<'a> {
pub topic: &'a str,
pub message: Vec<u8, 128>,
pub properties: Vec<minimq::Property<'a>, 1>,
}
/// The payload of the MQTT response message to a settings update request.
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<64>,
}
impl<'a> MqttMessage<'a> {
/// Construct a new MQTT message from an incoming message.
///
/// # Args
/// * `properties` - A list of properties associated with the inbound message.
/// * `default_response` - The default response topic for the message
/// * `msg` - The response associated with the message. Must fit within 128 bytes.
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
msg: &impl Serialize,
) -> Self {
// Extract the MQTT response topic.
let topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, 1> = Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
Self {
topic,
// Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector.
message: miniconf::serde_json_core::to_vec(msg).unwrap(),
properties: correlation_data,
}
}
}
impl From<Result<(), miniconf::Error>> for SettingsResponse {
fn from(result: Result<(), miniconf::Error>) -> Self {
match result {
Ok(_) => Self {
msg: String::from("OK"),
code: SettingsResponseCode::NoError as u8,
},
Err(error) => {
let mut msg = String::new();
if write!(&mut msg, "{:?}", error).is_err() {
msg = String::from("Miniconf Error");
}
Self {
code: SettingsResponseCode::MiniconfError as u8,
msg,
}
}
}
}
}

View File

@ -1,169 +0,0 @@
///! Stabilizer Run-time Settings Client
///!
///! # Design
///! Stabilizer allows for settings to be configured at run-time via MQTT using miniconf.
///! Settings are written in serialized JSON form to the settings path associated with the setting.
///!
///! # Limitations
///! The MQTT client logs failures to subscribe to the settings topic, but does not re-attempt to
///connect to it when errors occur.
///!
///! Respones to settings updates are sent without quality-of-service guarantees, so there's no
///! guarantee that the requestee will be informed that settings have been applied.
use heapless::String;
use log::info;
use super::{MqttMessage, NetworkReference, SettingsResponse, UpdateState};
use minimq::embedded_nal::IpAddr;
/// MQTT settings interface.
pub struct MiniconfClient<S>
where
S: miniconf::Miniconf + Default + Clone,
{
default_response_topic: String<128>,
mqtt: minimq::Minimq<NetworkReference, 256>,
settings: S,
subscribed: bool,
settings_prefix: String<64>,
}
impl<S> MiniconfClient<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `broker` - The IP address of the MQTT broker to use.
pub fn new(
stack: NetworkReference,
client_id: &str,
prefix: &str,
broker: IpAddr,
) -> Self {
let mqtt = minimq::Minimq::new(broker, client_id, stack).unwrap();
let mut response_topic: String<128> = String::from(prefix);
response_topic.push_str("/log").unwrap();
let mut settings_prefix: String<64> = String::from(prefix);
settings_prefix.push_str("/settings").unwrap();
Self {
mqtt,
settings: S::default(),
settings_prefix,
default_response_topic: response_topic,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> UpdateState {
let mqtt_connected = match self.mqtt.client.is_connected() {
Ok(connected) => connected,
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => false,
Err(minimq::Error::Network(error)) => {
log::info!("Unexpected network error: {:?}", error);
false
}
Err(error) => {
log::warn!("Unexpected MQTT error: {:?}", error);
false
}
};
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && mqtt_connected {
log::info!("MQTT connected, subscribing to settings");
// Note(unwrap): We construct a string with two more characters than the prefix
// strucutre, so we are guaranteed to have space for storage.
let mut settings_topic: String<66> =
String::from(self.settings_prefix.as_str());
settings_topic.push_str("/#").unwrap();
// We do not currently handle or process potential subscription failures. Instead, this
// failure will be logged through the stabilizer logging interface.
self.mqtt.client.subscribe(&settings_topic, &[]).unwrap();
self.subscribed = true;
}
// Handle any MQTT traffic.
let settings = &mut self.settings;
let mqtt = &mut self.mqtt;
let prefix = self.settings_prefix.as_str();
let default_response_topic = self.default_response_topic.as_str();
let mut update = false;
match mqtt.poll(|client, topic, message, properties| {
let path = match topic.strip_prefix(prefix) {
// For paths, we do not want to include the leading slash.
Some(path) => {
if !path.is_empty() {
&path[1..]
} else {
path
}
}
None => {
info!("Unexpected MQTT topic: {}", topic);
return;
}
};
log::info!("Settings update: `{}`", path);
let message: SettingsResponse = settings
.string_set(path.split('/').peekable(), message)
.map(|_| {
update = true;
})
.into();
let response =
MqttMessage::new(properties, default_response_topic, &message);
client
.publish(
response.topic,
&response.message,
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
}) {
// If settings updated,
Ok(_) if update => UpdateState::Updated,
Ok(_) => UpdateState::NoChange,
Err(minimq::Error::SessionReset) => {
log::warn!("Settings MQTT session reset");
self.subscribed = false;
UpdateState::NoChange
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => UpdateState::NoChange,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
UpdateState::NoChange
}
}
}
/// Get the current settings from miniconf.
pub fn settings(&self) -> &S {
&self.settings
}
}

View File

@ -10,16 +10,12 @@ pub use miniconf;
pub use serde;
pub mod data_stream;
pub mod messages;
pub mod miniconf_client;
pub mod network_processor;
pub mod shared;
pub mod telemetry;
use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
use data_stream::{DataStream, FrameGenerator};
use messages::{MqttMessage, SettingsResponse};
use miniconf_client::MiniconfClient;
use minimq::embedded_nal::IpAddr;
use network_processor::NetworkProcessor;
use shared::NetworkManager;
@ -49,8 +45,8 @@ pub enum NetworkState {
NoChange,
}
/// A structure of Stabilizer's default network users.
pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
pub miniconf: MiniconfClient<S>,
pub struct NetworkUsers<S: Default + Miniconf, T: Serialize> {
pub miniconf: miniconf::MqttClient<S, NetworkReference>,
pub processor: NetworkProcessor,
stream: DataStream,
generator: Option<FrameGenerator>,
@ -59,7 +55,7 @@ pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
impl<S, T> NetworkUsers<S, T>
where
S: Default + Clone + Miniconf,
S: Default + Miniconf,
T: Serialize,
{
/// Construct Stabilizer's default network users.
@ -94,12 +90,13 @@ where
let prefix = get_device_prefix(app, mac);
let settings = MiniconfClient::new(
let settings = miniconf::MqttClient::new(
stack_manager.acquire_stack(),
&get_client_id(app, "settings", mac),
&prefix,
broker,
);
)
.unwrap();
let telemetry = TelemetryClient::new(
stack_manager.acquire_stack(),
@ -164,8 +161,8 @@ where
};
match self.miniconf.update() {
UpdateState::Updated => NetworkState::SettingsChanged,
UpdateState::NoChange => poll_result,
Ok(true) => NetworkState::SettingsChanged,
_ => poll_result,
}
}
}