From d47f9b465553272a8a62bef74ab9ac6298d194ff Mon Sep 17 00:00:00 2001 From: occheung Date: Mon, 21 Sep 2020 17:31:34 +0800 Subject: [PATCH] mqtt_mux: use nom --- Cargo.lock | 30 ++- Cargo.toml | 16 +- src/lib.rs | 22 +- src/mqtt_mux.rs | 548 +++++++++++++++++++++++++++++++++--------------- 4 files changed, 438 insertions(+), 178 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 746ff46..e32c0a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,11 +244,13 @@ dependencies = [ "log", "minimq", "nb 1.0.0", + "nom", "panic-halt", "panic-itm", "scpi", "smoltcp", "stm32h7xx-hal", + "uom 0.29.0", ] [[package]] @@ -425,6 +427,12 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "memchr" +version = "2.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" + [[package]] name = "minimq" version = "0.1.0" @@ -458,6 +466,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2178127478ae4ee9be7180bc9c3bffb6354dd7238400db567102f98c413a9f35" +[[package]] +name = "nom" +version = "5.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffb4262d26ed83a1c0a33a38fe2bb15797329c85770da05e6b828ddb782627af" +dependencies = [ + "memchr", + "version_check", +] + [[package]] name = "num-integer" version = "0.1.43" @@ -601,7 +619,7 @@ dependencies = [ "lexical-core", "libm", "scpi_derive", - "uom", + "uom 0.28.0", ] [[package]] @@ -780,6 +798,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "uom" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb593f5252356bfb829112f8fca2d0982d48588d2d6bb5a92553b0dfc4c9aba" +dependencies = [ + "num-traits", + "typenum", +] + [[package]] name = "url" version = "2.1.1" diff --git a/Cargo.toml b/Cargo.toml index 043890b..104593e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ embedded-nal = "0.1.0" minimq = { git = "https://github.com/quartiq/minimq.git", branch = "master" } heapless = "0.5.5" arrayvec = { version = "0.5.1", default-features = false, features = [ "array-sizes-33-128", "array-sizes-129-255" ] } +nom = { version = "5.1.2", default-features = false, features = [] } # Logging and Panicking panic-itm = "0.4.1" @@ -32,17 +33,10 @@ branch = "issue-4" default-features = false features = [ "build-info", "unit-frequency", "unit-angle" ] -# [dependencies.uom] -# version = "0.29.0" -# default-features = false -# features = [ -# "autoconvert", -# "usize", "u8", "u16", "u32", "u64", -# "isize", "i8", "i16", "i32", "i64", -# "f32", "f64", -# "si", -# "try-from" -# ] +[dependencies.uom] +version = "0.29.0" +default-features = false +features = [ "autoconvert", "f32", "f64", "si" ] # Use below SCPI dependency when need to modify SCPI fork offline # [dependencies.scpi] diff --git a/src/lib.rs b/src/lib.rs index 3d35399..d47c0c3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,10 +43,11 @@ pub enum Error { DDSCLKError, DDSRAMError, ParameterError, + MqttTopicError, MqttCommandError, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ClockSource { OSC, SMA, @@ -229,28 +230,47 @@ where } fn set_channel_attenuation(&mut self, channel: u8, attenuation: f32) -> Result<(), Error> { + if channel >= 4 || attenuation < 0.0 || attenuation > 31.5 { + return Err(Error::ParameterError); + } self.attenuator.set_channel_attenuation(channel, attenuation) } fn set_profile(&mut self, profile: u8) -> Result<(), Error> { + if profile >= 8 { + return Err(Error::ParameterError); + } self.config_register.set_configurations(&mut [ (CFGMask::PROFILE, profile.into()) ]).map(|_| ()) } fn set_channel_single_tone_profile(&mut self, channel: u8, profile: u8, frequency: f64, phase: f64, amplitude: f64) -> Result<(), Error> { + if channel >= 4 || profile >= 8 || frequency < 0.0 || phase >= 360.0 || + phase < 0.0 || amplitude < 0.0 || amplitude > 1.0 { + return Err(Error::ParameterError); + } self.dds[usize::from(channel)].set_single_tone_profile(profile, frequency, phase, amplitude) } fn set_channel_single_tone_profile_frequency(&mut self, channel: u8, profile: u8, frequency: f64)-> Result<(), Error> { + if channel >= 4 || profile >= 8 || frequency < 0.0 { + return Err(Error::ParameterError); + } self.dds[usize::from(channel)].set_single_tone_profile_frequency(profile, frequency) } fn set_channel_single_tone_profile_phase(&mut self, channel: u8, profile: u8, phase: f64)-> Result<(), Error> { + if channel >= 4 || profile >= 8 || phase >= 360.0 || phase < 0.0 { + return Err(Error::ParameterError); + } self.dds[usize::from(channel)].set_single_tone_profile_phase(profile, phase) } fn set_channel_single_tone_profile_amplitude(&mut self, channel: u8, profile: u8, amplitude: f64)-> Result<(), Error> { + if channel >= 4 || profile >= 8 || amplitude < 0.0 || amplitude > 1.0 { + return Err(Error::ParameterError); + } self.dds[usize::from(channel)].set_single_tone_profile_amplitude(profile, amplitude) } diff --git a/src/mqtt_mux.rs b/src/mqtt_mux.rs index ce4a170..16f32c5 100644 --- a/src/mqtt_mux.rs +++ b/src/mqtt_mux.rs @@ -1,4 +1,15 @@ use log::info; +use nom::IResult; +use nom::combinator::{value, map, map_res, not, opt, all_consuming}; +use nom::sequence::{terminated, preceded, pair, delimited, tuple}; +use nom::bytes::complete::{take, tag, tag_no_case, take_while}; +use nom::character::complete::digit1; +use nom::character::is_space; +use nom::branch::alt; +use nom::number::complete::{float, double}; + +use uom::si::f64::Frequency; +use uom::si::frequency::{hertz, kilohertz, megahertz, gigahertz}; use embedded_hal::blocking::spi::Transfer; use core::convert::TryInto; @@ -7,27 +18,29 @@ use crate::ClockSource::*; use crate::Urukul; use crate::Error; -#[derive(Debug)] -pub enum MqttCommandType { - // Urukul/Control/Clock/Source - ClockSource(UrukulClockSource), - // Urukul/Control/Clock/Division - ClockDivision(u8), - // Urukul/Control/ChannelX/Switch - Switch(u8, bool), - // Urukul/Control/ChannelX/Attenuation - Attenuation(u8, f32), - // Urukul/Control/ChannelX/SystemClock - SystemClock(u8, f64), - // Urukul/Control/ChannelX/ProfileY/Frequency - SingleToneFrequency(u8, u8, f64), - // Urukul/Control/ChannelX/ProfileY/Amplitude - SingleToneAmplitude(u8, u8, f64), - // Urukul/Control/ChannelX/ProfileY/Phase - SingleTonePhase(u8, u8, f64), +#[derive(Debug, Clone)] +pub enum MqttTopic { + Switch(u8), + Attenuation(u8), + Singletone(u8, u8), + SingletoneFrequency(u8, u8), + SingletoneAmplitude(u8, u8), + SingletonePhase(u8, u8), + Profile, } -use crate::mqtt_mux::MqttCommandType::*; +// Prossible change: Make this enum public to all comm protocol (if any) +// Such that Urukul accepts the enum directly +#[derive(Debug, Clone)] +pub enum MqttCommand { + Switch(u8, bool), + Attenuation(u8, f32), + Singletone(u8, u8, f64, f64, f64), + SingletoneFrequency(u8, u8, f64), + SingletoneAmplitude(u8, u8, f64), + SingletonePhase(u8, u8, f64), + Profile(u8) +} pub struct MqttMux { urukul: Urukul @@ -40,158 +53,363 @@ impl MqttMux where SPI: Transfer { } } - pub fn handle_command(&mut self, topic: &str, message: &[u8]) -> Result<(), Error> { - let command = self.parse(topic, message)?; + pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error> { + let (_, header) = self.parse_header(topic) + .map_err(|_| Error::MqttTopicError)?; + info!("{:?}", header); + let (_, command) = self.parse_message(header, message) + .map_err(|_| Error::MqttCommandError)?; + info!("{:?}", command); self.execute(command) } - // MQTT command are not case tolerant - // If the command differs by case, space or delimiter, it is a wrong command - // A starting forward slash ("/") is acceptable, as per MQTT standard - // Topic should contain the appropriate command header - // Message should provide the parameter - fn parse(&mut self, topic: &str, message: &[u8]) -> Result> { - let mut assigned_channel = false; - let mut assigned_profile = false; - let mut channel :u8 = 0; - let mut profile :u8 = 0; + fn parse_header<'a>(&mut self, topic: &'a str) -> IResult<&'a str, MqttTopic> { + preceded( + alt(( + tag("Urukul/Control/"), + tag("/Urukul/Control/") + )), + alt(( + switch, + attenuation, + singletone, + singletone_frequency, + singletone_amplitude, + singletone_phase, + profile // Note: Put profile at the end + )) + )(topic) + } - // Verify that the topic must start with Urukul/Control/ or /Urukul/Control/ - let mut header = topic.strip_prefix("/Urukul/Control/") - .or_else(|| topic.strip_prefix("Urukul/Control/")) - .ok_or(Error::MqttCommandError)?; - - loop { - match header { - // The topic has a channel subtopic - _ if header.starts_with("Channel") => { - // MQTT command should only mention channel once appropriately - // Channel must be referred before profile, - // as a channel is broader than a profile - if assigned_channel || assigned_profile { - return Err(Error::MqttCommandError); - } - // Remove the "Channel" part of the subtopic - header = header.strip_prefix("Channel") - .ok_or(Error::MqttCommandError)?; - // Remove the channel number at the end of the subtopic - // But store the channel as a char, so it can be removed easily - let numeric_char :char = header.chars() - .next() - .ok_or(Error::MqttCommandError)?; - // Record the channel number - channel = numeric_char.to_digit(10) - .ok_or(Error::MqttCommandError)? - .try_into() - .unwrap(); - assigned_channel = true; - header = header.strip_prefix(numeric_char) - .ok_or(Error::MqttCommandError)?; - // Remove forward slash ("/") - header = header.strip_prefix("/") - .ok_or(Error::MqttCommandError)?; - }, - - _ if header.starts_with("Profile") => { - // MQTT command should only mention profile once appropriately - if assigned_profile { - return Err(Error::MqttCommandError); - } - // Remove the "Profile" part of the subtopic - header = header.strip_prefix("Profile") - .ok_or(Error::MqttCommandError)?; - // Remove the profile number at the end of the subtopic - // But store the profile as a char, so it can be removed easily - let numeric_char :char = header.chars() - .next() - .ok_or(Error::MqttCommandError)?; - // Record the channel number - profile = numeric_char.to_digit(10) - .ok_or(Error::MqttCommandError)? - .try_into() - .unwrap(); - assigned_profile = true; - header = header.strip_prefix(numeric_char) - .ok_or(Error::MqttCommandError)?; - // Remove forward slash ("/") - header = header.strip_prefix("/") - .ok_or(Error::MqttCommandError)?; - }, - - "Clock/Source" => { - // Clock/Source refers to the Urukul clock source - // It should be common for all channels and profiles - if assigned_channel || assigned_profile { - return Err(Error::MqttCommandError); - } - let source_string = core::str::from_utf8(message).unwrap(); - - return match source_string { - _ if source_string.eq_ignore_ascii_case("OSC") => { - Ok(ClockSource(OSC)) - }, - _ if source_string.eq_ignore_ascii_case("SMA") => { - Ok(ClockSource(SMA)) - }, - _ if source_string.eq_ignore_ascii_case("MMCX") => { - Ok(ClockSource(MMCX)) - }, - _ => Err(Error::MqttCommandError), - }; - } - - "Clock/Division" => { - // Clock/Division refers to the Urukul clock division - // It should be common for all channels and profiles - if assigned_channel || assigned_profile { - return Err(Error::MqttCommandError); - } - - let division = u8::from_str_radix(core::str::from_utf8(message).unwrap(), 10) - .map_or_else( - |_| Err(Error::MqttCommandError), - |div| if div == 1 || div == 2 || div == 4 { - Ok(div) - } else { - Err(Error::MqttCommandError) - })?; - return Ok(ClockDivision(division)); - } - - "Switch" => { - // Switch is a channel specific topic - if !(assigned_channel && !assigned_profile) { - return Err(Error::MqttCommandError); - } - - let switch_string = core::str::from_utf8(message).unwrap(); - - return match switch_string { - _ if switch_string.eq_ignore_ascii_case("on") => { - Ok(Switch(channel, true)) - }, - _ if switch_string.eq_ignore_ascii_case("off") => { - Ok(Switch(channel, false)) - }, - _ => Err(Error::MqttCommandError), - }; - }, - - // TODO: Cover all commands - - _ => return Err(Error::MqttCommandError), - }; + fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> { + match topic { + MqttTopic::Switch(ch) => switch_message(ch, message), + MqttTopic::Attenuation(ch) => attenuation_message(ch, message), + MqttTopic::Singletone(ch, prof) => singletone_message(ch, prof, message), + MqttTopic::SingletoneFrequency(ch, prof) => singletone_frequency_message(ch, prof, message), + MqttTopic::SingletoneAmplitude(ch, prof) => singletone_amplitude_message(ch, prof, message), + MqttTopic::SingletonePhase(ch, prof) => singletone_phase_message(ch, prof, message), + MqttTopic::Profile => profile_message(message), } } - // TODO: Implement this - // Only need to sort the command enum - // Obviously. This is what a MUX does - fn execute(&mut self, command_type: MqttCommandType) -> Result<(), Error> { - info!("{:?}", command_type); - match command_type { - Switch(channel, status) => self.urukul.set_channel_switch(channel as u32, status), - _ => Ok(()) + fn execute(&mut self, command: MqttCommand) -> Result<(), Error> { + match command { + MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state), + MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl), + MqttCommand::Singletone(ch, prof, freq, ampl, deg) => self.urukul.set_channel_single_tone_profile(ch, prof, freq, ampl, deg), + MqttCommand::SingletoneFrequency(ch, prof, freq) => self.urukul.set_channel_single_tone_profile_frequency(ch, prof, freq), + MqttCommand::SingletoneAmplitude(ch, prof, ampl) => self.urukul.set_channel_single_tone_profile_amplitude(ch, prof, ampl), + MqttCommand::SingletonePhase(ch, prof, deg) => self.urukul.set_channel_single_tone_profile_phase(ch, prof, deg), + MqttCommand::Profile(prof) => self.urukul.set_profile(prof), } } -} \ No newline at end of file +} + +// Topic separator parser +fn topic_separator<'a>(topic: &'a str) -> IResult<&'a str, ()> { + value((), tag("/"))(topic) +} + +// Message separator parser +fn message_separator(message: &[u8]) -> IResult<&[u8], ()> { + value( + (), + preceded( + whitespace, + preceded( + tag("/"), + whitespace + ) + ) + )(message) +} + +// Selection parsers +fn select_channel<'a>(topic: &'a str) -> IResult<&'a str, u8> { + terminated( + map_res( + preceded( + tag("Channel"), + digit1 + ), + |num: &str| u8::from_str_radix(num, 10) + ), + topic_separator + )(topic) +} + +fn select_profile<'a>(topic: &'a str) -> IResult<&'a str, u8> { + terminated( + map_res( + preceded( + tag("Profile"), + digit1 + ), + |num: &str| u8::from_str_radix(num, 10) + ), + topic_separator + )(topic) +} + +// Selection parser for Singletone +// Note: This fucntion assumes singletone is not the most specific sub-topic +fn select_singletone<'a>(topic: &'a str) -> IResult<&'a str, ()> { + preceded( + tag("Singletone"), + topic_separator + )(topic) +} + +fn check_end_slice(message: &[u8]) -> IResult<&[u8], ()> { + not( + take(1_usize) + )(message) +} + +// Read whitespace +fn whitespace(message: &[u8]) -> IResult<&[u8], ()> { + value((), take_while(is_space))(message) +} + +// Reader for uom instances +fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> { + map( + pair( + double, + opt( + preceded( + whitespace, + alt(( + value(1.0, tag_no_case("hz")), + value(1_000.0, tag_no_case("khz")), + value(1_000_000.0, tag_no_case("mhz")), + value(1_000_000_000.0, tag_no_case("ghz")) + )) + ) + ) + ), + |(freq, unit): (f64, Option)| { + freq * unit.map_or(1.0, |mul| mul) + } + )(message) +} + +// Parser for Switch Command Topic +fn switch<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + terminated( + select_channel, + tag("Switch") + ), + |channel: u8| MqttTopic::Switch(channel) + ) + )(topic) +} + +// Parser for Switch Command Message +fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + alt(( + value(true, tag("on")), + value(false, tag("off")) + )), + |switch| MqttCommand::Switch(channel, switch) + ) + )(message) +} + +// Parser for Attenuation Command Topic +fn attenuation<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + terminated( + select_channel, + tag("Attenuation") + ), + |channel: u8| MqttTopic::Attenuation(channel) + ) + )(topic) +} + +// Parser for Attenuation Command Message +fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + terminated( + float, + opt( + preceded( + whitespace, + tag_no_case("db") + ) + ) + ), + |att: f32| MqttCommand::Attenuation(channel, att) + ) + )(message) +} + +// Parser for Singletone frequenct Command Topic +fn singletone_frequency<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + pair( + select_channel, + terminated( + select_profile, + preceded( + select_singletone, + tag("Frequency"), + ) + ) + ), + |(channel, profile): (u8, u8)| MqttTopic::SingletoneFrequency(channel, profile) + ) + )(topic) +} + +// Parser for Singletone frequency Command Message +fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + read_frequency, + |freq: f64| MqttCommand::SingletoneFrequency(channel, profile, freq) + ) + )(message) +} + +// Parser for Singletone Amplitude Command Topic +fn singletone_amplitude<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + pair( + select_channel, + terminated( + select_profile, + preceded( + select_singletone, + tag("Amplitude"), + ) + ) + ), + |(channel, profile): (u8, u8)| MqttTopic::SingletoneAmplitude(channel, profile) + ) + )(topic) +} + +// Parser for Singletone AMplitude Command Message +fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + double, + |ampl: f64| MqttCommand::SingletoneAmplitude(channel, profile, ampl) + ) + )(message) +} + +// Parser for Phase Command Topic +fn singletone_phase<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + pair( + select_channel, + terminated( + select_profile, + preceded( + select_singletone, + tag("Phase"), + ) + ) + ), + |(channel, profile): (u8, u8)| MqttTopic::SingletonePhase(channel, profile) + ) + )(topic) +} + +// Parser for Phase Command Message +fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + terminated( + double, + opt( + preceded( + whitespace, + tag_no_case("deg") + ) + ) + ), + |deg: f64| MqttCommand::SingletonePhase(channel, profile, deg) + ) + )(message) +} + +// Parser for one-command singletone profile Topic +fn singletone<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + map( + pair( + select_channel, + terminated( + select_profile, + tag("Singletone") + ) + ), + |(channel, profile): (u8, u8)| MqttTopic::Singletone(channel, profile) + ) + )(topic) +} + +// Parser for one-command singletone profile Command +fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + tuple(( + read_frequency, + preceded( + message_separator, + double + ), + preceded( + message_separator, + terminated( + double, + opt( + preceded( + whitespace, + tag_no_case("deg") + ) + ) + ) + ) + + )), + |(freq, ampl, phase): (f64, f64, f64)| MqttCommand::Singletone(channel, profile, freq, ampl, phase) + ) + )(message) +} + +fn profile<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> { + all_consuming( + value( + MqttTopic::Profile, + tag("Profile") + ) + )(topic) +} + +fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { + all_consuming( + map( + digit1, + |num: &[u8]| { + MqttCommand::Profile( + u8::from_str_radix(core::str::from_utf8(num).unwrap(), 10).unwrap() + ) + } + ) + )(message) +}