mqtt_mux: add feedback

pull/4/head
occheung 2020-09-28 14:15:37 +08:00
parent 90446bcce2
commit c467b9a1b9
3 changed files with 186 additions and 25 deletions

View File

@ -1,8 +1,9 @@
#![no_main]
#![no_std]
#![feature(core_intrinsics)]
#![feature(assoc_char_funcs)]
use log::{ trace };
use log::{ trace, warn };
use stm32h7xx_hal::gpio::Speed;
use stm32h7xx_hal::{pac, prelude::*, spi};
use stm32h7xx_hal::ethernet;
@ -262,11 +263,28 @@ fn main() -> ! {
}
// Process MQTT messages about Urukul/Control
let connection = client
let connection = match client
.poll(|_client, topic, message, _properties| {
// Why is topic a string while message is a slice?
mqtt_mux.process_mqtt(topic, message).unwrap();
}).is_ok();
mqtt_mux.process_mqtt_ingress(topic, message);
}) {
Ok(_) => true,
Err(e) => {
warn!("{:?}", e);
false
},
};
// Process MQTT response messages about Urukul
match mqtt_mux.process_mqtt_egress().unwrap() {
Some((topic, message)) => client.publish(
topic,
message.as_bytes(),
QoS::AtMostOnce,
&[]
).unwrap(),
None => {},
}
if connection && !has_subscribed && tick {
match client.subscribe("Urukul/Control/#", &[]) {
@ -276,14 +294,6 @@ fn main() -> ! {
};
}
if connection && tick && (time % 3000) == 0 {
client.publish("Urukul/Channel1/Switch", mqtt_mux
.get_switch_status_message(1)
.unwrap()
.as_bytes(), QoS::AtMostOnce, &[])
.unwrap();
}
// Reset tick flag
tick = false;
}

View File

