mqtt: changeable root topic name

pull/4/head
occheung 2020-09-29 17:02:15 +08:00
parent 95443a2283
commit b75c83be61
2 changed files with 79 additions and 54 deletions

View File

@ -18,7 +18,7 @@ use cortex_m;
use cortex_m_rt::entry;
use rtic::cyccnt::{Instant, U32Ext};
use heapless::consts;
use heapless::{ String, consts, consts::* };
#[macro_use]
pub mod bitmask_macro;
@ -218,10 +218,10 @@ fn main() -> ! {
let mut urukul = Urukul::new(
parts.spi1, parts.spi2, parts.spi3, parts.spi4, parts.spi5, parts.spi6, parts.spi7
);
urukul.reset().unwrap();
let mut mqtt_mux = MqttMux::new(urukul);
let device_name = "Urukul";
let mut mqtt_mux = MqttMux::new(urukul, device_name);
// Time unit in ms
let mut time: u32 = 0;
@ -239,7 +239,7 @@ fn main() -> ! {
let mut client = MqttClient::<consts::U256, _>::new(
IpAddr::V4(Ipv4Addr::new(192, 168, 1, 125)),
"Urukul",
device_name,
tcp_stack,
)
.unwrap();
@ -276,18 +276,19 @@ fn main() -> ! {
};
// Process MQTT response messages about Urukul
match mqtt_mux.process_mqtt_egress().unwrap() {
Some((topic, message)) => client.publish(
topic,
for (topic, message) in mqtt_mux.process_mqtt_egress().unwrap() {
client.publish(
topic.as_str(),
message.as_bytes(),
QoS::AtMostOnce,
&[]
).unwrap(),
None => {},
).unwrap();
}
if connection && !has_subscribed && tick {
match client.subscribe("Urukul/Control/#", &[]) {
let mut str_builder: String<U128> = String::from(device_name);
str_builder.push_str("/Control/#").unwrap();
match client.subscribe(str_builder.as_str(), &[]) {
Ok(()) => has_subscribed = true,
Err(minimq::Error::NotReady) => {},
_e => {},

View File

@ -6,8 +6,7 @@ use nom::character::complete::digit1;
use nom::character::is_space;
use nom::branch::{permutation, alt};
use nom::number::complete::{float, double};
use heapless::String;
use heapless::consts::*;
use heapless::{ Vec, String, consts::* };
use ryu;
use embedded_hal::blocking::spi::Transfer;
use core::convert::TryInto;
@ -52,19 +51,19 @@ pub enum MqttCommand {
Profile(u8)
}
pub struct MqttMux<SPI> {
pub struct MqttMux<'s, SPI> {
urukul: Urukul<SPI>,
yet_to_respond: Option<MqttCommand>,
str_builder: String<U128>,
name: &'s str,
float_buffer: ryu::Buffer,
}
impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
pub fn new(urukul: Urukul<SPI>) -> Self {
impl<'s, SPI, E> MqttMux<'s, SPI> where SPI: Transfer<u8, Error = E> {
pub fn new(urukul: Urukul<SPI>, name: &'s str) -> Self {
MqttMux {
urukul: urukul,
yet_to_respond: None,
str_builder: String::new(),
name: name,
float_buffer: ryu::Buffer::new(),
}
}
@ -95,43 +94,56 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
// Be sure to call egress function after each ingress.
// Otherwise, response will be lost if successive valid MQTT messages were captured
// without calling egress in between
pub fn process_mqtt_egress(&mut self) -> Result<Option<(&str, String<U64>)>, Error<E>> {
pub fn process_mqtt_egress(&mut self) -> Result<Vec<(String<U128>, String<U64>), U4>, Error<E>> {
// Remove previously executed command, and process it afterwards
let prev_cmd = self.yet_to_respond.clone();
self.yet_to_respond = None;
let mut vec = Vec::new();
match prev_cmd {
Some(cmd) => match cmd {
MqttCommand::ProcessError => Ok(
Some((
"Urukul/Feedback/Error",
String::from("Cannot parse the previous command.")
))
),
MqttCommand::Reset => Ok(
Some((
"Urukul/Feedback/Reset",
MqttCommand::ProcessError => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string.push_str("/Feedback/Error")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from("Cannot parse the previous command.")
)).map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Reset => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string.push_str("/Feedback/Reset")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from(
match self.urukul.test() {
Ok(0) => "Reset successful.",
_ => "Reset error!",
}
)
)).map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
))
),
MqttCommand::Switch(ch, _) => Ok(
Some((
MqttCommand::Switch(ch, _) => {
vec.push((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
let mut topic_string = String::from(self.name);
topic_string.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
topic_string.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/Switch")
topic_string.push_str("/Switch")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
topic_string
},
{
String::from(
@ -142,19 +154,21 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
}
)
}
))
),
MqttCommand::Attenuation(ch, _) => Ok(
Some((
)).map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Attenuation(ch, _) => {
vec.push((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
let mut topic_string = String::from(self.name);
topic_string.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
topic_string.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/Attenuation")
topic_string.push_str("/Attenuation")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
topic_string
},
{
String::from(
@ -168,14 +182,14 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
MqttCommand::SystemClock(ch, _) => Ok(
Some((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
let mut topic_string = String::from(self.name);
topic_string.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
topic_string.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/SystemClock")
topic_string.push_str("/SystemClock")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
topic_string
},
{
let mut message_str = String::from(
@ -213,10 +227,20 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
let mut channel :u8 = 0;
let mut profile :u8 = 0;
// Verify that the topic must start with Urukul/Control/ or /Urukul/Control/
let mut header = topic.strip_prefix("/Urukul/Control/")
.or_else(|| topic.strip_prefix("Urukul/Control/"))
.ok_or(Error::MqttCommandError)?;
// Verify that the topic starts with <name>/Control/ or /<name>/Control/
let mut header = {
let mut topic_builder_with_slash: String<U128> = String::from("/");
topic_builder_with_slash.push_str(self.name)
.map_err(|_| Error::StringOutOfSpace)?;
topic_builder_with_slash.push_str("/Control/")
.map_err(|_| Error::StringOutOfSpace)?;
let topic_builder: &str = topic_builder_with_slash.as_str()
.strip_prefix("/")
.ok_or(Error::StringOutOfSpace)?;
topic.strip_prefix(topic_builder_with_slash.as_str())
.or_else(|| topic.strip_prefix(topic_builder))
.ok_or(Error::MqttCommandError)?
};
loop {
match header {