diff --git a/src/mqtt_mux.rs b/src/mqtt_mux.rs index 349996f..9cc1a12 100644 --- a/src/mqtt_mux.rs +++ b/src/mqtt_mux.rs @@ -11,6 +11,8 @@ use nom::number::complete::{float, double}; use uom::si::f64::Frequency; use uom::si::frequency::{hertz, kilohertz, megahertz, gigahertz}; +use arrayvec::ArrayVec; + use embedded_hal::blocking::spi::Transfer; use core::convert::TryInto; use crate::ClockSource as UrukulClockSource; @@ -20,6 +22,7 @@ use crate::Error; #[derive(Debug, Clone)] pub enum MqttTopic { + Reset, Switch(u8), Attenuation(u8), Clock, @@ -38,6 +41,7 @@ pub enum MqttTopic { // Such that Urukul accepts the enum directly #[derive(Debug, Clone)] pub enum MqttCommand { + Reset, Switch(u8, bool), Attenuation(u8, f32), Clock(UrukulClockSource, f64, u8), @@ -64,7 +68,7 @@ impl MqttMux where SPI: Transfer { } pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error> { - let (_, header) = self.parse_header(topic) + let header = self.parse_topic(topic) .map_err(|_| Error::MqttTopicError)?; info!("{:?}", header); let (_, command) = self.parse_message(header, message) @@ -72,7 +76,7 @@ impl MqttMux where SPI: Transfer { info!("{:?}", command); self.execute(command) } - +/* fn parse_header<'a>(&mut self, topic: &'a str) -> IResult<&'a str, MqttTopic> { preceded( alt(( @@ -82,6 +86,10 @@ impl MqttMux where SPI: Transfer { alt(( switch, attenuation, + clock, + clock_source, + clock_frequency, + clock_division, singletone, singletone_frequency, singletone_amplitude, @@ -90,9 +98,182 @@ impl MqttMux where SPI: Transfer { )) )(topic) } +*/ + fn parse_topic<'a>(&mut self, topic: &'a str) -> Result> { + let mut assigned_channel = false; + let mut assigned_profile = false; + let mut channel :u8 = 0; + let mut profile :u8 = 0; + + // Verify that the topic must start with Urukul/Control/ or /Urukul/Control/ + let mut header = topic.strip_prefix("/Urukul/Control/") + .or_else(|| topic.strip_prefix("Urukul/Control/")) + .ok_or(Error::MqttCommandError)?; + + loop { + match header { + // The topic has a channel subtopic + _ if header.starts_with("Channel") => { + // MQTT command should only mention channel once appropriately + // Channel must be referred before profile, + // as a channel is broader than a profile + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + // Remove the "Channel" part of the subtopic + header = header.strip_prefix("Channel") + .ok_or(Error::MqttCommandError)?; + // Remove the channel number at the end of the subtopic + // But store the channel as a char, so it can be removed easily + let numeric_char :char = header.chars() + .next() + .ok_or(Error::MqttCommandError)?; + // Record the channel number + channel = numeric_char.to_digit(10) + .ok_or(Error::MqttCommandError)? + .try_into() + .unwrap(); + assigned_channel = true; + header = header.strip_prefix(numeric_char) + .ok_or(Error::MqttCommandError)?; + // Remove forward slash ("/") + header = header.strip_prefix("/") + .ok_or(Error::MqttCommandError)?; + }, + + _ if (header.starts_with("Profile") && assigned_channel) => { + // MQTT command should only mention profile once appropriately + if assigned_profile { + return Err(Error::MqttCommandError); + } + // Remove the "Profile" part of the subtopic + header = header.strip_prefix("Profile") + .ok_or(Error::MqttCommandError)?; + // Remove the profile number at the end of the subtopic + // But store the profile as a char, so it can be removed easily + let numeric_char :char = header.chars() + .next() + .ok_or(Error::MqttCommandError)?; + // Record the channel number + profile = numeric_char.to_digit(10) + .ok_or(Error::MqttCommandError)? + .try_into() + .unwrap(); + assigned_profile = true; + header = header.strip_prefix(numeric_char) + .ok_or(Error::MqttCommandError)?; + // Remove forward slash ("/") + header = header.strip_prefix("/") + .ok_or(Error::MqttCommandError)?; + }, + + "Reset" => { + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::Reset); + }, + + "Switch" => { + // Switch is a channel specific topic + if !(assigned_channel && !assigned_profile) { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::Switch(channel)); + }, + + "Attenuation" => { + // Attenuation is a channel specific topic + if !(assigned_channel && !assigned_profile) { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::Attenuation(channel)); + }, + + "Clock" => { + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::Clock); + }, + + "Clock/Source" => { + // Clock/Source refers to the Urukul clock source + // It should be common for all channels and profiles + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::ClockSource); + }, + + "Clock/Frequency" => { + // Clock/Frequency refers to the Urukul clock frequency + // It should be common for all channels and profiles + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::ClockFrequency); + }, + + "Clock/Division" => { + // Clock/Division refers to the Urukul clock division + // It should be common for all channels and profiles + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::ClockDivision); + }, + + "SystemClock" => { + if !(assigned_channel && !assigned_profile) { + return Err(Error::MqttCommandError); + } + return Ok(MqttTopic::SystemClock(channel)); + } + + "Singletone" => { + if !(assigned_channel && assigned_profile) { + return Err(Error::MqttCommandError) + } + return Ok(MqttTopic::Singletone(channel, profile)); + } + + "Singletone/Frequency" => { + if !(assigned_channel && assigned_profile) { + return Err(Error::MqttCommandError) + } + return Ok(MqttTopic::SingletoneFrequency(channel, profile)); + } + + "Singletone/Amplitude" => { + if !(assigned_channel && assigned_profile) { + return Err(Error::MqttCommandError) + } + return Ok(MqttTopic::SingletoneAmplitude(channel, profile)); + } + + "Singletone/Phase" => { + if !(assigned_channel && assigned_profile) { + return Err(Error::MqttCommandError) + } + return Ok(MqttTopic::SingletonePhase(channel, profile)); + } + + "Profile" => { + if assigned_channel || assigned_profile { + return Err(Error::MqttCommandError) + } + return Ok(MqttTopic::Profile); + } + + _ => return Err(Error::MqttCommandError), + }; + } + } fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> { match topic { + MqttTopic::Reset => Ok((message, MqttCommand::Reset)), MqttTopic::Switch(ch) => switch_message(ch, message), MqttTopic::Attenuation(ch) => attenuation_message(ch, message), MqttTopic::Clock => clock_message(message), @@ -110,6 +291,7 @@ impl MqttMux where SPI: Transfer { fn execute(&mut self, command: MqttCommand) -> Result<(), Error> { match command { + MqttCommand::Reset => self.urukul.reset(), MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state), MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl), MqttCommand::Clock(src, freq, div) => self.urukul.set_clock(src, freq, div), @@ -145,52 +327,6 @@ fn message_separator(message: &[u8]) -> IResult<&[u8], ()> { )(message) } -// Selection parsers -fn select_channel<'a>(topic: &'a str) -> IResult<&'a str, u8> { - terminated( - map_res( - preceded( - tag("Channel"), - digit1 - ), - |num: &str| u8::from_str_radix(num, 10) - ), - topic_separator - )(topic) -} - -fn select_profile<'a>(topic: &'a str) -> IResult<&'a str, u8> { - terminated( - map_res( - preceded( - tag("Profile"), - digit1 - ), - |num: &str| u8::from_str_radix(num, 10) - ), - topic_separator - )(topic) -} - -fn select_clock<'a>(topic: &'a str) -> IResult<&'a str, ()> { - value( - (), - preceded( - tag("Clock"), - topic_separator - ) - )(topic) -} - -// Selection parser for Singletone -// Note: This fucntion assumes singletone is not the most specific sub-topic -fn select_singletone<'a>(topic: &'a str) -> IResult<&'a str, ()> { - preceded( - tag("Singletone"), - topic_separator - )(topic) -} - // Read whitespace fn whitespace(message: &[u8]) -> IResult<&[u8], ()> { value((), take_while(is_space))(message) @@ -219,19 +355,6 @@ fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> { )(message) } -// Parser for Switch Command Topic -fn switch<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - terminated( - select_channel, - tag("Switch") - ), - |channel: u8| MqttTopic::Switch(channel) - ) - )(topic) -} - // Parser for Switch Command Message fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -245,19 +368,6 @@ fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { )(message) } -// Parser for Attenuation Command Topic -fn attenuation<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - terminated( - select_channel, - tag("Attenuation") - ), - |channel: u8| MqttTopic::Attenuation(channel) - ) - )(topic) -} - // Parser for Attenuation Command Message fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -276,19 +386,6 @@ fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttComman )(message) } -// Parser for Clock Source Command Topic -fn clock_source<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - value( - MqttTopic::ClockSource, - preceded( - clock_source, - tag("Source") - ) - ) - )(topic) -} - // Parser for Clock Source Command Message fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -300,19 +397,6 @@ fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { )(message) } -// Parser for Clock Frequency Command Topic -fn clock_frequency<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - value( - MqttTopic::ClockFrequency, - preceded( - clock_source, - tag("Frequency") - ) - ) - )(topic) -} - // Parser for Clock Frequency Command Message fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -323,19 +407,6 @@ fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { )(message) } -// Parser for Clock Division Command Topic -fn clock_division<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - value( - MqttTopic::ClockDivision, - preceded( - clock_source, - tag("Division") - ) - ) - )(topic) -} - // Parser for Clock Division Command Message fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -351,16 +422,6 @@ fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { )(message) } -// Parser for one-command master clock setup topic -fn clock<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - value( - MqttTopic::Clock, - tag("Clock") - ) - )(topic) -} - // Parser for one-command master clock setup message fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -422,19 +483,6 @@ fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { )(message) } -// Topic parser for f_sys_clk of any channels -fn system_clock<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - terminated( - select_channel, - tag("SystemClock") - ), - |channel: u8| MqttTopic::SystemClock(channel) - ) - )(topic) -} - // Message parser for f_sys_clk of any channels fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -445,25 +493,6 @@ fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttComma )(message) } -// Parser for Singletone frequenct Command Topic -fn singletone_frequency<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - pair( - select_channel, - terminated( - select_profile, - preceded( - select_singletone, - tag("Frequency"), - ) - ) - ), - |(channel, profile): (u8, u8)| MqttTopic::SingletoneFrequency(channel, profile) - ) - )(topic) -} - // Parser for Singletone frequency Command Message fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -474,25 +503,6 @@ fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IRe )(message) } -// Parser for Singletone Amplitude Command Topic -fn singletone_amplitude<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - pair( - select_channel, - terminated( - select_profile, - preceded( - select_singletone, - tag("Amplitude"), - ) - ) - ), - |(channel, profile): (u8, u8)| MqttTopic::SingletoneAmplitude(channel, profile) - ) - )(topic) -} - // Parser for Singletone AMplitude Command Message fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -503,25 +513,6 @@ fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IRe )(message) } -// Parser for Phase Command Topic -fn singletone_phase<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - pair( - select_channel, - terminated( - select_profile, - preceded( - select_singletone, - tag("Phase"), - ) - ) - ), - |(channel, profile): (u8, u8)| MqttTopic::SingletonePhase(channel, profile) - ) - )(topic) -} - // Parser for Phase Command Message fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( @@ -540,22 +531,6 @@ fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult )(message) } -// Parser for one-command singletone profile Topic -fn singletone<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - map( - pair( - select_channel, - terminated( - select_profile, - tag("Singletone") - ) - ), - |(channel, profile): (u8, u8)| MqttTopic::Singletone(channel, profile) - ) - )(topic) -} - // Parser for one-command singletone profile Command // Using JSON like command structure // Possible enhancement: further modularize parsing of all separate fields @@ -622,15 +597,6 @@ fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8] )(message) } -fn profile<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { - all_consuming( - value( - MqttTopic::Profile, - tag("Profile") - ) - )(topic) -} - fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map(