humpback-dds/src/mqtt_mux.rs

593 lines
22 KiB
Rust

use log::info;
use nom::IResult;
use nom::combinator::{value, map, map_res, not, opt, all_consuming};
use nom::sequence::{terminated, preceded, pair, delimited, tuple};
use nom::bytes::complete::{take, tag, tag_no_case, take_while};
use nom::character::complete::digit1;
use nom::character::is_space;
use nom::branch::alt;
use nom::number::complete::{float, double};
use uom::si::f64::Frequency;
use uom::si::frequency::{hertz, kilohertz, megahertz, gigahertz};
use arrayvec::ArrayVec;
use embedded_hal::blocking::spi::Transfer;
use core::convert::TryInto;
use crate::ClockSource as UrukulClockSource;
use crate::ClockSource::*;
use crate::Urukul;
use crate::Error;
/// Command selected by an MQTT topic below the `Urukul/Control/` root,
/// together with the channel/profile digits that were embedded in the topic.
///
/// In the paths below `<n>` is the single decimal digit following `Channel`
/// and `<m>` the single decimal digit following `Profile`.
#[derive(Debug, Clone)]
pub enum MqttTopic {
/// `Reset`
Reset,
/// `Channel<n>/Switch`; carries `n`.
Switch(u8),
/// `Channel<n>/Attenuation`; carries `n`.
Attenuation(u8),
/// `Clock`
Clock,
/// `Clock/Source`
ClockSource,
/// `Clock/Frequency`
ClockFrequency,
/// `Clock/Division`
ClockDivision,
/// `Channel<n>/SystemClock`; carries `n`.
SystemClock(u8),
/// `Channel<n>/Profile<m>/Singletone`; carries `(n, m)`.
Singletone(u8, u8),
/// `Channel<n>/Profile<m>/Singletone/Frequency`; carries `(n, m)`.
SingletoneFrequency(u8, u8),
/// `Channel<n>/Profile<m>/Singletone/Amplitude`; carries `(n, m)`.
SingletoneAmplitude(u8, u8),
/// `Channel<n>/Profile<m>/Singletone/Phase`; carries `(n, m)`.
SingletonePhase(u8, u8),
/// `Profile` (without any channel prefix)
Profile,
}
// Possible change: share this enum across all communication protocols (if any),
// so that Urukul can accept the enum directly.
/// Fully parsed command: an `MqttTopic` combined with the values decoded from
/// the message payload. Frequencies are plain `f64` values; when the payload
/// carries a unit suffix the value has already been scaled to hertz.
#[derive(Debug, Clone)]
pub enum MqttCommand {
/// Reset request; the payload is not inspected.
Reset,
/// Channel number and switch state (`true` for payload `on`, `false` for `off`).
Switch(u8, bool),
/// Channel number and attenuation value (an optional `dB` suffix is accepted).
Attenuation(u8, f32),
/// Clock source, clock frequency and clock division, set in one message.
Clock(UrukulClockSource, f64, u8),
/// Clock source only.
ClockSource(UrukulClockSource),
/// Clock frequency only.
ClockFrequency(f64),
/// Clock division only.
ClockDivision(u8),
/// Channel number and system clock frequency.
SystemClock(u8, f64),
/// Channel number, profile number and three single-tone parameters.
/// NOTE(review): `singletone_message` fills the three `f64` fields as
/// (frequency, phase, amplitude) while `execute` binds them as
/// (freq, ampl, deg) - confirm which order is intended.
Singletone(u8, u8, f64, f64, f64),
/// Channel number, profile number and single-tone frequency.
SingletoneFrequency(u8, u8, f64),
/// Channel number, profile number and single-tone amplitude.
SingletoneAmplitude(u8, u8, f64),
/// Channel number, profile number and single-tone phase
/// (an optional `deg` suffix is accepted).
SingletonePhase(u8, u8, f64),
/// Profile number.
Profile(u8)
}
/// Owns an `Urukul` handle and applies commands received as MQTT
/// topic/payload pairs to it.
pub struct MqttMux<SPI> {
// Device on which every parsed command is executed.
urukul: Urukul<SPI>
}
impl<SPI, E> MqttMux<SPI> where SPI: Transfer<u8, Error = E> {
pub fn new(urukul: Urukul<SPI>) -> Self {
MqttMux {
urukul
}
}
pub fn process_mqtt(&mut self, topic: &str, message: &[u8]) -> Result<(), Error<E>> {
let header = self.parse_topic(topic)
.map_err(|_| Error::MqttTopicError)?;
info!("Parsed command topic: {:?}", header);
let (_, command) = self.parse_message(header, message)
.map_err(|_| Error::MqttCommandError)?;
info!("Parsed comamnd message: {:?}", command);
self.execute(command)
}
fn parse_topic<'a>(&mut self, topic: &'a str) -> Result<MqttTopic, Error<E>> {
let mut assigned_channel = false;
let mut assigned_profile = false;
let mut channel :u8 = 0;
let mut profile :u8 = 0;
// Verify that the topic must start with Urukul/Control/ or /Urukul/Control/
let mut header = topic.strip_prefix("/Urukul/Control/")
.or_else(|| topic.strip_prefix("Urukul/Control/"))
.ok_or(Error::MqttCommandError)?;
loop {
match header {
// The topic has a channel subtopic
_ if header.starts_with("Channel") => {
// MQTT command should only mention channel once appropriately
// Channel must be referred before profile,
// as a channel is broader than a profile
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Channel" part of the subtopic
header = header.strip_prefix("Channel")
.ok_or(Error::MqttCommandError)?;
// Remove the channel number at the end of the subtopic
// But store the channel as a char, so it can be removed easily
let numeric_char :char = header.chars()
.next()
.ok_or(Error::MqttCommandError)?;
// Record the channel number
channel = numeric_char.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_channel = true;
header = header.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/")
.ok_or(Error::MqttCommandError)?;
},
_ if (header.starts_with("Profile") && assigned_channel) => {
// MQTT command should only mention profile once appropriately
if assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Profile" part of the subtopic
header = header.strip_prefix("Profile")
.ok_or(Error::MqttCommandError)?;
// Remove the profile number at the end of the subtopic
// But store the profile as a char, so it can be removed easily
let numeric_char :char = header.chars()
.next()
.ok_or(Error::MqttCommandError)?;
// Record the channel number
profile = numeric_char.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_profile = true;
header = header.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/")
.ok_or(Error::MqttCommandError)?;
},
"Reset" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Reset);
},
"Switch" => {
// Switch is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Switch(channel));
},
"Attenuation" => {
// Attenuation is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Attenuation(channel));
},
"Clock" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Clock);
},
"Clock/Source" => {
// Clock/Source refers to the Urukul clock source
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockSource);
},
"Clock/Frequency" => {
// Clock/Frequency refers to the Urukul clock frequency
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockFrequency);
},
"Clock/Division" => {
// Clock/Division refers to the Urukul clock division
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockDivision);
},
"SystemClock" => {
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SystemClock(channel));
}
"Singletone" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::Singletone(channel, profile));
}
"Singletone/Frequency" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletoneFrequency(channel, profile));
}
"Singletone/Amplitude" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletoneAmplitude(channel, profile));
}
"Singletone/Phase" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::SingletonePhase(channel, profile));
}
"Profile" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError)
}
return Ok(MqttTopic::Profile);
}
_ => return Err(Error::MqttCommandError),
};
}
}
fn parse_message<'a>(&mut self, topic: MqttTopic, message: &'a [u8]) -> IResult<&'a [u8], MqttCommand> {
match topic {
MqttTopic::Reset => Ok((message, MqttCommand::Reset)),
MqttTopic::Switch(ch) => switch_message(ch, message),
MqttTopic::Attenuation(ch) => attenuation_message(ch, message),
MqttTopic::Clock => clock_message(message),
MqttTopic::ClockSource => clock_source_message(message),
MqttTopic::ClockFrequency => clock_frequency_message(message),
MqttTopic::ClockDivision => clock_division_message(message),
MqttTopic::SystemClock(ch) => system_clock_message(ch, message),
MqttTopic::Singletone(ch, prof) => singletone_message(ch, prof, message),
MqttTopic::SingletoneFrequency(ch, prof) => singletone_frequency_message(ch, prof, message),
MqttTopic::SingletoneAmplitude(ch, prof) => singletone_amplitude_message(ch, prof, message),
MqttTopic::SingletonePhase(ch, prof) => singletone_phase_message(ch, prof, message),
MqttTopic::Profile => profile_message(message),
}
}
fn execute(&mut self, command: MqttCommand) -> Result<(), Error<E>> {
match command {
MqttCommand::Reset => self.urukul.reset(),
MqttCommand::Switch(ch, state) => self.urukul.set_channel_switch(ch.into(), state),
MqttCommand::Attenuation(ch, ampl) => self.urukul.set_channel_attenuation(ch, ampl),
MqttCommand::Clock(src, freq, div) => self.urukul.set_clock(src, freq, div),
MqttCommand::ClockSource(src) => self.urukul.set_clock_source(src),
MqttCommand::ClockFrequency(freq) => self.urukul.set_clock_frequency(freq),
MqttCommand::ClockDivision(div) => self.urukul.set_clock_division(div),
MqttCommand::SystemClock(ch, freq) => self.urukul.set_channel_sys_clk(ch, freq),
MqttCommand::Singletone(ch, prof, freq, ampl, deg) => self.urukul.set_channel_single_tone_profile(ch, prof, freq, ampl, deg),
MqttCommand::SingletoneFrequency(ch, prof, freq) => self.urukul.set_channel_single_tone_profile_frequency(ch, prof, freq),
MqttCommand::SingletoneAmplitude(ch, prof, ampl) => self.urukul.set_channel_single_tone_profile_amplitude(ch, prof, ampl),
MqttCommand::SingletonePhase(ch, prof, deg) => self.urukul.set_channel_single_tone_profile_phase(ch, prof, deg),
MqttCommand::Profile(prof) => self.urukul.set_profile(prof),
}
}
}
// Topic separator parser
fn topic_separator<'a>(topic: &'a str) -> IResult<&'a str, ()> {
value((), tag("/"))(topic)
}
// Message separator parser
fn message_separator(message: &[u8]) -> IResult<&[u8], ()> {
value(
(),
preceded(
whitespace,
preceded(
tag("/"),
whitespace
)
)
)(message)
}
// Read whitespace
fn whitespace(message: &[u8]) -> IResult<&[u8], ()> {
value((), take_while(is_space))(message)
}
/// Reads a floating point number optionally followed by blanks and a
/// case-insensitive unit (`hz`, `khz`, `mhz`, `ghz`).
///
/// Returns the number multiplied by the unit's factor (1, 1e3, 1e6, 1e9);
/// a number without a unit is returned unchanged.
fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> {
    let unit_multiplier = alt((
        value(1.0, tag_no_case("hz")),
        value(1_000.0, tag_no_case("khz")),
        value(1_000_000.0, tag_no_case("mhz")),
        value(1_000_000_000.0, tag_no_case("ghz")),
    ));
    map(
        pair(double, opt(preceded(whitespace, unit_multiplier))),
        |(magnitude, multiplier): (f64, Option<f64>)| magnitude * multiplier.unwrap_or(1.0),
    )(message)
}
/// Parses a switch payload for `channel`: exactly `on` or `off`
/// (case-sensitive), with nothing before or after it.
fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let turn_on = value(MqttCommand::Switch(channel, true), tag("on"));
    let turn_off = value(MqttCommand::Switch(channel, false), tag("off"));
    all_consuming(alt((turn_on, turn_off)))(message)
}
/// Parses an attenuation payload for `channel`: a floating point number,
/// optionally followed by blanks and a case-insensitive `dB` suffix.
/// The whole payload must be consumed.
fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let db_suffix = opt(preceded(whitespace, tag_no_case("db")));
    let attenuation = terminated(float, db_suffix);
    all_consuming(map(
        attenuation,
        |att: f32| MqttCommand::Attenuation(channel, att),
    ))(message)
}
// Parser for Clock Source Command Message
fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(
alt((
value(MqttCommand::ClockSource(UrukulClockSource::OSC), tag_no_case("OSC")),
value(MqttCommand::ClockSource(UrukulClockSource::MMCX), tag_no_case("MMCX")),
value(MqttCommand::ClockSource(UrukulClockSource::SMA), tag_no_case("SMA"))
))
)(message)
}
/// Parses a clock frequency payload: a single frequency as accepted by
/// `read_frequency`, which must span the whole payload.
fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let (rest, freq) = all_consuming(read_frequency)(message)?;
    Ok((rest, MqttCommand::ClockFrequency(freq)))
}
// Parser for Clock Division Command Message
fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(
map(
digit1,
|div: &[u8]| MqttCommand::ClockDivision(
u8::from_str_radix(
core::str::from_utf8(div).unwrap(),
10
).unwrap()
)
)
)(message)
}
/// Parses the one-message clock setup payload, a JSON-like object with the
/// three keys in this fixed order:
///
/// `{ "source": <OSC|MMCX|SMA>, "frequency": <frequency>, "division": <digits> }`
///
/// Blanks are accepted before each key, after each colon and before the
/// closing brace, but not between a value and the comma that follows it.
/// The source value is matched case-insensitively and is not quoted.
///
/// Possible improvements: chop off redundant braces and quotes; allow
/// optional parameters / permutation of parameters.
fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
    // `"source": <name>,`
    let source_field = delimited(
        tuple((whitespace, tag("\"source\":"), whitespace)),
        alt((
            value(UrukulClockSource::OSC, tag_no_case("OSC")),
            value(UrukulClockSource::MMCX, tag_no_case("MMCX")),
            value(UrukulClockSource::SMA, tag_no_case("SMA")),
        )),
        tag(","),
    );
    // `"frequency": <number>[ unit],`
    let frequency_field = delimited(
        tuple((whitespace, tag("\"frequency\":"), whitespace)),
        read_frequency,
        tag(","),
    );
    // `"division": <digits>` plus any trailing blanks
    let division_field = delimited(
        tuple((whitespace, tag("\"division\":"), whitespace)),
        map_res(
            digit1,
            |div: &[u8]| u8::from_str_radix(core::str::from_utf8(div).unwrap(), 10),
        ),
        whitespace,
    );
    all_consuming(map(
        delimited(
            tag("{"),
            tuple((source_field, frequency_field, division_field)),
            tag("}"),
        ),
        |(src, freq, div): (UrukulClockSource, f64, u8)| MqttCommand::Clock(src, freq, div),
    ))(message)
}
/// Parses the system clock payload for `channel`: a single frequency as
/// accepted by `read_frequency`, which must span the whole payload.
fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    all_consuming(read_frequency)(message)
        .map(|(rest, freq)| (rest, MqttCommand::SystemClock(channel, freq)))
}
/// Parses the single-tone frequency payload for `channel`/`profile`: a
/// single frequency as accepted by `read_frequency`, spanning the whole
/// payload.
fn singletone_frequency_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    all_consuming(read_frequency)(message)
        .map(|(rest, freq)| (rest, MqttCommand::SingletoneFrequency(channel, profile, freq)))
}
/// Parses the single-tone amplitude payload for `channel`/`profile`: a bare
/// floating point number spanning the whole payload.
fn singletone_amplitude_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    all_consuming(double)(message)
        .map(|(rest, ampl)| (rest, MqttCommand::SingletoneAmplitude(channel, profile, ampl)))
}
/// Parses the single-tone phase payload for `channel`/`profile`: a floating
/// point number, optionally followed by blanks and a case-insensitive `deg`
/// suffix. The whole payload must be consumed.
fn singletone_phase_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let deg_suffix = opt(preceded(whitespace, tag_no_case("deg")));
    all_consuming(terminated(double, deg_suffix))(message)
        .map(|(rest, deg)| (rest, MqttCommand::SingletonePhase(channel, profile, deg)))
}
// Parser for one-command singletone profile Command
// Using JSON like command structure
// Possible improvements: Chop off redundant braces and quotes
// Allow optional parameters/permutation of parameters
fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(
map(
tuple((
preceded(
tag("{"),
preceded(
whitespace,
preceded(
tag("\"frequency\":"),
preceded(
whitespace,
read_frequency
)
)
)
),
preceded(
tag(","),
preceded(
whitespace,
preceded(
tag("\"amplitude\":"),
preceded(
whitespace,
double
)
)
)
),
preceded(
tag(","),
preceded(
whitespace,
preceded(
tag("\"phase\":"),
preceded(
whitespace,
terminated(
double,
preceded(
opt(
preceded(
whitespace,
tag_no_case("deg")
)
),
preceded(
whitespace,
tag("}")
)
)
)
)
)
)
)
)),
|(freq, ampl, phase): (f64, f64, f64)| MqttCommand::Singletone(channel, profile, freq, phase, ampl)
)
)(message)
}
fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(
map(
digit1,
|num: &[u8]| {
MqttCommand::Profile(
u8::from_str_radix(core::str::from_utf8(num).unwrap(), 10).unwrap()
)
}
)
)(message)
}