@ -1,4 +1,3 @@
use log::info;
use nom::IResult;
use nom::combinator::{value, map, map_res, opt, all_consuming};
use nom::sequence::{terminated, preceded, pair};
@ -7,7 +6,9 @@ use nom::character::complete::digit1;
use nom::character::is_space;
use nom::branch::{permutation, alt};
use nom::number::complete::{float, double};
use heapless::String;
use heapless::consts::*;
use ryu;
use embedded_hal::blocking::spi::Transfer;
use core::convert::TryInto;
use crate::urukul::ClockSource as UrukulClockSource;
@ -35,6 +36,7 @@ pub enum MqttTopic {
// Such that Urukul accepts the enum directly
#[derive(Debug, Clone)]
pub enum MqttCommand {
ProcessError,
Reset,
Switch(u8, bool),
Attenuation(u8, f32),
@ -51,24 +53,158 @@ pub enum MqttCommand {
}
pub struct MqttMux<SPI> {
urukul: Urukul<SPI>
urukul: Urukul<SPI>,
yet_to_respond: Option<MqttCommand>,
str_builder: String<U128>,
float_buffer: ryu::Buffer,
}
impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
pub fn new(urukul: Urukul<SPI>) -> Self {
MqttMux {
urukul
urukul: urukul,
yet_to_respond: None,
str_builder: String::new(),
float_buffer: ryu::Buffer::new(),
}
}
pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error<E>> {
let header = self.parse_topic(topic)
.map_err(|_| Error::MqttTopicError)?;
info!("Parsed command topic: {:?}", header);
let (_, command) = self.parse_message(header, message)
.map_err(|_| Error::MqttCommandError)?;
info!("Parsed comamnd message: {:?}", command);
self.execute(command)
// Instead of using a return type, the result of processing the command is stored internally
// Invoke process_mqtt_egress to get a response after invoking ingress handler
pub fn process_mqtt_ingress(&mut self, topic: &str, message: &[u8]) {
let topic = match self.parse_topic(topic) {
Ok(t) => t,
Err(_) => {
self.yet_to_respond = Some(MqttCommand::ProcessError);
return;
}
};
let command = match self.parse_message(topic, message) {
Ok((_, cmd)) => cmd,
Err(_) => {
self.yet_to_respond = Some(MqttCommand::ProcessError);
return;
}
};
self.yet_to_respond = match self.execute(command.clone()) {
Err(_) => Some(MqttCommand::ProcessError),
Ok(()) => Some(command)
};
}
// Be sure to call egress function after each ingress.
// Otherwise, response will be lost if successive valid MQTT messages were captured
// without calling egress in between
pub fn process_mqtt_egress(&mut self) -> Result<Option<(&str, String<U64>)>, Error<E>> {
// Remove previously executed command, and process it afterwards
let prev_cmd = self.yet_to_respond.clone();
self.yet_to_respond = None;
match prev_cmd {
Some(cmd) => match cmd {
MqttCommand::ProcessError => Ok(
Some((
"Urukul/Feedback/Error",
String::from("Cannot parse the previous command.")
))
),
MqttCommand::Reset => Ok(
Some((
"Urukul/Feedback/Reset",
{
String::from(
match self.urukul.test() {
Ok(0) => "Reset successful.",
_ => "Reset error!",
}
)
}
))
),
MqttCommand::Switch(ch, _) => Ok(
Some((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/Switch")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
},
{
String::from(
if self.urukul.get_channel_switch_status(ch.into())? {
"on"
} else {
"off"
}
)
}
))
),
MqttCommand::Attenuation(ch, _) => Ok(
Some((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/Attenuation")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
},
{
String::from(
self.float_buffer.format_finite(
self.urukul.get_channel_attenuation(ch)?
)
)
}
))
),
MqttCommand::SystemClock(ch, _) => Ok(
Some((
{
self.str_builder.clear();
self.str_builder.push_str("Urukul/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.push_str("/SystemClock")
.map_err(|_| Error::StringOutOfSpace)?;
self.str_builder.as_str()
},
{
let mut message_str = String::from(
self.float_buffer.format_finite(
self.urukul.get_channel_sys_clk(ch)?
)
);
message_str.push_str(" Hz")
.map_err(|_| Error::StringOutOfSpace)?;
message_str
}
))
),
MqttCommand::Profile(_) => Ok(
Some((
"Urukul/Feedback/Profile",
{
let mut message_str = String::new();
let prof = self.urukul.get_profile()?;
message_str.push(char::from_digit(prof.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
message_str
}
))
),
_ => Ok(Some(("Urukul/Feedback/Unimplemented", String::from("test")))),
},
None => Ok(None),
}
}
fn parse_topic<'a>(&mut self, topic: &'a str) -> Result<MqttTopic, Error<E>> {
@ -263,6 +399,7 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
fn execute(&mut self, command: MqttCommand) -> Result<(), Error<E>> {
match command {
MqttCommand::ProcessError => Ok(()),
MqttCommand::Reset => self.urukul.reset(),
MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state),
MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl),

View File

@ -12,7 +12,7 @@ use crate::dds::{ DDS, RAMOperationMode };
/*
* Enum for structuring error
*/
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum Error<E> {
SPI(E),
CSError,
@ -26,6 +26,8 @@ pub enum Error<E> {
ParameterError,
MqttTopicError,
MqttCommandError,
VectorOutOfSpace,
StringOutOfSpace,
}
#[derive(Debug, Clone)]
@ -233,6 +235,10 @@ where
}
}
pub fn get_channel_attenuation(&mut self, channel: u8) -> Result<f32, Error<E>> {
self.attenuator.get_channel_attenuation(channel)
}
pub fn set_channel_attenuation(&mut self, channel: u8, attenuation: f32) -> Result<(), Error<E>> {
if channel >= 4 || attenuation < 0.0 || attenuation > 31.5 {
return Err(Error::ParameterError);
@ -240,6 +246,10 @@ where
self.attenuator.set_channel_attenuation(channel, attenuation)
}
pub fn get_profile(&mut self) -> Result<u8, Error<E>> {
Ok(self.config_register.get_configuration(CFGMask::PROFILE))
}
pub fn set_profile(&mut self, profile: u8) -> Result<(), Error<E>> {
if profile >= 8 {
return Err(Error::ParameterError);
@ -292,6 +302,10 @@ where
self.dds[usize::from(channel)].set_sys_clk_frequency(f_sys_clk).map(|_| ())
}
pub fn get_channel_sys_clk(&mut self, channel: u8) -> Result<f64, Error<E>> {
Ok(self.dds[usize::from(channel)].get_f_sys_clk())
}
// Multi-dds channel functions
// Do not allow reading of DDS registers
// Make sure only 1 SPI transaction is compelted per function call