From c467b9a1b90e0e18d4230c3b75395943a829a8b4 Mon Sep 17 00:00:00 2001 From: occheung Date: Mon, 28 Sep 2020 14:15:37 +0800 Subject: [PATCH] mqtt_mux: add feedback --- src/main.rs | 34 ++++++---- src/mqtt_mux.rs | 161 ++++++++++++++++++++++++++++++++++++++++++++---- src/urukul.rs | 16 ++++- 3 files changed, 186 insertions(+), 25 deletions(-) diff --git a/src/main.rs b/src/main.rs index aefae0a..1b0b0db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,9 @@ #![no_main] #![no_std] #![feature(core_intrinsics)] +#![feature(assoc_char_funcs)] -use log::{ trace }; +use log::{ trace, warn }; use stm32h7xx_hal::gpio::Speed; use stm32h7xx_hal::{pac, prelude::*, spi}; use stm32h7xx_hal::ethernet; @@ -262,11 +263,28 @@ fn main() -> ! { } // Process MQTT messages about Urukul/Control - let connection = client + let connection = match client .poll(|_client, topic, message, _properties| { // Why is topic a string while message is a slice? - mqtt_mux.process_mqtt(topic, message).unwrap(); - }).is_ok(); + mqtt_mux.process_mqtt_ingress(topic, message); + }) { + Ok(_) => true, + Err(e) => { + warn!("{:?}", e); + false + }, + }; + + // Process MQTT response messages about Urukul + match mqtt_mux.process_mqtt_egress().unwrap() { + Some((topic, message)) => client.publish( + topic, + message.as_bytes(), + QoS::AtMostOnce, + &[] + ).unwrap(), + None => {}, + } if connection && !has_subscribed && tick { match client.subscribe("Urukul/Control/#", &[]) { @@ -276,14 +294,6 @@ fn main() -> ! { }; } - if connection && tick && (time % 3000) == 0 { - client.publish("Urukul/Channel1/Switch", mqtt_mux - .get_switch_status_message(1) - .unwrap() - .as_bytes(), QoS::AtMostOnce, &[]) - .unwrap(); - } - // Reset tick flag tick = false; } diff --git a/src/mqtt_mux.rs b/src/mqtt_mux.rs index b8ed0dc..f507def 100644 --- a/src/mqtt_mux.rs +++ b/src/mqtt_mux.rs @@ -1,4 +1,3 @@ -use log::info; use nom::IResult; use nom::combinator::{value, map, map_res, opt, all_consuming}; use nom::sequence::{terminated, preceded, pair}; @@ -7,7 +6,9 @@ use nom::character::complete::digit1; use nom::character::is_space; use nom::branch::{permutation, alt}; use nom::number::complete::{float, double}; - +use heapless::String; +use heapless::consts::*; +use ryu; use embedded_hal::blocking::spi::Transfer; use core::convert::TryInto; use crate::urukul::ClockSource as UrukulClockSource; @@ -35,6 +36,7 @@ pub enum MqttTopic { // Such that Urukul accepts the enum directly #[derive(Debug, Clone)] pub enum MqttCommand { + ProcessError, Reset, Switch(u8, bool), Attenuation(u8, f32), @@ -51,24 +53,158 @@ pub enum MqttCommand { } pub struct MqttMux { - urukul: Urukul + urukul: Urukul, + yet_to_respond: Option, + str_builder: String, + float_buffer: ryu::Buffer, } impl MqttMux where SPI: Transfer { pub fn new(urukul: Urukul) -> Self { MqttMux { - urukul + urukul: urukul, + yet_to_respond: None, + str_builder: String::new(), + float_buffer: ryu::Buffer::new(), } } - pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error> { - let header = self.parse_topic(topic) - .map_err(|_| Error::MqttTopicError)?; - info!("Parsed command topic: {:?}", header); - let (_, command) = self.parse_message(header, message) - .map_err(|_| Error::MqttCommandError)?; - info!("Parsed comamnd message: {:?}", command); - self.execute(command) + // Instead of using a return type, the result of processing the command is stored internally + // Invoke process_mqtt_egress to get a response after invoking ingress handler + pub fn process_mqtt_ingress(&mut self, topic: &str, message: &[u8]) { + let topic = match self.parse_topic(topic) { + Ok(t) => t, + Err(_) => { + self.yet_to_respond = Some(MqttCommand::ProcessError); + return; + } + }; + let command = match self.parse_message(topic, message) { + Ok((_, cmd)) => cmd, + Err(_) => { + self.yet_to_respond = Some(MqttCommand::ProcessError); + return; + } + }; + self.yet_to_respond = match self.execute(command.clone()) { + Err(_) => Some(MqttCommand::ProcessError), + Ok(()) => Some(command) + }; + } + + // Be sure to call egress function after each ingress. + // Otherwise, response will be lost if successive valid MQTT messages were captured + // without calling egress in between + pub fn process_mqtt_egress(&mut self) -> Result)>, Error> { + // Remove previously executed command, and process it afterwards + let prev_cmd = self.yet_to_respond.clone(); + self.yet_to_respond = None; + + match prev_cmd { + Some(cmd) => match cmd { + MqttCommand::ProcessError => Ok( + Some(( + "Urukul/Feedback/Error", + String::from("Cannot parse the previous command.") + )) + ), + MqttCommand::Reset => Ok( + Some(( + "Urukul/Feedback/Reset", + { + String::from( + match self.urukul.test() { + Ok(0) => "Reset successful.", + _ => "Reset error!", + } + ) + } + )) + ), + MqttCommand::Switch(ch, _) => Ok( + Some(( + { + self.str_builder.clear(); + self.str_builder.push_str("Urukul/Feedback/Channel") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push_str("/Switch") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.as_str() + }, + { + String::from( + if self.urukul.get_channel_switch_status(ch.into())? { + "on" + } else { + "off" + } + ) + } + )) + ), + MqttCommand::Attenuation(ch, _) => Ok( + Some(( + { + self.str_builder.clear(); + self.str_builder.push_str("Urukul/Feedback/Channel") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push_str("/Attenuation") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.as_str() + }, + { + String::from( + self.float_buffer.format_finite( + self.urukul.get_channel_attenuation(ch)? + ) + ) + } + )) + ), + MqttCommand::SystemClock(ch, _) => Ok( + Some(( + { + self.str_builder.clear(); + self.str_builder.push_str("Urukul/Feedback/Channel") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push(char::from_digit(ch.into(), 10).unwrap()) + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.push_str("/SystemClock") + .map_err(|_| Error::StringOutOfSpace)?; + self.str_builder.as_str() + }, + { + let mut message_str = String::from( + self.float_buffer.format_finite( + self.urukul.get_channel_sys_clk(ch)? + ) + ); + message_str.push_str(" Hz") + .map_err(|_| Error::StringOutOfSpace)?; + message_str + } + )) + ), + MqttCommand::Profile(_) => Ok( + Some(( + "Urukul/Feedback/Profile", + { + let mut message_str = String::new(); + let prof = self.urukul.get_profile()?; + message_str.push(char::from_digit(prof.into(), 10).unwrap()) + .map_err(|_| Error::StringOutOfSpace)?; + message_str + } + )) + ), + _ => Ok(Some(("Urukul/Feedback/Unimplemented", String::from("test")))), + }, + None => Ok(None), + } } fn parse_topic<'a>(&mut self, topic: &'a str) -> Result> { @@ -263,6 +399,7 @@ impl MqttMux where SPI: Transfer { fn execute(&mut self, command: MqttCommand) -> Result<(), Error> { match command { + MqttCommand::ProcessError => Ok(()), MqttCommand::Reset => self.urukul.reset(), MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state), MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl), diff --git a/src/urukul.rs b/src/urukul.rs index deb27f9..e136aa9 100644 --- a/src/urukul.rs +++ b/src/urukul.rs @@ -12,7 +12,7 @@ use crate::dds::{ DDS, RAMOperationMode }; /* * Enum for structuring error */ -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Error { SPI(E), CSError, @@ -26,6 +26,8 @@ pub enum Error { ParameterError, MqttTopicError, MqttCommandError, + VectorOutOfSpace, + StringOutOfSpace, } #[derive(Debug, Clone)] @@ -233,6 +235,10 @@ where } } + pub fn get_channel_attenuation(&mut self, channel: u8) -> Result> { + self.attenuator.get_channel_attenuation(channel) + } + pub fn set_channel_attenuation(&mut self, channel: u8, attenuation: f32) -> Result<(), Error> { if channel >= 4 || attenuation < 0.0 || attenuation > 31.5 { return Err(Error::ParameterError); @@ -240,6 +246,10 @@ where self.attenuator.set_channel_attenuation(channel, attenuation) } + pub fn get_profile(&mut self) -> Result> { + Ok(self.config_register.get_configuration(CFGMask::PROFILE)) + } + pub fn set_profile(&mut self, profile: u8) -> Result<(), Error> { if profile >= 8 { return Err(Error::ParameterError); @@ -292,6 +302,10 @@ where self.dds[usize::from(channel)].set_sys_clk_frequency(f_sys_clk).map(|_| ()) } + pub fn get_channel_sys_clk(&mut self, channel: u8) -> Result> { + Ok(self.dds[usize::from(channel)].get_f_sys_clk()) + } + // Multi-dds channel functions // Do not allow reading of DDS registers // Make sure only 1 SPI transaction is compelted per function call