mqtt_mux: select topic using str op

This commit is contained in:
occheung 2020-09-23 09:42:14 +08:00
parent 9ea56ebde5
commit 7f00e5817d
1 changed files with 184 additions and 218 deletions

View File

@ -11,6 +11,8 @@ use nom::number::complete::{float, double};
use uom::si::f64::Frequency; use uom::si::f64::Frequency;
use uom::si::frequency::{hertz, kilohertz, megahertz, gigahertz}; use uom::si::frequency::{hertz, kilohertz, megahertz, gigahertz};
use arrayvec::ArrayVec;
use embedded_hal::blocking::spi::Transfer; use embedded_hal::blocking::spi::Transfer;
use core::convert::TryInto; use core::convert::TryInto;
use crate::ClockSource as UrukulClockSource; use crate::ClockSource as UrukulClockSource;
@ -20,6 +22,7 @@ use crate::Error;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum MqttTopic { pub enum MqttTopic {
Reset,
Switch(u8), Switch(u8),
Attenuation(u8), Attenuation(u8),
Clock, Clock,
@ -38,6 +41,7 @@ pub enum MqttTopic {
// Such that Urukul accepts the enum directly // Such that Urukul accepts the enum directly
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum MqttCommand { pub enum MqttCommand {
Reset,
Switch(u8, bool), Switch(u8, bool),
Attenuation(u8, f32), Attenuation(u8, f32),
Clock(UrukulClockSource, f64, u8), Clock(UrukulClockSource, f64, u8),
@ -64,7 +68,7 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
} }
pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error<E>> { pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error<E>> {
let (_, header) = self.parse_header(topic) let header = self.parse_topic(topic)
.map_err(|_| Error::MqttTopicError)?; .map_err(|_| Error::MqttTopicError)?;
info!("{:?}", header); info!("{:?}", header);
let (_, command) = self.parse_message(header, message) let (_, command) = self.parse_message(header, message)
@ -72,7 +76,7 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
info!("{:?}", command); info!("{:?}", command);
self.execute(command) self.execute(command)
} }
/*
fn parse_header<'a>(&mut self, topic: &'a str) -> IResult<&'a str, MqttTopic> { fn parse_header<'a>(&mut self, topic: &'a str) -> IResult<&'a str, MqttTopic> {
preceded( preceded(
alt(( alt((
@ -82,6 +86,10 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
alt(( alt((
switch, switch,
attenuation, attenuation,
clock,
clock_source,
clock_frequency,
clock_division,
singletone, singletone,
singletone_frequency, singletone_frequency,
singletone_amplitude, singletone_amplitude,
@ -90,9 +98,182 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
)) ))
)(topic) )(topic)
} }
*/
fn parse_topic<'a>(&mut self, topic: &'a str) -> Result<MqttTopic, Error<E>> {
let mut assigned_channel = false;
let mut assigned_profile = false;
let mut channel :u8 = 0;
let mut profile :u8 = 0;
// Verify that the topic must start with Urukul/Control/ or /Urukul/Control/
let mut header = topic.strip_prefix("/Urukul/Control/")
.or_else(|| topic.strip_prefix("Urukul/Control/"))
.ok_or(Error::MqttCommandError)?;
loop {
match header {
// The topic has a channel subtopic
_ if header.starts_with("Channel") => {
// MQTT command should only mention channel once appropriately
// Channel must be referred before profile,
// as a channel is broader than a profile
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Channel" part of the subtopic
header = header.strip_prefix("Channel")
.ok_or(Error::MqttCommandError)?;
// Remove the channel number at the end of the subtopic
// But store the channel as a char, so it can be removed easily
let numeric_char :char = header.chars()
.next()
.ok_or(Error::MqttCommandError)?;
// Record the channel number
channel = numeric_char.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_channel = true;
header = header.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/")
.ok_or(Error::MqttCommandError)?;
},
_ if (header.starts_with("Profile") && assigned_channel) => {
// MQTT command should only mention profile once appropriately
if assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Profile" part of the subtopic
header = header.strip_prefix("Profile")
.ok_or(Error::MqttCommandError)?;
// Remove the profile number at the end of the subtopic
// But store the profile as a char, so it can be removed easily
let numeric_char :char = header.chars()
.next()
.ok_or(Error::MqttCommandError)?;
// Record the channel number
profile = numeric_char.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_profile = true;
header = header.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/")
.ok_or(Error::MqttCommandError)?;
},
"Reset" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Reset);
},
"Switch" => {
// Switch is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Switch(channel));
},
"Attenuation" => {
// Attenuation is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Attenuation(channel));
},
"Clock" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Clock);
},
"Clock/Source" => {
// Clock/Source refers to the Urukul clock source
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockSource);
},
"Clock/Frequency" => {
// Clock/Frequency refers to the Urukul clock frequency
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockFrequency);
},
"Clock/Division" => {
// Clock/Division refers to the Urukul clock division
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockDivision);
},
"SystemClock" => {
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SystemClock(channel));
}
"Singletone" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::Singletone(channel, profile));
}
"Singletone/Frequency" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletoneFrequency(channel, profile));
}
"Singletone/Amplitude" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletoneAmplitude(channel, profile));
}
"Singletone/Phase" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletonePhase(channel, profile));
}
"Profile" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::Profile);
}
_ => return Err(Error::MqttCommandError),
};
}
}
fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> { fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> {
match topic { match topic {
MqttTopic::Reset => Ok((message, MqttCommand::Reset)),
MqttTopic::Switch(ch) => switch_message(ch, message), MqttTopic::Switch(ch) => switch_message(ch, message),
MqttTopic::Attenuation(ch) => attenuation_message(ch, message), MqttTopic::Attenuation(ch) => attenuation_message(ch, message),
MqttTopic::Clock => clock_message(message), MqttTopic::Clock => clock_message(message),
@ -110,6 +291,7 @@ impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
fn execute(&mut self, command: MqttCommand) -> Result<(), Error<E>> { fn execute(&mut self, command: MqttCommand) -> Result<(), Error<E>> {
match command { match command {
MqttCommand::Reset => self.urukul.reset(),
MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state), MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state),
MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl), MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl),
MqttCommand::Clock(src, freq, div) => self.urukul.set_clock(src, freq, div), MqttCommand::Clock(src, freq, div) => self.urukul.set_clock(src, freq, div),
@ -145,52 +327,6 @@ fn message_separator(message: &[u8]) -> IResult<&[u8], ()> {
)(message) )(message)
} }
// Selection parsers
fn select_channel<'a>(topic: &'a str) -> IResult<&'a str, u8> {
terminated(
map_res(
preceded(
tag("Channel"),
digit1
),
|num: &str| u8::from_str_radix(num, 10)
),
topic_separator
)(topic)
}
fn select_profile<'a>(topic: &'a str) -> IResult<&'a str, u8> {
terminated(
map_res(
preceded(
tag("Profile"),
digit1
),
|num: &str| u8::from_str_radix(num, 10)
),
topic_separator
)(topic)
}
fn select_clock<'a>(topic: &'a str) -> IResult<&'a str, ()> {
value(
(),
preceded(
tag("Clock"),
topic_separator
)
)(topic)
}
// Selection parser for Singletone
// Note: This fucntion assumes singletone is not the most specific sub-topic
fn select_singletone<'a>(topic: &'a str) -> IResult<&'a str, ()> {
preceded(
tag("Singletone"),
topic_separator
)(topic)
}
// Read whitespace // Read whitespace
fn whitespace(message: &[u8]) -> IResult<&[u8], ()> { fn whitespace(message: &[u8]) -> IResult<&[u8], ()> {
value((), take_while(is_space))(message) value((), take_while(is_space))(message)
@ -219,19 +355,6 @@ fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> {
)(message) )(message)
} }
// Parser for Switch Command Topic
fn switch<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
terminated(
select_channel,
tag("Switch")
),
|channel: u8| MqttTopic::Switch(channel)
)
)(topic)
}
// Parser for Switch Command Message // Parser for Switch Command Message
fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -245,19 +368,6 @@ fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
)(message) )(message)
} }
// Parser for Attenuation Command Topic
fn attenuation<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
terminated(
select_channel,
tag("Attenuation")
),
|channel: u8| MqttTopic::Attenuation(channel)
)
)(topic)
}
// Parser for Attenuation Command Message // Parser for Attenuation Command Message
fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -276,19 +386,6 @@ fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttComman
)(message) )(message)
} }
// Parser for Clock Source Command Topic
fn clock_source<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
value(
MqttTopic::ClockSource,
preceded(
clock_source,
tag("Source")
)
)
)(topic)
}
// Parser for Clock Source Command Message // Parser for Clock Source Command Message
fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -300,19 +397,6 @@ fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
)(message) )(message)
} }
// Parser for Clock Frequency Command Topic
fn clock_frequency<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
value(
MqttTopic::ClockFrequency,
preceded(
clock_source,
tag("Frequency")
)
)
)(topic)
}
// Parser for Clock Frequency Command Message // Parser for Clock Frequency Command Message
fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -323,19 +407,6 @@ fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
)(message) )(message)
} }
// Parser for Clock Division Command Topic
fn clock_division<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
value(
MqttTopic::ClockDivision,
preceded(
clock_source,
tag("Division")
)
)
)(topic)
}
// Parser for Clock Division Command Message // Parser for Clock Division Command Message
fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -351,16 +422,6 @@ fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
)(message) )(message)
} }
// Parser for one-command master clock setup topic
fn clock<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
value(
MqttTopic::Clock,
tag("Clock")
)
)(topic)
}
// Parser for one-command master clock setup message // Parser for one-command master clock setup message
fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -422,19 +483,6 @@ fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
)(message) )(message)
} }
// Topic parser for f_sys_clk of any channels
fn system_clock<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
terminated(
select_channel,
tag("SystemClock")
),
|channel: u8| MqttTopic::SystemClock(channel)
)
)(topic)
}
// Message parser for f_sys_clk of any channels // Message parser for f_sys_clk of any channels
fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -445,25 +493,6 @@ fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttComma
)(message) )(message)
} }
// Parser for Singletone frequenct Command Topic
fn singletone_frequency<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
pair(
select_channel,
terminated(
select_profile,
preceded(
select_singletone,
tag("Frequency"),
)
)
),
|(channel, profile): (u8, u8)| MqttTopic::SingletoneFrequency(channel, profile)
)
)(topic)
}
// Parser for Singletone frequency Command Message // Parser for Singletone frequency Command Message
fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -474,25 +503,6 @@ fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IRe
)(message) )(message)
} }
// Parser for Singletone Amplitude Command Topic
fn singletone_amplitude<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
pair(
select_channel,
terminated(
select_profile,
preceded(
select_singletone,
tag("Amplitude"),
)
)
),
|(channel, profile): (u8, u8)| MqttTopic::SingletoneAmplitude(channel, profile)
)
)(topic)
}
// Parser for Singletone AMplitude Command Message // Parser for Singletone AMplitude Command Message
fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -503,25 +513,6 @@ fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IRe
)(message) )(message)
} }
// Parser for Phase Command Topic
fn singletone_phase<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
pair(
select_channel,
terminated(
select_profile,
preceded(
select_singletone,
tag("Phase"),
)
)
),
|(channel, profile): (u8, u8)| MqttTopic::SingletonePhase(channel, profile)
)
)(topic)
}
// Parser for Phase Command Message // Parser for Phase Command Message
fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> { fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
@ -540,22 +531,6 @@ fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult
)(message) )(message)
} }
// Parser for one-command singletone profile Topic
fn singletone<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
map(
pair(
select_channel,
terminated(
select_profile,
tag("Singletone")
)
),
|(channel, profile): (u8, u8)| MqttTopic::Singletone(channel, profile)
)
)(topic)
}
// Parser for one-command singletone profile Command // Parser for one-command singletone profile Command
// Using JSON like command structure // Using JSON like command structure
// Possible enhancement: further modularize parsing of all separate fields // Possible enhancement: further modularize parsing of all separate fields
@ -622,15 +597,6 @@ fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8]
)(message) )(message)
} }
fn profile<'a>(topic: &'a str) -> IResult<&'a str, MqttTopic> {
all_consuming(
value(
MqttTopic::Profile,
tag("Profile")
)
)(topic)
}
fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> { fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming( all_consuming(
map( map(