use nom::IResult; use nom::combinator::{value, map, map_res, opt, all_consuming}; use nom::sequence::{terminated, preceded, pair}; use nom::bytes::complete::{tag, tag_no_case, take_while}; use nom::character::complete::digit1; use nom::character::is_space; use nom::branch::{permutation, alt}; use nom::number::complete::{float, double}; use heapless::{ Vec, String, consts::* }; use ryu; use embedded_hal::blocking::spi::Transfer; use core::convert::TryInto; use crate::urukul::ClockSource as UrukulClockSource; use crate::urukul::Urukul; use crate::urukul::Error; #[derive(Debug, Clone)] pub enum MqttTopic { Reset, Switch(u8), Attenuation(u8), Clock, ClockSource, ClockFrequency, ClockDivision, SystemClock(u8), Singletone(u8, u8), SingletoneFrequency(u8, u8), SingletoneAmplitude(u8, u8), SingletonePhase(u8, u8), Profile, } // Prossible change: Make this enum public to all comm protocol (if any) // Such that Urukul accepts the enum directly #[derive(Debug, Clone)] pub enum MqttCommand { ProcessError(&'static str), Reset, Switch(u8, bool), Attenuation(u8, f32), Clock(UrukulClockSource, f64, u8), ClockSource(UrukulClockSource), ClockFrequency(f64), ClockDivision(u8), SystemClock(u8, f64), Singletone(u8, u8, f64, f64, f64), SingletoneFrequency(u8, u8, f64), SingletoneAmplitude(u8, u8, f64), SingletonePhase(u8, u8, f64), Profile(u8) } pub struct MqttMux<'s, SPI> { urukul: Urukul, yet_to_respond: Option, name: &'s str, float_buffer: ryu::Buffer, } impl<'s, SPI, E> MqttMux<'s, SPI> where SPI: Transfer { pub fn new(urukul: Urukul, name: &'s str) -> Self { MqttMux { urukul: urukul, yet_to_respond: None, name: name, float_buffer: ryu::Buffer::new(), } } // Instead of using a return type, the result of processing the command is stored internally // Invoke process_mqtt_egress to get a response after invoking ingress handler pub fn process_mqtt_ingress(&mut self, topic: &str, message: &[u8]) { let topic = match self.parse_topic(topic) { Ok(t) => t, Err(_) => { self.yet_to_respond = Some(MqttCommand::ProcessError("Cannot prase MQTT topic")); return; } }; let command = match self.parse_message(topic, message) { Ok((_, cmd)) => cmd, Err(_) => { self.yet_to_respond = Some(MqttCommand::ProcessError("Cannot parse MQTT message")); return; } }; self.yet_to_respond = match self.execute(command.clone()) { Err(_) => Some(MqttCommand::ProcessError("Cannot execute MQTT command")), Ok(()) => Some(command) }; } // Be sure to call egress function after each ingress. // Otherwise, response will be lost if successive valid MQTT messages were captured // without calling egress in between pub fn process_mqtt_egress(&mut self) -> Result, String), U4>, Error> { // Remove previously executed command, and process it afterwards let prev_cmd = self.yet_to_respond.clone(); self.yet_to_respond = None; let mut vec = Vec::new(); match prev_cmd { Some(cmd) => match cmd { MqttCommand::ProcessError(e_str) => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Error") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, String::from(e_str) )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::Reset => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Reset") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, String::from( match self.urukul.test() { Ok(0) => "Reset successful.", _ => "Reset error!", } ) )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::Switch(ch, _) => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/Switch") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { String::from( if self.urukul.get_channel_switch_status(ch.into())? { "on" } else { "off" } ) } )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::Attenuation(ch, _) => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/Attenuation") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { String::from( self.float_buffer.format_finite( self.urukul.get_channel_attenuation(ch)? ) ) } )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::Clock(_, _, _) => { vec.push( self.get_clock_source_message()? ).map_err(|_| Error::VectorOutOfSpace)?; vec.push( self.get_clock_frequency_message()? ).map_err(|_| Error::VectorOutOfSpace)?; vec.push( self.get_clock_division_message()? ).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::ClockSource(_) => { vec.push( self.get_clock_source_message()? ).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::ClockFrequency(_) => { vec.push( self.get_clock_frequency_message()? ).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::ClockDivision(_) => { vec.push( self.get_clock_division_message()? ).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::SystemClock(ch, _) => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/SystemClock") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let mut message_str = String::from( self.float_buffer.format_finite( self.urukul.get_channel_sys_clk(ch)? ) ); message_str.push_str(" Hz") .map_err(|_| Error::StringOutOfSpace)?; message_str } )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } MqttCommand::Singletone(ch, pr, _, _, _) | MqttCommand::SingletoneFrequency(ch, pr, _) | MqttCommand::SingletoneAmplitude(ch, pr, _) | MqttCommand::SingletonePhase(ch, pr, _) => { let (f_out, phase, ampl) = self.urukul.get_channel_single_tone_profile(ch, pr)?; vec.push(self.get_single_tone_frequency_message(ch, f_out)?) .map_err(|_| Error::StringOutOfSpace)?; vec.push(self.get_single_tone_phase_message(ch, phase)?) .map_err(|_| Error::StringOutOfSpace)?; vec.push(self.get_single_tone_amplitude_message(ch, ampl)?) .map_err(|_| Error::StringOutOfSpace)?; Ok(vec) } MqttCommand::Profile(_) => { vec.push(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Profile") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let mut message_str = String::new(); let prof = self.urukul.get_profile()?; message_str.push(char::from_digit(prof.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; message_str } )).map_err(|_| Error::VectorOutOfSpace)?; Ok(vec) } }, None => Ok(vec), } } fn parse_topic<'a>(&mut self, topic: &'a str) -> Result> { let mut assigned_channel = false; let mut assigned_profile = false; let mut channel :u8 = 0; let mut profile :u8 = 0; // Verify that the topic starts with /Control/ or //Control/ let mut header = { let mut topic_builder_with_slash: String = String::from("/"); topic_builder_with_slash.push_str(self.name) .map_err(|_| Error::StringOutOfSpace)?; topic_builder_with_slash.push_str("/Control/") .map_err(|_| Error::StringOutOfSpace)?; let topic_builder: &str = topic_builder_with_slash.as_str() .strip_prefix("/") .ok_or(Error::StringOutOfSpace)?; topic.strip_prefix(topic_builder_with_slash.as_str()) .or_else(|| topic.strip_prefix(topic_builder)) .ok_or(Error::MqttCommandError)? }; loop { match header { // The topic has a channel subtopic _ if header.starts_with("Channel") => { // MQTT command should only mention channel once appropriately // Channel must be referred before profile, // as a channel is broader than a profile if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } // Remove the "Channel" part of the subtopic header = header.strip_prefix("Channel") .ok_or(Error::MqttCommandError)?; // Remove the channel number at the end of the subtopic // But store the channel as a char, so it can be removed easily let numeric_char :char = header.chars() .next() .ok_or(Error::MqttCommandError)?; // Record the channel number channel = numeric_char.to_digit(10) .ok_or(Error::MqttCommandError)? .try_into() .unwrap(); assigned_channel = true; header = header.strip_prefix(numeric_char) .ok_or(Error::MqttCommandError)?; // Remove forward slash ("/") header = header.strip_prefix("/") .ok_or(Error::MqttCommandError)?; }, _ if (header.starts_with("Profile") && assigned_channel) => { // MQTT command should only mention profile once appropriately if assigned_profile { return Err(Error::MqttCommandError); } // Remove the "Profile" part of the subtopic header = header.strip_prefix("Profile") .ok_or(Error::MqttCommandError)?; // Remove the profile number at the end of the subtopic // But store the profile as a char, so it can be removed easily let numeric_char :char = header.chars() .next() .ok_or(Error::MqttCommandError)?; // Record the channel number profile = numeric_char.to_digit(10) .ok_or(Error::MqttCommandError)? .try_into() .unwrap(); assigned_profile = true; header = header.strip_prefix(numeric_char) .ok_or(Error::MqttCommandError)?; // Remove forward slash ("/") header = header.strip_prefix("/") .ok_or(Error::MqttCommandError)?; }, "Reset" => { if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } return Ok(MqttTopic::Reset); }, "Switch" => { // Switch is a channel specific topic if !(assigned_channel && !assigned_profile) { return Err(Error::MqttCommandError); } return Ok(MqttTopic::Switch(channel)); }, "Attenuation" => { // Attenuation is a channel specific topic if !(assigned_channel && !assigned_profile) { return Err(Error::MqttCommandError); } return Ok(MqttTopic::Attenuation(channel)); }, "Clock" => { if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } return Ok(MqttTopic::Clock); }, "Clock/Source" => { // Clock/Source refers to the Urukul clock source // It should be common for all channels and profiles if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } return Ok(MqttTopic::ClockSource); }, "Clock/Frequency" => { // Clock/Frequency refers to the Urukul clock frequency // It should be common for all channels and profiles if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } return Ok(MqttTopic::ClockFrequency); }, "Clock/Division" => { // Clock/Division refers to the Urukul clock division // It should be common for all channels and profiles if assigned_channel || assigned_profile { return Err(Error::MqttCommandError); } return Ok(MqttTopic::ClockDivision); }, "SystemClock" => { if !(assigned_channel && !assigned_profile) { return Err(Error::MqttCommandError); } return Ok(MqttTopic::SystemClock(channel)); } "Singletone" => { if !(assigned_channel && assigned_profile) { return Err(Error::MqttCommandError) } return Ok(MqttTopic::Singletone(channel, profile)); } "Singletone/Frequency" => { if !(assigned_channel && assigned_profile) { return Err(Error::MqttCommandError) } return Ok(MqttTopic::SingletoneFrequency(channel, profile)); } "Singletone/Amplitude" => { if !(assigned_channel && assigned_profile) { return Err(Error::MqttCommandError) } return Ok(MqttTopic::SingletoneAmplitude(channel, profile)); } "Singletone/Phase" => { if !(assigned_channel && assigned_profile) { return Err(Error::MqttCommandError) } return Ok(MqttTopic::SingletonePhase(channel, profile)); } "Profile" => { if assigned_channel || assigned_profile { return Err(Error::MqttCommandError) } return Ok(MqttTopic::Profile); } _ => return Err(Error::MqttCommandError), }; } } fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> { match topic { MqttTopic::Reset => Ok((message, MqttCommand::Reset)), MqttTopic::Switch(ch) => switch_message(ch, message), MqttTopic::Attenuation(ch) => attenuation_message(ch, message), MqttTopic::Clock => clock_message(message), MqttTopic::ClockSource => clock_source_message(message), MqttTopic::ClockFrequency => clock_frequency_message(message), MqttTopic::ClockDivision => clock_division_message(message), MqttTopic::SystemClock(ch) => system_clock_message(ch, message), MqttTopic::Singletone(ch, prof) => singletone_message(ch, prof, message), MqttTopic::SingletoneFrequency(ch, prof) => singletone_frequency_message(ch, prof, message), MqttTopic::SingletoneAmplitude(ch, prof) => singletone_amplitude_message(ch, prof, message), MqttTopic::SingletonePhase(ch, prof) => singletone_phase_message(ch, prof, message), MqttTopic::Profile => profile_message(message), } } fn execute(&mut self, command: MqttCommand) -> Result<(), Error> { match command { MqttCommand::ProcessError(_) => Ok(()), MqttCommand::Reset => self.urukul.reset(), MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state), MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl), MqttCommand::Clock(src, freq, div) => self.urukul.set_clock(src, freq, div), MqttCommand::ClockSource(src) => self.urukul.set_clock_source(src), MqttCommand::ClockFrequency(freq) => self.urukul.set_clock_frequency(freq), MqttCommand::ClockDivision(div) => self.urukul.set_clock_division(div), MqttCommand::SystemClock(ch, freq) => self.urukul.set_channel_sys_clk(ch, freq), MqttCommand::Singletone(ch, prof, freq, ampl, deg) => self.urukul.set_channel_single_tone_profile(ch, prof, freq, ampl, deg), MqttCommand::SingletoneFrequency(ch, prof, freq) => self.urukul.set_channel_single_tone_profile_frequency(ch, prof, freq), MqttCommand::SingletoneAmplitude(ch, prof, ampl) => self.urukul.set_channel_single_tone_profile_amplitude(ch, prof, ampl), MqttCommand::SingletonePhase(ch, prof, deg) => self.urukul.set_channel_single_tone_profile_phase(ch, prof, deg), MqttCommand::Profile(prof) => self.urukul.set_profile(prof), } } fn get_clock_source_message(&mut self) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Clock/Source") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, self.urukul.get_clock_source().map( |src| match src { UrukulClockSource::OSC => String::from("OSC"), UrukulClockSource::MMCX => String::from("MMCX"), UrukulClockSource::SMA => String::from("SMA") } )? )) } fn get_clock_frequency_message(&mut self) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Clock/Frequency") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let mut freq_str = String::from( self.float_buffer.format_finite( self.urukul.get_clock_frequency() ) ); freq_str.push_str(" Hz").map_err(|_| Error::StringOutOfSpace)?; freq_str } )) } fn get_clock_division_message(&mut self) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Clock/Division") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { self.urukul.get_clock_division().map( |src| match src { 1 => String::from("1"), 2 => String::from("2"), 4 => String::from("4"), _ => unreachable!() } )? } )) } fn get_single_tone_frequency_message(&mut self, ch: u8, f_out: f64) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/Singletone/Frequency") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let mut message_str = String::from( self.float_buffer.format_finite(f_out) ); message_str.push_str(" Hz") .map_err(|_| Error::StringOutOfSpace)?; message_str } )) } fn get_single_tone_amplitude_message(&mut self, ch: u8, ampl: f64) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/Singletone/Amplitude") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let message_str = String::from( self.float_buffer.format_finite(ampl) ); message_str } )) } fn get_single_tone_phase_message(&mut self, ch: u8, phase: f64) -> Result<(String, String), Error> { Ok(( { let mut topic_string = String::from(self.name); topic_string.push_str("/Feedback/Channel") .map_err(|_| Error::StringOutOfSpace)?; topic_string.push(char::from_digit(ch.into(), 10).unwrap()) .map_err(|_| Error::StringOutOfSpace)?; topic_string.push_str("/Singletone/Phase") .map_err(|_| Error::StringOutOfSpace)?; topic_string }, { let mut message_str = String::from( self.float_buffer.format_finite(phase) ); message_str.push_str(" deg") .map_err(|_| Error::StringOutOfSpace)?; message_str } )) } } // Read message parameter separator (optional comma and whitespace) fn message_separator(message: &[u8]) -> IResult<&[u8], ()> { preceded( opt( tag(",") ), whitespace )(message) } // Read whitespace fn whitespace(message: &[u8]) -> IResult<&[u8], ()> { value((), take_while(is_space))(message) } // Reader for uom instances fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> { map( pair( double, opt( preceded( whitespace, alt(( value(1.0, tag_no_case("hz")), value(1_000.0, tag_no_case("khz")), value(1_000_000.0, tag_no_case("mhz")), value(1_000_000_000.0, tag_no_case("ghz")) )) ) ) ), |(freq, unit): (f64, Option)| { freq * unit.map_or(1.0, |mul| mul) } )(message) } // Parser for Switch Command Message fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( alt(( value(true, tag("on")), value(false, tag("off")) )), |switch| MqttCommand::Switch(channel, switch) ) )(message) } // Parser for Attenuation Command Message fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( terminated( float, opt( preceded( whitespace, tag_no_case("db") ) ) ), |att: f32| MqttCommand::Attenuation(channel, att) ) )(message) } // Parser for Clock Source Command Message fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( alt(( value(MqttCommand::ClockSource(UrukulClockSource::OSC), tag_no_case("OSC")), value(MqttCommand::ClockSource(UrukulClockSource::MMCX), tag_no_case("MMCX")), value(MqttCommand::ClockSource(UrukulClockSource::SMA), tag_no_case("SMA")) )) )(message) } // Parser for Clock Frequency Command Message fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( read_frequency, |freq: f64| MqttCommand::ClockFrequency(freq) ) )(message) } // Parser for Clock Division Command Message fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( digit1, |div: &[u8]| MqttCommand::ClockDivision( u8::from_str_radix( core::str::from_utf8(div).unwrap(), 10 ).unwrap() ) ) )(message) } // Parser for one-command master clock setup message fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( permutation(( preceded( tag_no_case("source:"), preceded( whitespace, terminated( alt(( value(UrukulClockSource::OSC, tag_no_case("OSC")), value(UrukulClockSource::MMCX, tag_no_case("MMCX")), value(UrukulClockSource::SMA, tag_no_case("SMA")) )), message_separator ) ) ), preceded( tag_no_case("frequency:"), preceded( whitespace, terminated( read_frequency, message_separator ) ) ), preceded( tag_no_case("division:"), preceded( whitespace, terminated( map_res( digit1, |div: &[u8]| u8::from_str_radix(core::str::from_utf8(div).unwrap(), 10) ), message_separator ) ) ) )), |(src, freq, div): (UrukulClockSource, f64, u8)| MqttCommand::Clock(src, freq, div) ) )(message) } // Message parser for f_sys_clk of any channels fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( read_frequency, |freq: f64| MqttCommand::SystemClock(channel, freq) ) )(message) } // Parser for Singletone frequency Command Message fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( read_frequency, |freq: f64| MqttCommand::SingletoneFrequency(channel, profile, freq) ) )(message) } // Parser for Singletone AMplitude Command Message fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( double, |ampl: f64| MqttCommand::SingletoneAmplitude(channel, profile, ampl) ) )(message) } // Parser for Phase Command Message fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( terminated( double, opt( preceded( whitespace, tag_no_case("deg") ) ) ), |deg: f64| MqttCommand::SingletonePhase(channel, profile, deg) ) )(message) } // Parser for one-command singletone profile Command fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( permutation(( preceded( tag_no_case("frequency:"), preceded( whitespace, terminated( read_frequency, message_separator ) ) ), preceded( tag_no_case("phase:"), preceded( whitespace, terminated( double, preceded( opt( preceded( whitespace, tag_no_case("deg") ) ), message_separator ) ) ) ), preceded( tag_no_case("amplitude:"), preceded( whitespace, terminated( double, message_separator ) ) ) )), |(freq, phase, ampl): (f64, f64, f64)| MqttCommand::Singletone(channel, profile, freq, phase, ampl) ) )(message) } fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { all_consuming( map( digit1, |num: &[u8]| { MqttCommand::Profile( u8::from_str_radix(core::str::from_utf8(num).unwrap(), 10).unwrap() ) } ) )(message) }