Adding new miniconf implementation

master
Ryan Summers 2021-04-28 21:03:38 +02:00
parent f38e3b9608
commit 0922cc42af
5 changed files with 327 additions and 193 deletions

16
Cargo.lock generated
View File

@ -203,6 +203,7 @@ dependencies = [
[[package]]
name = "derive_miniconf"
version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031"
dependencies = [
"proc-macro2",
"quote",
@ -415,10 +416,10 @@ dependencies = [
[[package]]
name = "miniconf"
version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=c8d819c#c8d819cdab65f18396e65dafa3558daea29e3031"
dependencies = [
"derive_miniconf",
"heapless 0.6.1",
"minimq",
"serde",
"serde-json-core",
]
@ -426,7 +427,7 @@ dependencies = [
[[package]]
name = "minimq"
version = "0.2.0"
source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38"
source = "git+https://github.com/quartiq/minimq.git?branch=rs/issue-40/copyable-properties#c95c758b620ee98752852bb643df8557a7200f3f"
dependencies = [
"bit_field",
"embedded-nal",
@ -656,6 +657,12 @@ dependencies = [
"semver",
]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]]
name = "semver"
version = "0.9.0"
@ -683,9 +690,10 @@ dependencies = [
[[package]]
name = "serde-json-core"
version = "0.2.0"
source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2"
source = "git+https://github.com/quartiq/serde-json-core.git?branch=feature/dependency-update#a304506a1efb4a90a6ef3faf71ec3ef5f8433fb4"
dependencies = [
"heapless 0.5.6",
"heapless 0.6.1",
"ryu",
"serde",
]

View File

@ -44,7 +44,7 @@ paste = "1"
dsp = { path = "dsp" }
ad9959 = { path = "ad9959" }
generic-array = "0.14"
miniconf = { version = "0.1.0", features = ["minimq-support"] }
miniconf = "0.1.0"
[dependencies.mcp23017]
git = "https://github.com/mrd0ll4r/mcp23017.git"
@ -55,13 +55,8 @@ features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"]
version = "0.9.0"
[patch.crates-io.miniconf]
path = "../miniconf"
# git = "https://github.com/quartiq/miniconf.git"
# branch = "feature/mqtt-removal"
[patch.crates-io.minimq]
git = "https://github.com/quartiq/minimq.git"
rev = "933687c2e4b"
git = "https://github.com/quartiq/miniconf.git"
rev = "c8d819c"
[dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git"
@ -69,11 +64,11 @@ rev = "8468f11"
[dependencies.minimq]
git = "https://github.com/quartiq/minimq.git"
rev = "933687c2e4b"
branch = "rs/issue-40/copyable-properties"
[patch.crates-io.serde-json-core]
git = "https://github.com/rust-embedded-community/serde-json-core.git"
rev = "ee06ac91bc"
git = "https://github.com/quartiq/serde-json-core.git"
branch = "feature/dependency-update"
[features]
semihosting = ["panic-semihosting", "cortex-m-log/semihosting"]

View File

@ -1,11 +1,11 @@
use crate::hardware::{
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack,
};
use heapless::{consts, String};
use core::{cell::RefCell, fmt::Write};
use core::fmt::Write;
use heapless::{consts, String, Vec};
use serde::Serialize;
mod mqtt_interface;
mod router;
pub use mqtt_interface::MqttInterface;
use router::{RouteResult, SettingsResponse};
/// Potential actions for firmware to take.
pub enum Action {
@ -16,178 +16,6 @@ pub enum Action {
UpdateSettings,
}
/// MQTT settings interface.
pub struct MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
telemetry_topic: String<consts::U128>,
mqtt: RefCell<minimq::MqttClient<minimq::consts::U256, NetworkStack>>,
miniconf: RefCell<miniconf::MiniconfInterface<S>>,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
subscribed: bool,
}
impl<S> MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt_client =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap();
let config =
miniconf::MiniconfInterface::new(prefix, S::default()).unwrap();
let mut telemetry_topic: String<consts::U128> = String::new();
write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap();
Self {
mqtt: RefCell::new(mqtt_client),
miniconf: RefCell::new(config),
clock,
phy,
telemetry_topic,
network_was_reset: false,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
let now = self.clock.current_ms();
// First, service the network stack to process and inbound and outbound traffic.
let sleep = match self.mqtt.borrow_mut().network_stack.poll(now) {
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
if self.phy.poll_link() == false {
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
if !self.network_was_reset {
self.network_was_reset = true;
self.mqtt.borrow_mut().network_stack.handle_link_reset();
}
} else {
self.network_was_reset = false;
}
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() {
self.mqtt
.borrow_mut()
.subscribe(
self.miniconf.borrow_mut().get_listening_topic(),
&[],
)
.unwrap();
self.subscribed = true;
}
let mut update = false;
// Handle any MQTT traffic.
match self.mqtt.borrow_mut().poll(
|client, topic, message, properties| {
if let Some(response) = self.miniconf.borrow_mut().process(
topic,
miniconf::Message::from(message, properties),
) {
let mut response_properties: Vec<
minimq::Property,
consts::U1,
> = Vec::new();
if let Some(data) = response.correlation_data {
response_properties
.push(minimq::Property::CorrelationData(data))
.unwrap();
}
// Make a best-effort attempt to send the response.
client
.publish(
response.topic,
&response.data.into_bytes(),
minimq::QoS::AtMostOnce,
&response_properties,
)
.ok();
update = true;
}
},
) {
// If settings updated,
Ok(_) => {
if update {
Some(Action::UpdateSettings)
} else if sleep {
Some(Action::Sleep)
} else {
None
}
}
Err(minimq::Error::Disconnected) => {
self.subscribed = false;
None
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) {
let telemetry =
miniconf::serde_json_core::to_string::<consts::U256, _>(telemetry)
.unwrap();
self.mqtt
.borrow_mut()
.publish(
&self.telemetry_topic,
telemetry.as_bytes(),
minimq::QoS::AtMostOnce,
&[],
)
.ok();
}
pub fn settings(&self) -> S {
self.miniconf.borrow().settings.clone()
}
}
/// Get the MQTT prefix of a device.
///
/// # Args

212
src/net/mqtt_interface.rs Normal file
View File

@ -0,0 +1,212 @@
use crate::hardware::{
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack,
};
use core::{cell::RefCell, fmt::Write};
use heapless::{consts, String};
use serde::Serialize;
use super::{Action, RouteResult, SettingsResponse};
/// MQTT settings interface.
pub struct MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
telemetry_topic: String<consts::U128>,
default_response_topic: String<consts::U128>,
mqtt: RefCell<minimq::MqttClient<minimq::consts::U256, NetworkStack>>,
settings: RefCell<S>,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
subscribed: bool,
id: String<consts::U32>,
}
impl<S> MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt_client =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap();
let mut telemetry_topic: String<consts::U128> = String::new();
write!(&mut telemetry_topic, "{}/telemetry", prefix).unwrap();
let mut response_topic: String<consts::U128> = String::new();
write!(&mut response_topic, "{}/log", prefix).unwrap();
Self {
mqtt: RefCell::new(mqtt_client),
settings: RefCell::new(S::default()),
id: String::from(prefix),
clock,
phy,
telemetry_topic,
default_response_topic: response_topic,
network_was_reset: false,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
// First, service the network stack to process any inbound and outbound traffic.
let sleep = match self
.mqtt
.borrow_mut()
.network_stack
.poll(self.clock.current_ms())
{
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
if self.phy.poll_link() == false {
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
if !self.network_was_reset {
self.network_was_reset = true;
self.mqtt.borrow_mut().network_stack.handle_link_reset();
}
} else {
self.network_was_reset = false;
}
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && self.mqtt.borrow_mut().is_connected().unwrap() {
let mut settings_topic: String<consts::U128> = String::new();
write!(&mut settings_topic, "{}/settings/#", self.id.as_str())
.unwrap();
self.mqtt
.borrow_mut()
.subscribe(&settings_topic, &[])
.unwrap();
self.subscribed = true;
}
// Handle any MQTT traffic.
let mut update = false;
match self.mqtt.borrow_mut().poll(
|client, topic, message, properties| {
let (response, settings_update) =
self.route_message(topic, message, properties);
client
.publish(
response.response_topic,
&response.message,
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
update = settings_update;
},
) {
// If settings updated,
Ok(_) => {
if update {
Some(Action::UpdateSettings)
} else if sleep {
Some(Action::Sleep)
} else {
None
}
}
Err(minimq::Error::Disconnected) => {
self.subscribed = false;
None
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
fn route_message<'a, 'me: 'a>(
&'me self,
topic: &str,
message: &[u8],
properties: &[minimq::Property<'a>],
) -> (RouteResult<'a>, bool) {
let mut response =
RouteResult::new(properties, &self.default_response_topic);
let mut update = false;
if let Some(path) = topic.strip_prefix(self.id.as_str()) {
let mut parts = path[1..].split('/');
match parts.next() {
Some("settings") => {
let result = self
.settings
.borrow_mut()
.string_set(parts.peekable(), message);
update = result.is_ok();
response.set_message(SettingsResponse::new(result, topic));
}
Some(_) => response.set_message(SettingsResponse::custom(
"Unknown topic",
255,
)),
_ => response
.set_message(SettingsResponse::custom("No topic", 254)),
}
} else {
response
.set_message(SettingsResponse::custom("Invalid prefix", 253));
}
(response, update)
}
pub fn publish_telemetry(&mut self, telemetry: &impl Serialize) {
let telemetry =
miniconf::serde_json_core::to_string::<consts::U256, _>(telemetry)
.unwrap();
self.mqtt
.borrow_mut()
.publish(
&self.telemetry_topic,
telemetry.as_bytes(),
minimq::QoS::AtMostOnce,
&[],
)
.ok();
}
pub fn settings(&self) -> S {
self.settings.borrow().clone()
}
}

91
src/net/router.rs Normal file
View File

@ -0,0 +1,91 @@
use heapless::{consts, String, Vec};
use serde::Serialize;
use core::fmt::Write;
pub struct RouteResult<'a> {
pub response_topic: &'a str,
pub message: Vec<u8, consts::U128>,
pub properties: Vec<minimq::Property<'a>, consts::U1>,
}
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<heapless::consts::U64>,
}
impl<'a> RouteResult<'a> {
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
) -> Self {
// Extract the MQTT response topic.
let response_topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, consts::U1> =
Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
RouteResult {
response_topic,
message: Vec::new(),
properties: correlation_data,
}
}
pub fn set_message(&mut self, response: impl Serialize) {
self.message = miniconf::serde_json_core::to_vec(&response).unwrap();
}
}
impl SettingsResponse {
pub fn new(result: Result<(), miniconf::Error>, path: &str) -> Self {
match result {
Ok(_) => {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} updated", path).is_err() {
msg = String::from("Latest update succeeded");
}
Self { msg, code: 0 }
}
Err(error) => {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} update failed: {:?}", path, error)
.is_err()
{
if write!(&mut msg, "Latest update failed: {:?}", error)
.is_err()
{
msg = String::from("Latest update failed");
}
}
Self { msg, code: 5 }
}
}
}
pub fn custom(msg: &str, code: u8) -> Self {
Self {
code,
msg: String::from(msg),
}
}
}