diff --git a/src/main.rs b/src/main.rs index 1b0b0db..3ad6605 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,7 +18,7 @@ use cortex_m; use cortex_m_rt::entry; use rtic::cyccnt::{Instant, U32Ext}; -use heapless::consts; +use heapless::{ String, consts, consts::* }; #[macro_use] pub mod bitmask_macro; @@ -218,10 +218,10 @@ fn main() -> ! { let mut urukul = Urukul::new( parts.spi1, parts.spi2, parts.spi3, parts.spi4, parts.spi5, parts.spi6, parts.spi7 ); - urukul.reset().unwrap(); - let mut mqtt_mux = MqttMux::new(urukul); + let device_name = "Urukul"; + let mut mqtt_mux = MqttMux::new(urukul, device_name); // Time unit in ms let mut time: u32 = 0; @@ -239,7 +239,7 @@ fn main() -> ! { let mut client = MqttClient::::new( IpAddr::V4(Ipv4Addr::new(192, 168, 1, 125)), - "Urukul", + device_name, tcp_stack, ) .unwrap(); @@ -276,18 +276,19 @@ fn main() -> ! { }; // Process MQTT response messages about Urukul - match mqtt_mux.process_mqtt_egress().unwrap() { - Some((topic, message)) => client.publish( - topic, + for (topic, message) in mqtt_mux.process_mqtt_egress().unwrap() { + client.publish( + topic.as_str(), message.as_bytes(), QoS::AtMostOnce, &[] - ).unwrap(), - None => {}, + ).unwrap(); } if connection && !has_subscribed && tick { - match client.subscribe("Urukul/Control/#", &[]) { + let mut str_builder: String = String::from(device_name); + str_builder.push_str("/Control/#").unwrap(); + match client.subscribe(str_builder.as_str(), &[]) { Ok(()) => has_subscribed = true, Err(minimq::Error::NotReady) => {}, _e => {}, diff --git a/src/mqtt_mux.rs b/src/mqtt_mux.rs index f507def..c1eab5d 100644 --- a/src/mqtt_mux.rs +++ b/src/mqtt_mux.rs @@ -6,8 +6,7 @@ use nom::character::complete::digit1; use nom::character::is_space; use nom::branch::{permutation, alt}; use nom::number::complete::{float, double}; -use heapless::String; -use heapless::consts::*; +use heapless::{ Vec, String, consts::* }; use ryu; use embedded_hal::blocking::spi::Transfer; use core::convert::TryInto; @@ -52,19 +51,19 @@ pub enum MqttCommand { Profile(u8) } -pub struct MqttMux { +pub struct MqttMux<'s, SPI> { urukul: Urukul, yet_to_respond: Option, - str_builder: String, + name: &'s str, float_buffer: ryu::Buffer, } -impl MqttMux where SPI: Transfer { - pub fn new(urukul: Urukul) -> Self { +impl<'s, SPI, E> MqttMux<'s, SPI> where SPI: Transfer { + pub fn new(urukul: Urukul, name: &'s str) -> Self { MqttMux { urukul: urukul, yet_to_respond: None, - str_builder: String::new(), + name: name, float_buffer: ryu::Buffer::new(), } } @@ -95,43 +94,56 @@ impl MqttMux where SPI: Transfer { // Be sure to call egress function after each ingress. // Otherwise, response will be lost if successive valid MQTT messages were captured // without calling egress in between - pub fn process_mqtt_egress(&mut self) -> Result)>, Error> { + pub fn process_mqtt_egress(&mut self) -> Result, String), U4>, Error> { // Remove previously executed command, and process it afterwards let prev_cmd = self.yet_to_respond.clone(); self.yet_to_respond = None; + let mut vec = Vec::new(); match prev_cmd { Some(cmd) => match cmd { - MqttCommand::ProcessError => Ok( - Some(( - "Urukul/Feedback/Error", - String::from("Cannot parse the previous command.") - )) - ), - MqttCommand::Reset => Ok( - Some(( - "Urukul/Feedback/Reset", + MqttCommand::ProcessError => { + vec.push(( { + let mut topic_string = String::from(self.name); + topic_string.push_str("/Feedback/Error") + .map_err(|_| Error::StringOutOfSpace)?; + topic_string + }, + String::from("Cannot parse the previous command.") + )).map_err(|_| Error::VectorOutOfSpace)?; + Ok(vec) + } + + MqttCommand::Reset => { + vec.push(( + { + let mut topic_string = String::from(self.name); + topic_string.push_str("/Feedback/Reset") + .map_err(|_| Error::StringOutOfSpace)?; + topic_string + }, String::from( match self.urukul.test() { Ok(0) => "Reset successful.", _ => "Reset error!", } ) + )).map_err(|_| Error::VectorOutOfSpace)?; + Ok(vec) } - )) - ), - MqttCommand::Switch(ch, _) => Ok( - Some(( + + MqttCommand::Switch(ch, _) => { + vec.push(( { - self.str_builder.clear(); - self.str_builder.push_str("Urukul/Feedback/Channel") + let mut topic_string = String::from(self.name); + topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push_str("/Switch") + topic_string.push_str("/Switch") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.as_str() + topic_string }, { String::from( @@ -142,19 +154,21 @@ impl MqttMux where SPI: Transfer { } ) } - )) - ), - MqttCommand::Attenuation(ch, _) => Ok( - Some(( + )).map_err(|_| Error::VectorOutOfSpace)?; + Ok(vec) + } + + MqttCommand::Attenuation(ch, _) => { + vec.push(( { - self.str_builder.clear(); - self.str_builder.push_str("Urukul/Feedback/Channel") + let mut topic_string = String::from(self.name); + topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push_str("/Attenuation") + topic_string.push_str("/Attenuation") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.as_str() + topic_string }, { String::from( @@ -168,14 +182,14 @@ impl MqttMux where SPI: Transfer { MqttCommand::SystemClock(ch, _) => Ok( Some(( { - self.str_builder.clear(); - self.str_builder.push_str("Urukul/Feedback/Channel") + let mut topic_string = String::from(self.name); + topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.push_str("/SystemClock") + topic_string.push_str("/SystemClock") .map_err(|_| Error::StringOutOfSpace)?; - self.str_builder.as_str() + topic_string }, { let mut message_str = String::from( @@ -213,10 +227,20 @@ impl MqttMux where SPI: Transfer { let mut channel :u8 = 0; let mut profile :u8 = 0; - // Verify that the topic must start with Urukul/Control/ or /Urukul/Control/ - let mut header = topic.strip_prefix("/Urukul/Control/") - .or_else(|| topic.strip_prefix("Urukul/Control/")) - .ok_or(Error::MqttCommandError)?; + // Verify that the topic starts with /Control/ or //Control/ + let mut header = { + let mut topic_builder_with_slash: String = String::from("/"); + topic_builder_with_slash.push_str(self.name) + .map_err(|_| Error::StringOutOfSpace)?; + topic_builder_with_slash.push_str("/Control/") + .map_err(|_| Error::StringOutOfSpace)?; + let topic_builder: &str = topic_builder_with_slash.as_str() + .strip_prefix("/") + .ok_or(Error::StringOutOfSpace)?; + topic.strip_prefix(topic_builder_with_slash.as_str()) + .or_else(|| topic.strip_prefix(topic_builder)) + .ok_or(Error::MqttCommandError)? + }; loop { match header {