Refactoring net module

master
Ryan Summers 2021-04-29 11:54:16 +02:00
parent aad026161f
commit 7ddf2271f3
4 changed files with 168 additions and 120 deletions

124
src/net/messages.rs Normal file
View File

@ -0,0 +1,124 @@
use heapless::{consts, String, Vec};
use serde::Serialize;
use core::fmt::Write;
#[derive(Debug, Copy, Clone)]
pub enum SettingsResponseCode {
NoError = 0,
NoTopic = 1,
InvalidPrefix = 2,
UnknownTopic = 3,
UpdateFailure = 4,
}
/// Represents a generic MQTT message.
pub struct MqttMessage<'a> {
pub topic: &'a str,
pub message: Vec<u8, consts::U128>,
pub properties: Vec<minimq::Property<'a>, consts::U1>,
}
/// The payload of the MQTT response message to a settings update request.
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<heapless::consts::U64>,
}
impl<'a> MqttMessage<'a> {
/// Construct a new MQTT message from an incoming message.
///
/// # Args
/// * `properties` - A list of properties associated with the inbound message.
/// * `default_response` - The default response topic for the message
/// * `msg` - The response associated with the message. Must fit within 128 bytes.
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
msg: &impl Serialize,
) -> Self {
// Extract the MQTT response topic.
let topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, consts::U1> =
Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
Self {
topic,
// Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector.
message: miniconf::serde_json_core::to_vec(msg).unwrap(),
properties: correlation_data,
}
}
}
impl SettingsResponse {
/// Construct a settings response upon successful settings update.
///
/// # Args
/// * `path` - The path of the setting that was updated.
pub fn update_success(path: &str) -> Self {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} updated", path).is_err() {
msg = String::from("Latest update succeeded");
}
Self {
msg,
code: SettingsResponseCode::NoError as u8,
}
}
/// Construct a response when a settings update failed.
///
/// # Args
/// * `path` - The settings path that configuration failed for.
/// * `err` - The settings update error that occurred.
pub fn update_failure(path: &str, err: miniconf::Error) -> Self {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} update failed: {:?}", path, err).is_err() {
if write!(&mut msg, "Latest update failed: {:?}", err).is_err() {
msg = String::from("Latest update failed");
}
}
Self {
msg,
code: SettingsResponseCode::UpdateFailure as u8,
}
}
/// Construct a response from a custom response code.
///
/// # Args
/// * `code` - The response code to provide.
pub fn code(code: SettingsResponseCode) -> Self {
let mut msg: String<consts::U64> = String::new();
// Note(unwrap): All code debug names shall fit in the 64 byte string.
write!(&mut msg, "{:?}", code).unwrap();
Self {
code: code as u8,
msg,
}
}
}

View File

@ -1,11 +1,18 @@
///! Stabilizer network management module
///!
///! # Design
///! The stabilizer network architecture supports numerous layers to permit transmission of
///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data
///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines
///! related to Stabilizer networking operations.
use heapless::{consts, String};
use core::fmt::Write;
mod messages;
mod mqtt_interface;
mod router;
use messages::{MqttMessage, SettingsResponse, SettingsResponseCode};
pub use mqtt_interface::MqttInterface;
use router::{RouteResult, SettingsResponse};
/// Potential actions for firmware to take.
pub enum Action {

View File

@ -7,7 +7,7 @@ use core::{cell::RefCell, fmt::Write};
use heapless::{consts, String};
use serde::Serialize;
use super::{Action, RouteResult, SettingsResponse};
use super::{Action, MqttMessage, SettingsResponse, SettingsResponseCode};
/// MQTT settings interface.
pub struct MqttInterface<S>
@ -136,7 +136,7 @@ where
self.route_message(topic, message, properties);
client
.publish(
response.response_topic,
response.topic,
&response.message,
minimq::QoS::AtMostOnce,
&response.properties,
@ -175,33 +175,41 @@ where
topic: &str,
message: &[u8],
properties: &[minimq::Property<'a>],
) -> (RouteResult<'a>, bool) {
let mut response =
RouteResult::new(properties, &self.default_response_topic);
) -> (MqttMessage<'a>, bool) {
let mut update = false;
if let Some(path) = topic.strip_prefix(self.id.as_str()) {
let mut parts = path[1..].split('/');
match parts.next() {
Some("settings") => {
let result = self
.settings
.borrow_mut()
.string_set(parts.peekable(), message);
update = result.is_ok();
response.set_message(SettingsResponse::new(result, topic));
let response_msg =
if let Some(path) = topic.strip_prefix(self.id.as_str()) {
let mut parts = path[1..].split('/');
match parts.next() {
Some("settings") => {
match self
.settings
.borrow_mut()
.string_set(parts.peekable(), message)
{
Ok(_) => {
update = true;
SettingsResponse::update_success(path)
}
Err(error) => {
SettingsResponse::update_failure(path, error)
}
}
}
Some(_) => SettingsResponse::code(
SettingsResponseCode::UnknownTopic,
),
_ => SettingsResponse::code(SettingsResponseCode::NoTopic),
}
Some(_) => response.set_message(SettingsResponse::custom(
"Unknown topic",
255,
)),
_ => response
.set_message(SettingsResponse::custom("No topic", 254)),
}
} else {
response
.set_message(SettingsResponse::custom("Invalid prefix", 253));
}
} else {
SettingsResponse::code(SettingsResponseCode::InvalidPrefix)
};
let response = MqttMessage::new(
properties,
&self.default_response_topic,
&response_msg,
);
(response, update)
}

View File

@ -1,91 +0,0 @@
use heapless::{consts, String, Vec};
use serde::Serialize;
use core::fmt::Write;
pub struct RouteResult<'a> {
pub response_topic: &'a str,
pub message: Vec<u8, consts::U128>,
pub properties: Vec<minimq::Property<'a>, consts::U1>,
}
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<heapless::consts::U64>,
}
impl<'a> RouteResult<'a> {
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
) -> Self {
// Extract the MQTT response topic.
let response_topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, consts::U1> =
Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
RouteResult {
response_topic,
message: Vec::new(),
properties: correlation_data,
}
}
pub fn set_message(&mut self, response: impl Serialize) {
self.message = miniconf::serde_json_core::to_vec(&response).unwrap();
}
}
impl SettingsResponse {
pub fn new(result: Result<(), miniconf::Error>, path: &str) -> Self {
match result {
Ok(_) => {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} updated", path).is_err() {
msg = String::from("Latest update succeeded");
}
Self { msg, code: 0 }
}
Err(error) => {
let mut msg: String<consts::U64> = String::new();
if write!(&mut msg, "{} update failed: {:?}", path, error)
.is_err()
{
if write!(&mut msg, "Latest update failed: {:?}", error)
.is_err()
{
msg = String::from("Latest update failed");
}
}
Self { msg, code: 5 }
}
}
}
pub fn custom(msg: &str, code: u8) -> Self {
Self {
code,
msg: String::from(msg),
}
}
}