humpback-dds/src/mqtt_mux.rs

1387 lines
54 KiB
Rust

use crate::config::{ChannelConfig, ProfileSetup, SingleTone, UrukulConfig, RAM};
use crate::dds::{RAMDestination, RAMOperationMode};
use crate::flash::Flash;
use crate::flash_store::{update_flash, FlashStore};
use crate::urukul::ClockSource as UrukulClockSource;
use crate::urukul::Error;
use crate::urukul::Urukul;
use core::convert::TryInto;
use embedded_hal::blocking::spi::Transfer;
use heapless::{consts::*, String, Vec};
use nom::branch::{alt, permutation};
use nom::bytes::complete::{tag, tag_no_case, take_while};
use nom::character::complete::digit1;
use nom::character::is_space;
use nom::combinator::{all_consuming, map, map_res, opt, value};
use nom::number::complete::{double, float};
use nom::sequence::{pair, preceded, terminated};
use nom::IResult;
use ryu;
/// Control topic recognised by `MqttMux::parse_topic`.
///
/// The `u8` payloads are the indices taken from the `Channel<n>` and
/// `Profile<n>` path segments of the topic (channel first, profile second).
/// The string noted on each variant is the trailing part of the topic that
/// selects it.
#[derive(Debug, Clone, PartialEq)]
pub enum MqttTopic {
/// `Reset`
Reset,
/// `Channel<ch>/Switch`
Switch(u8),
/// `Channel<ch>/Attenuation`
Attenuation(u8),
/// `Clock`
Clock,
/// `Clock/Source`
ClockSource,
/// `Clock/Frequency`
ClockFrequency,
/// `Clock/Division`
ClockDivision,
/// `Channel<ch>/SystemClock`
SystemClock(u8),
/// `Channel<ch>/Profile<pr>/Singletone`
Singletone(u8, u8),
/// `Channel<ch>/Profile<pr>/Singletone/Frequency`
SingletoneFrequency(u8, u8),
/// `Channel<ch>/Profile<pr>/Singletone/Amplitude`
SingletoneAmplitude(u8, u8),
/// `Channel<ch>/Profile<pr>/Singletone/Phase`
SingletonePhase(u8, u8),
/// `Profile` (without any channel prefix)
Profile,
/// `Save`
Save,
/// `Load`
Load,
/// `Channel<ch>/Background/Frequency`
DefaultFTW(u8),
/// `Channel<ch>/Background/Amplitude`
DefaultASF(u8),
/// `Buffer/Append`; its payload is passed on as raw bytes and never parsed
AppendBytes,
/// `Buffer/Commit`
CommitBuffer,
/// `Channel<ch>/Profile<pr>/RAM`
RAM(u8, u8),
}
// Possible change: Make this enum public to all comm protocols (if any)
// Such that Urukul accepts the enum directly
/// Fully parsed command, produced from an `MqttTopic` plus its payload.
///
/// The same value is kept after execution so that the egress handler knows
/// which feedback message(s) to publish.
#[derive(Debug, Clone)]
pub enum MqttCommand {
/// Processing failed; the string is published on the error feedback topic.
ProcessError(&'static str),
/// Reset the device.
Reset,
/// (channel, switch state)
Switch(u8, bool),
/// (channel, attenuation)
Attenuation(u8, f32),
/// (clock source, clock frequency, clock division)
Clock(UrukulClockSource, f64, u8),
/// (clock source)
ClockSource(UrukulClockSource),
/// (clock frequency)
ClockFrequency(f64),
/// (clock division)
ClockDivision(u8),
/// (channel, system clock frequency)
SystemClock(u8, f64),
/// (channel, profile, frequency, phase, amplitude) — the order in which
/// the singletone message parser fills the fields.
Singletone(u8, u8, f64, f64, f64),
/// (channel, profile, frequency)
SingletoneFrequency(u8, u8, f64),
/// (channel, profile, amplitude)
SingletoneAmplitude(u8, u8, f64),
/// (channel, profile, phase)
SingletonePhase(u8, u8, f64),
/// (profile)
Profile(u8),
/// Store the current configuration to flash.
Save,
/// Restore the configuration from flash.
Load,
/// (channel, default frequency)
DefaultFTW(u8, f64),
/// (channel, default amplitude)
DefaultASF(u8, f64),
/// (number of bytes that were appended to the RAM buffer)
AppendBytes(usize),
/// (destination, start address, channel)
CommitBuffer(RAMDestination, u16, u8),
/// (channel, profile, start address, end address, operation mode, ramp rate)
RAM(u8, u8, u16, u16, RAMOperationMode, u16),
}
/// Translates MQTT control messages into `Urukul` calls and produces the
/// matching feedback messages.
pub struct MqttMux<'s, SPI> {
// Device that all commands are executed on.
urukul: Urukul<SPI>,
// Outcome of the latest ingress call; consumed by `process_mqtt_egress`.
yet_to_respond: Option<MqttCommand>,
// Flash driver, used together with `flash_store` when saving the device.
flash_controller: Flash,
// Key/value store holding the "urukul" and "ch0".."ch3" configurations.
flash_store: FlashStore,
// Device name; forms the leading part of every control/feedback topic.
name: &'s str,
// Scratch buffer used to format floating point numbers as text.
float_buffer: ryu::Buffer,
}
// Flash store keys of the per-channel configurations, indexed by channel number.
const CHANNELS: [&str; 4] = ["ch0", "ch1", "ch2", "ch3"];
// Scratch buffer handed to `FlashStore::write_value` when saving configurations.
// NOTE(review): `static mut` — presumably only ever touched from a single
// execution context; confirm before using it anywhere else.
static mut SERDE_BUFFER: [u8; 64] = [0; 64];
impl<'s, SPI, E> MqttMux<'s, SPI>
where
SPI: Transfer<u8, Error = E>,
{
/// Creates a multiplexer around `urukul`.
///
/// `name` is the device name used as the leading part of every topic.
/// No response is pending right after construction.
pub fn new(
    urukul: Urukul<SPI>,
    flash_controller: Flash,
    flash_store: FlashStore,
    name: &'s str,
) -> Self {
    let float_buffer = ryu::Buffer::new();
    Self {
        urukul,
        yet_to_respond: None,
        flash_controller,
        flash_store,
        name,
        float_buffer,
    }
}
/// Resets the device and re-applies whatever configuration is found in the
/// flash store.
///
/// The "urukul" entry restores clock and active profile (profile 0 is
/// assumed when the entry is absent); each existing "ch<n>" entry restores
/// that channel. Missing entries are skipped.
///
/// # Panics
/// Panics if reading from the flash store fails.
fn load_device(&mut self) -> Result<(), Error<E>> {
    self.urukul.reset()?;

    let stored_device = self
        .flash_store
        .read_value::<UrukulConfig>("urukul")
        .unwrap();
    let profile = if let Some(device) = stored_device {
        self.urukul
            .set_clock(device.clk_src, device.clk_freq, device.clk_div)?;
        self.urukul.set_profile(device.profile)?;
        device.profile
    } else {
        0
    };

    for (index, channel_tag) in CHANNELS.iter().enumerate() {
        let stored_channel = self
            .flash_store
            .read_value::<ChannelConfig>(channel_tag)
            .unwrap();
        let config = match stored_channel {
            Some(config) => config,
            // Nothing was saved for this channel.
            None => continue,
        };
        let ch = index as u8;

        self.urukul.set_channel_switch(index as u32, config.sw)?;
        self.urukul.set_channel_attenuation(ch, config.att)?;
        self.urukul.set_channel_sys_clk(ch, config.sys_clk)?;
        self.urukul.set_channel_default_ftw(ch, config.freq)?;
        self.urukul.set_channel_default_asf(ch, config.asf)?;

        if let ProfileSetup::Singletone(tone) = config.profile {
            self.urukul.set_channel_single_tone_profile(
                ch, profile, tone.freq, tone.phase, tone.asf,
            )?;
        } else if let ProfileSetup::RAM(ram) = config.profile {
            // Stored as a plain number; anything above 3 selects
            // continuous recirculate.
            let op_mode = match ram.op_mode {
                0 => RAMOperationMode::DirectSwitch,
                1 => RAMOperationMode::RampUp,
                2 => RAMOperationMode::BidirectionalRamp,
                3 => RAMOperationMode::ContinuousBidirectionalRamp,
                _ => RAMOperationMode::ContinuousRecirculate,
            };
            self.urukul.set_channel_ram_profile(
                ch, profile, ram.start, ram.end, op_mode, ram.stride,
            )?;
        }
    }
    Ok(())
}
/// Reads the current device configuration back from the Urukul and writes
/// it to flash.
///
/// One "urukul" entry (clock and active profile) and one entry per channel
/// are written to the flash store, which is then flushed to flash.
///
/// # Panics
/// Panics if writing to the flash store or updating the flash fails.
fn save_device(&mut self) -> Result<(), Error<E>> {
    let clk_src = self.urukul.get_clock_source()?;
    let clk_freq = self.urukul.get_clock_frequency();
    let clk_div = self.urukul.get_clock_division()?;
    let profile = self.urukul.get_profile()?;
    let urukul_config = UrukulConfig {
        clk_src,
        clk_freq,
        clk_div,
        profile,
    };
    // SAFETY: NOTE(review) — relies on SERDE_BUFFER not being accessed
    // concurrently; confirm this only runs in a single context.
    unsafe {
        self.flash_store
            .write_value("urukul", &urukul_config, &mut SERDE_BUFFER)
            .unwrap();
    }

    for (index, channel_tag) in CHANNELS.iter().enumerate() {
        let ch = index as u8;

        // Only the currently active profile of each channel is saved.
        let profile_setup = if self.urukul.get_channel_ram_mode_enabled(ch)? {
            let (start, end, stride, op_mode) = self
                .urukul
                .get_channel_ram_profile(ch, urukul_config.profile)?;
            ProfileSetup::RAM(RAM {
                start,
                end,
                stride,
                op_mode,
            })
        } else {
            let (freq, phase, asf) = self
                .urukul
                .get_channel_single_tone_profile(ch, urukul_config.profile)?;
            ProfileSetup::Singletone(SingleTone { freq, phase, asf })
        };

        let sw = self.urukul.get_channel_switch_status(index as u32)?;
        let att = self.urukul.get_channel_attenuation(ch)?;
        let sys_clk = self.urukul.get_channel_sys_clk(ch)?;
        let freq = self.urukul.get_channel_default_ftw(ch)?;
        let asf = self.urukul.get_channel_default_asf(ch)?;
        let channel_config = ChannelConfig {
            sw,
            att,
            sys_clk,
            freq,
            asf,
            profile: profile_setup,
        };
        // SAFETY: see the note on the first write above.
        unsafe {
            self.flash_store
                .write_value(*channel_tag, &channel_config, &mut SERDE_BUFFER)
                .unwrap();
        }
    }

    update_flash(&mut self.flash_controller, &mut self.flash_store).unwrap();
    Ok(())
}
/// Handles one incoming MQTT message.
///
/// Instead of using a return type, the result of processing the command is
/// stored internally; invoke `process_mqtt_egress` afterwards to obtain the
/// response. A failure at any stage (topic parsing, message parsing,
/// execution) is stored as a `MqttCommand::ProcessError`.
pub fn process_mqtt_ingress(&mut self, topic: &str, message: &[u8]) {
    let topic = match self.parse_topic(topic) {
        Ok(parsed) => parsed,
        Err(_) => {
            self.yet_to_respond = Some(MqttCommand::ProcessError("Cannot parse MQTT topic"));
            return;
        }
    };

    // All MQTT messages should be parsed except appending bytes to buffer.
    // Directly pass the buffer to urukul in this case.
    let command = if topic == MqttTopic::AppendBytes {
        let length = message.len();
        if self.urukul.append_dds_ram_buffer(message).is_err() {
            self.yet_to_respond =
                Some(MqttCommand::ProcessError("Cannot push bytes to buffer"));
            return;
        }
        MqttCommand::AppendBytes(length)
    } else {
        match self.parse_message(topic, message) {
            Ok((_, cmd)) => cmd,
            Err(_) => {
                self.yet_to_respond =
                    Some(MqttCommand::ProcessError("Cannot parse MQTT message"));
                return;
            }
        }
    };

    // `execute` consumes the command, but the command itself is what egress
    // needs in order to report back, hence the clone.
    self.yet_to_respond = match self.execute(command.clone()) {
        Err(_) => Some(MqttCommand::ProcessError("Cannot execute MQTT command")),
        Ok(()) => Some(command),
    };
}
// Be sure to call egress function after each ingress.
// Otherwise, response will be lost if successive valid MQTT messages were captured
// without calling egress in between
pub fn process_mqtt_egress(
&mut self,
) -> Result<Vec<(String<U128>, String<U64>), U4>, Error<E>> {
// Remove previously executed command, and process it afterwards
let prev_cmd = self.yet_to_respond.clone();
self.yet_to_respond = None;
let mut vec = Vec::new();
match prev_cmd {
Some(cmd) => match cmd {
MqttCommand::ProcessError(e_str) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Error")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from(e_str),
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Reset => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Reset")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from(match self.urukul.test() {
Ok(0) => "Reset successful.",
_ => "Reset error!",
}),
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Switch(ch, _) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push_str("/Switch")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
String::from(if self.urukul.get_channel_switch_status(ch.into())? {
"on"
} else {
"off"
})
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Attenuation(ch, _) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push_str("/Attenuation")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
String::from(
self.float_buffer
.format_finite(self.urukul.get_channel_attenuation(ch)?),
)
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Clock(_, _, _) => {
vec.push(self.get_clock_source_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
vec.push(self.get_clock_frequency_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
vec.push(self.get_clock_division_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::ClockSource(_) => {
vec.push(self.get_clock_source_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::ClockFrequency(_) => {
vec.push(self.get_clock_frequency_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::ClockDivision(_) => {
vec.push(self.get_clock_division_message()?)
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::SystemClock(ch, _) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push_str("/SystemClock")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let mut message_str = String::from(
self.float_buffer
.format_finite(self.urukul.get_channel_sys_clk(ch)?),
);
message_str
.push_str(" Hz")
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Singletone(ch, pr, _, _, _)
| MqttCommand::SingletoneFrequency(ch, pr, _)
| MqttCommand::SingletoneAmplitude(ch, pr, _)
| MqttCommand::SingletonePhase(ch, pr, _) => {
let (f_out, phase, ampl) =
self.urukul.get_channel_single_tone_profile(ch, pr)?;
vec.push(self.get_single_tone_frequency_message(ch, f_out)?)
.map_err(|_| Error::StringOutOfSpace)?;
vec.push(self.get_single_tone_phase_message(ch, phase)?)
.map_err(|_| Error::StringOutOfSpace)?;
vec.push(self.get_single_tone_amplitude_message(ch, ampl)?)
.map_err(|_| Error::StringOutOfSpace)?;
Ok(vec)
}
MqttCommand::Profile(_) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Profile")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let mut message_str = String::new();
let prof = self.urukul.get_profile()?;
message_str
.push(char::from_digit(prof.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Save => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Save")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from("Saved device."),
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::Load => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Load")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
String::from("Loaded from flash."),
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::DefaultFTW(ch, _freq) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push_str("/Background/Frequency")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let freq = self.urukul.get_channel_default_ftw(ch)?;
let mut message_str =
String::from(self.float_buffer.format_finite(freq));
message_str
.push_str(" Hz")
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::DefaultASF(ch, _ampl) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Channel")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
.push_str("/Background/Amplitude")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let ampl = self.urukul.get_channel_default_asf(ch)?;
let message_str = String::from(self.float_buffer.format_finite(ampl));
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::AppendBytes(len) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Buffer/Append")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let mut message_str = String::from("Pushed ");
message_str
.push_str(self.float_buffer.format_finite(len as f64))
.map_err(|_| Error::StringOutOfSpace)?;
message_str
.push_str(" bytes to buffer.")
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::CommitBuffer(_dest, _start_addr, ch) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/Buffer/Commit")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let mut message_str = String::from("Pushed bytes to channel ");
message_str
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
MqttCommand::RAM(ch, _pr, _start_addr, _end_addr, _op_mode, _) => {
vec.push((
{
let mut topic_string = String::from(self.name);
topic_string
.push_str("/Feedback/RAM")
.map_err(|_| Error::StringOutOfSpace)?;
topic_string
},
{
let mut message_str = String::from("Selected RAM profile for channel ");
message_str
.push(char::from_digit(ch.into(), 10).unwrap())
.map_err(|_| Error::StringOutOfSpace)?;
message_str
},
))
.map_err(|_| Error::VectorOutOfSpace)?;
Ok(vec)
}
},
None => Ok(vec),
}
}
fn parse_topic<'a>(&mut self, topic: &'a str) -> Result<MqttTopic, Error<E>> {
let mut assigned_channel = false;
let mut assigned_profile = false;
let mut channel: u8 = 0;
let mut profile: u8 = 0;
// Verify that the topic starts with <name>/Control/ or /<name>/Control/
let mut header = {
let mut topic_builder_with_slash: String<U128> = String::from("/");
topic_builder_with_slash
.push_str(self.name)
.map_err(|_| Error::StringOutOfSpace)?;
topic_builder_with_slash
.push_str("/Control/")
.map_err(|_| Error::StringOutOfSpace)?;
let topic_builder: &str = topic_builder_with_slash
.as_str()
.strip_prefix("/")
.ok_or(Error::StringOutOfSpace)?;
topic
.strip_prefix(topic_builder_with_slash.as_str())
.or_else(|| topic.strip_prefix(topic_builder))
.ok_or(Error::MqttCommandError)?
};
loop {
match header {
// The topic has a channel subtopic
_ if header.starts_with("Channel") => {
// MQTT command should only mention channel once appropriately
// Channel must be referred before profile,
// as a channel is broader than a profile
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Channel" part of the subtopic
header = header
.strip_prefix("Channel")
.ok_or(Error::MqttCommandError)?;
// Remove the channel number at the end of the subtopic
// But store the channel as a char, so it can be removed easily
let numeric_char: char =
header.chars().next().ok_or(Error::MqttCommandError)?;
// Record the channel number
channel = numeric_char
.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_channel = true;
header = header
.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/").ok_or(Error::MqttCommandError)?;
}
_ if (header.starts_with("Profile") && assigned_channel) => {
// MQTT command should only mention profile once appropriately
if assigned_profile {
return Err(Error::MqttCommandError);
}
// Remove the "Profile" part of the subtopic
header = header
.strip_prefix("Profile")
.ok_or(Error::MqttCommandError)?;
// Remove the profile number at the end of the subtopic
// But store the profile as a char, so it can be removed easily
let numeric_char: char =
header.chars().next().ok_or(Error::MqttCommandError)?;
// Record the channel number
profile = numeric_char
.to_digit(10)
.ok_or(Error::MqttCommandError)?
.try_into()
.unwrap();
assigned_profile = true;
header = header
.strip_prefix(numeric_char)
.ok_or(Error::MqttCommandError)?;
// Remove forward slash ("/")
header = header.strip_prefix("/").ok_or(Error::MqttCommandError)?;
}
"Reset" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Reset);
}
"Switch" => {
// Switch is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Switch(channel));
}
"Attenuation" => {
// Attenuation is a channel specific topic
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Attenuation(channel));
}
"Clock" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Clock);
}
"Clock/Source" => {
// Clock/Source refers to the Urukul clock source
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockSource);
}
"Clock/Frequency" => {
// Clock/Frequency refers to the Urukul clock frequency
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockFrequency);
}
"Clock/Division" => {
// Clock/Division refers to the Urukul clock division
// It should be common for all channels and profiles
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::ClockDivision);
}
"SystemClock" => {
if !(assigned_channel && !assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SystemClock(channel));
}
"Singletone" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Singletone(channel, profile));
}
"Singletone/Frequency" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SingletoneFrequency(channel, profile));
}
"Singletone/Amplitude" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SingletoneAmplitude(channel, profile));
}
"Singletone/Phase" => {
if !(assigned_channel && assigned_profile) {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::SingletonePhase(channel, profile));
}
"Profile" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Profile);
}
"Save" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Save);
}
"Load" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::Load);
}
"Background/Frequency" => {
if !assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::DefaultFTW(channel));
}
"Background/Amplitude" => {
if !assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::DefaultASF(channel));
}
"Buffer/Append" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::AppendBytes);
}
"Buffer/Commit" => {
if assigned_channel || assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::CommitBuffer);
}
"RAM" => {
if !assigned_channel || !assigned_profile {
return Err(Error::MqttCommandError);
}
return Ok(MqttTopic::RAM(channel, profile));
}
_ => return Err(Error::MqttCommandError),
};
}
}
fn parse_message<'a>(
&mut self,
topic: MqttTopic,
message: &'a [u8],
) -> IResult<&'a [u8], MqttCommand> {
match topic {
MqttTopic::Reset => Ok((message, MqttCommand::Reset)),
MqttTopic::Switch(ch) => switch_message(ch, message),
MqttTopic::Attenuation(ch) => attenuation_message(ch, message),
MqttTopic::Clock => clock_message(message),
MqttTopic::ClockSource => clock_source_message(message),
MqttTopic::ClockFrequency => clock_frequency_message(message),
MqttTopic::ClockDivision => clock_division_message(message),
MqttTopic::SystemClock(ch) => system_clock_message(ch, message),
MqttTopic::Singletone(ch, prof) => singletone_message(ch, prof, message),
MqttTopic::SingletoneFrequency(ch, prof) => {
singletone_frequency_message(ch, prof, message)
}
MqttTopic::SingletoneAmplitude(ch, prof) => {
singletone_amplitude_message(ch, prof, message)
}
MqttTopic::SingletonePhase(ch, prof) => singletone_phase_message(ch, prof, message),
MqttTopic::Profile => profile_message(message),
MqttTopic::Save => Ok((message, MqttCommand::Save)),
MqttTopic::Load => Ok((message, MqttCommand::Load)),
MqttTopic::DefaultFTW(ch) => default_frequency_message(ch, message),
MqttTopic::DefaultASF(ch) => default_amplitude_message(ch, message),
MqttTopic::AppendBytes => unreachable!(), // This topic should not be parsed
MqttTopic::CommitBuffer => commit_buffer_message(message),
MqttTopic::RAM(ch, pr) => ram_message(ch, pr, message),
}
}
/// Applies `command` to the Urukul (or to flash for `Save`/`Load`).
///
/// `ProcessError` and `AppendBytes` are no-ops here: the former carries no
/// action and the latter was already pushed by the ingress handler.
fn execute(&mut self, command: MqttCommand) -> Result<(), Error<E>> {
    match command {
        MqttCommand::ProcessError(_) => Ok(()),
        // The bytes were not parsed and have already been pushed
        MqttCommand::AppendBytes(_) => Ok(()),

        MqttCommand::Reset => self.urukul.reset(),
        MqttCommand::Save => self.save_device(),
        MqttCommand::Load => self.load_device(),

        MqttCommand::Clock(source, frequency, division) => {
            self.urukul.set_clock(source, frequency, division)
        }
        MqttCommand::ClockSource(source) => self.urukul.set_clock_source(source),
        MqttCommand::ClockFrequency(frequency) => self.urukul.set_clock_frequency(frequency),
        MqttCommand::ClockDivision(division) => self.urukul.set_clock_division(division),
        MqttCommand::Profile(profile) => self.urukul.set_profile(profile),

        MqttCommand::Switch(channel, state) => {
            self.urukul.set_channel_switch(channel.into(), state)
        }
        MqttCommand::Attenuation(channel, attenuation) => {
            self.urukul.set_channel_attenuation(channel, attenuation)
        }
        MqttCommand::SystemClock(channel, frequency) => {
            self.urukul.set_channel_sys_clk(channel, frequency)
        }
        MqttCommand::DefaultFTW(channel, frequency) => {
            self.urukul.set_channel_default_ftw(channel, frequency)
        }
        MqttCommand::DefaultASF(channel, amplitude) => {
            self.urukul.set_channel_default_asf(channel, amplitude)
        }

        // The variant stores frequency, phase, amplitude in that order.
        MqttCommand::Singletone(channel, profile, frequency, phase, amplitude) => self
            .urukul
            .set_channel_single_tone_profile(channel, profile, frequency, phase, amplitude),
        MqttCommand::SingletoneFrequency(channel, profile, frequency) => self
            .urukul
            .set_channel_single_tone_profile_frequency(channel, profile, frequency),
        MqttCommand::SingletoneAmplitude(channel, profile, amplitude) => self
            .urukul
            .set_channel_single_tone_profile_amplitude(channel, profile, amplitude),
        MqttCommand::SingletonePhase(channel, profile, phase) => self
            .urukul
            .set_channel_single_tone_profile_phase(channel, profile, phase),

        MqttCommand::CommitBuffer(destination, start_addr, channel) => self
            .urukul
            .commit_ram_buffer_to_channel(channel, start_addr, destination),
        MqttCommand::RAM(channel, profile, start_addr, end_addr, op_mode, ramp_rate) => self
            .urukul
            .set_channel_ram_profile(channel, profile, start_addr, end_addr, op_mode, ramp_rate),
    }
}
/// Builds the `<name>/Feedback/Clock/Source` message from the clock source
/// currently reported by the Urukul.
fn get_clock_source_message(&mut self) -> Result<(String<U128>, String<U64>), Error<E>> {
    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Clock/Source")
        .map_err(|_| Error::StringOutOfSpace)?;

    let label = match self.urukul.get_clock_source()? {
        UrukulClockSource::OSC => "OSC",
        UrukulClockSource::MMCX => "MMCX",
        UrukulClockSource::SMA => "SMA",
    };
    Ok((topic, String::from(label)))
}
/// Builds the `<name>/Feedback/Clock/Frequency` message; the payload is the
/// clock frequency followed by " Hz".
fn get_clock_frequency_message(&mut self) -> Result<(String<U128>, String<U64>), Error<E>> {
    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Clock/Frequency")
        .map_err(|_| Error::StringOutOfSpace)?;

    let frequency = self.urukul.get_clock_frequency();
    let mut payload: String<U64> = String::from(self.float_buffer.format_finite(frequency));
    payload
        .push_str(" Hz")
        .map_err(|_| Error::StringOutOfSpace)?;

    Ok((topic, payload))
}
/// Builds the `<name>/Feedback/Clock/Division` message.
///
/// # Panics
/// Panics if the Urukul reports a division other than 1, 2 or 4.
fn get_clock_division_message(&mut self) -> Result<(String<U128>, String<U64>), Error<E>> {
    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Clock/Division")
        .map_err(|_| Error::StringOutOfSpace)?;

    let payload: String<U64> = match self.urukul.get_clock_division()? {
        1 => String::from("1"),
        2 => String::from("2"),
        4 => String::from("4"),
        _ => unreachable!(),
    };
    Ok((topic, payload))
}
/// Builds the `<name>/Feedback/Channel<ch>/Singletone/Frequency` message;
/// the payload is `f_out` followed by " Hz".
///
/// # Panics
/// Panics if `ch` is not a single decimal digit.
fn get_single_tone_frequency_message(
    &mut self,
    ch: u8,
    f_out: f64,
) -> Result<(String<U128>, String<U64>), Error<E>> {
    let channel_digit = char::from_digit(ch.into(), 10).unwrap();

    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Channel")
        .map_err(|_| Error::StringOutOfSpace)?;
    topic
        .push(channel_digit)
        .map_err(|_| Error::StringOutOfSpace)?;
    topic
        .push_str("/Singletone/Frequency")
        .map_err(|_| Error::StringOutOfSpace)?;

    let mut payload: String<U64> = String::from(self.float_buffer.format_finite(f_out));
    payload
        .push_str(" Hz")
        .map_err(|_| Error::StringOutOfSpace)?;

    Ok((topic, payload))
}
/// Builds the `<name>/Feedback/Channel<ch>/Singletone/Amplitude` message;
/// the payload is `ampl` as plain text, without a unit.
///
/// # Panics
/// Panics if `ch` is not a single decimal digit.
fn get_single_tone_amplitude_message(
    &mut self,
    ch: u8,
    ampl: f64,
) -> Result<(String<U128>, String<U64>), Error<E>> {
    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Channel")
        .map_err(|_| Error::StringOutOfSpace)?;
    let channel_digit = char::from_digit(ch.into(), 10).unwrap();
    topic
        .push(channel_digit)
        .map_err(|_| Error::StringOutOfSpace)?;
    topic
        .push_str("/Singletone/Amplitude")
        .map_err(|_| Error::StringOutOfSpace)?;

    let payload: String<U64> = String::from(self.float_buffer.format_finite(ampl));
    Ok((topic, payload))
}
/// Builds the `<name>/Feedback/Channel<ch>/Singletone/Phase` message; the
/// payload is `phase` followed by " deg".
///
/// # Panics
/// Panics if `ch` is not a single decimal digit.
fn get_single_tone_phase_message(
    &mut self,
    ch: u8,
    phase: f64,
) -> Result<(String<U128>, String<U64>), Error<E>> {
    let mut topic: String<U128> = String::from(self.name);
    topic
        .push_str("/Feedback/Channel")
        .map_err(|_| Error::StringOutOfSpace)?;
    let channel_digit = char::from_digit(ch.into(), 10).unwrap();
    topic
        .push(channel_digit)
        .map_err(|_| Error::StringOutOfSpace)?;
    topic
        .push_str("/Singletone/Phase")
        .map_err(|_| Error::StringOutOfSpace)?;

    let mut payload: String<U64> = String::from(self.float_buffer.format_finite(phase));
    payload
        .push_str(" deg")
        .map_err(|_| Error::StringOutOfSpace)?;

    Ok((topic, payload))
}
}
// Read message parameter separator (optional comma and whitespace)
fn message_separator(message: &[u8]) -> IResult<&[u8], ()> {
preceded(opt(tag(",")), whitespace)(message)
}
/// Consumes zero or more whitespace bytes (as classified by
/// `nom::character::is_space`).
fn whitespace(message: &[u8]) -> IResult<&[u8], ()> {
    let (rest, _skipped) = take_while(is_space)(message)?;
    Ok((rest, ()))
}
// Reader for uom instances
fn read_frequency(message: &[u8]) -> IResult<&[u8], f64> {
map(
pair(
double,
opt(preceded(
whitespace,
alt((
value(1.0, tag_no_case("hz")),
value(1_000.0, tag_no_case("khz")),
value(1_000_000.0, tag_no_case("mhz")),
value(1_000_000_000.0, tag_no_case("ghz")),
)),
)),
),
|(freq, unit): (f64, Option<f64>)| freq * unit.map_or(1.0, |mul| mul),
)(message)
}
/// Parser for the Switch command message: exactly `on` or `off`
/// (case-sensitive).
fn switch_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let on_or_off = alt((value(true, tag("on")), value(false, tag("off"))));
    let (rest, state) = all_consuming(on_or_off)(message)?;
    Ok((rest, MqttCommand::Switch(channel, state)))
}
/// Parser for the Attenuation command message: a float, optionally followed
/// by whitespace and a case-insensitive `dB`.
fn attenuation_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let unit = opt(preceded(whitespace, tag_no_case("db")));
    let (rest, attenuation): (&[u8], f32) = all_consuming(terminated(float, unit))(message)?;
    Ok((rest, MqttCommand::Attenuation(channel, attenuation)))
}
/// Parser for the Clock Source command message: `OSC`, `MMCX` or `SMA`
/// (case-insensitive).
fn clock_source_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let source_name = alt((
        value(UrukulClockSource::OSC, tag_no_case("OSC")),
        value(UrukulClockSource::MMCX, tag_no_case("MMCX")),
        value(UrukulClockSource::SMA, tag_no_case("SMA")),
    ));
    let (rest, source) = all_consuming(source_name)(message)?;
    Ok((rest, MqttCommand::ClockSource(source)))
}
/// Parser for the Clock Frequency command message: a single frequency as
/// accepted by `read_frequency`.
fn clock_frequency_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let (rest, frequency) = all_consuming(read_frequency)(message)?;
    Ok((rest, MqttCommand::ClockFrequency(frequency)))
}
// Parser for Clock Division Command Message
fn clock_division_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(map(digit1, |div: &[u8]| {
MqttCommand::ClockDivision(
u8::from_str_radix(core::str::from_utf8(div).unwrap(), 10).unwrap(),
)
}))(message)
}
/// Parser for the one-command master clock setup message.
///
/// Expects the three fields `source:`, `frequency:` and `division:` in any
/// order, each followed by its value and an optional comma/whitespace
/// separator.
fn clock_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let source = preceded(
        tag_no_case("source:"),
        preceded(
            whitespace,
            terminated(
                alt((
                    value(UrukulClockSource::OSC, tag_no_case("OSC")),
                    value(UrukulClockSource::MMCX, tag_no_case("MMCX")),
                    value(UrukulClockSource::SMA, tag_no_case("SMA")),
                )),
                message_separator,
            ),
        ),
    );
    let frequency = preceded(
        tag_no_case("frequency:"),
        preceded(whitespace, terminated(read_frequency, message_separator)),
    );
    let division = preceded(
        tag_no_case("division:"),
        preceded(
            whitespace,
            terminated(
                map_res(digit1, |digits: &[u8]| {
                    u8::from_str_radix(core::str::from_utf8(digits).unwrap(), 10)
                }),
                message_separator,
            ),
        ),
    );

    let (rest, (src, freq, div)) =
        all_consuming(permutation((source, frequency, division)))(message)?;
    Ok((rest, MqttCommand::Clock(src, freq, div)))
}
/// Message parser for f_sys_clk of any channel: a single frequency as
/// accepted by `read_frequency`.
fn system_clock_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let (rest, frequency) = all_consuming(read_frequency)(message)?;
    Ok((rest, MqttCommand::SystemClock(channel, frequency)))
}
/// Parser for the Singletone Frequency command message: a single frequency
/// as accepted by `read_frequency`.
fn singletone_frequency_message(
    channel: u8,
    profile: u8,
    message: &[u8],
) -> IResult<&[u8], MqttCommand> {
    let (rest, frequency) = all_consuming(read_frequency)(message)?;
    Ok((
        rest,
        MqttCommand::SingletoneFrequency(channel, profile, frequency),
    ))
}
/// Parser for the default (background) frequency command message: a single
/// frequency as accepted by `read_frequency`.
fn default_frequency_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let (rest, frequency) = all_consuming(read_frequency)(message)?;
    Ok((rest, MqttCommand::DefaultFTW(channel, frequency)))
}
/// Parser for the Singletone Amplitude command message: a bare floating
/// point number.
fn singletone_amplitude_message(
    channel: u8,
    profile: u8,
    message: &[u8],
) -> IResult<&[u8], MqttCommand> {
    let (rest, amplitude) = all_consuming(double)(message)?;
    Ok((
        rest,
        MqttCommand::SingletoneAmplitude(channel, profile, amplitude),
    ))
}
/// Parser for the default (background) amplitude command message: a bare
/// floating point number.
fn default_amplitude_message(channel: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let (rest, amplitude) = all_consuming(double)(message)?;
    Ok((rest, MqttCommand::DefaultASF(channel, amplitude)))
}
/// Parser for the Singletone Phase command message: a floating point number
/// optionally followed by whitespace and a case-insensitive `deg`.
fn singletone_phase_message(
    channel: u8,
    profile: u8,
    message: &[u8],
) -> IResult<&[u8], MqttCommand> {
    let unit = opt(preceded(whitespace, tag_no_case("deg")));
    let (rest, degrees): (&[u8], f64) = all_consuming(terminated(double, unit))(message)?;
    Ok((rest, MqttCommand::SingletonePhase(channel, profile, degrees)))
}
/// Parser for the one-command singletone profile message.
///
/// Expects `frequency:` and `amplitude:` fields plus an optional `phase:`
/// field (defaulting to 0 when absent), each followed by its value and an
/// optional comma/whitespace separator. The phase value may carry a `deg`
/// suffix.
fn singletone_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let frequency = preceded(
        tag_no_case("frequency:"),
        preceded(whitespace, terminated(read_frequency, message_separator)),
    );
    let amplitude = preceded(
        tag_no_case("amplitude:"),
        preceded(whitespace, terminated(double, message_separator)),
    );
    // Kept last in the permutation: being optional it always succeeds, so it
    // must only be tried once the mandatory fields did not match.
    let phase = opt(preceded(
        tag_no_case("phase:"),
        preceded(
            whitespace,
            terminated(
                double,
                preceded(
                    opt(preceded(whitespace, tag_no_case("deg"))),
                    message_separator,
                ),
            ),
        ),
    ));

    let (rest, (freq, ampl, phase)): (&[u8], (f64, f64, Option<f64>)) =
        all_consuming(permutation((frequency, amplitude, phase)))(message)?;
    Ok((
        rest,
        MqttCommand::Singletone(channel, profile, freq, phase.unwrap_or(0.0), ampl),
    ))
}
fn profile_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(map(digit1, |num: &[u8]| {
MqttCommand::Profile(u8::from_str_radix(core::str::from_utf8(num).unwrap(), 10).unwrap())
}))(message)
}
fn commit_buffer_message(message: &[u8]) -> IResult<&[u8], MqttCommand> {
all_consuming(map(
permutation((
preceded(
tag_no_case("dest:"),
preceded(
whitespace,
terminated(
alt((
value(RAMDestination::Frequency, tag_no_case("frequency")),
value(RAMDestination::Amplitude, tag_no_case("amplitude")),
value(RAMDestination::Phase, tag_no_case("phase")),
value(RAMDestination::Polar, tag_no_case("polar")),
)),
message_separator,
),
),
),
preceded(
tag_no_case("start:"),
preceded(
whitespace,
terminated(
map_res(digit1, |start_addr: &[u8]| {
u16::from_str_radix(core::str::from_utf8(start_addr).unwrap(), 10)
}),
message_separator,
),
),
),
preceded(
tag_no_case("ch:"),
preceded(
whitespace,
terminated(
map_res(digit1, |ch: &[u8]| {
u8::from_str_radix(core::str::from_utf8(ch).unwrap(), 10)
}),
message_separator,
),
),
),
)),
|(dest, start_addr, ch): (RAMDestination, u16, u8)| {
MqttCommand::CommitBuffer(dest, start_addr, ch)
},
))(message)
}
/// Parser for the RAM profile command message.
///
/// Expects the four fields `start:` (u16), `end:` (u16), `op_mode:`
/// (`DS`, `RU`, `BDR`, `CBDR` or `CR`, case-insensitive) and `ramp:` (u16)
/// in any order, each followed by an optional comma/whitespace separator.
fn ram_message(channel: u8, profile: u8, message: &[u8]) -> IResult<&[u8], MqttCommand> {
    let start = preceded(
        tag_no_case("start:"),
        preceded(
            whitespace,
            terminated(
                map_res(digit1, |digits: &[u8]| {
                    u16::from_str_radix(core::str::from_utf8(digits).unwrap(), 10)
                }),
                message_separator,
            ),
        ),
    );
    let end = preceded(
        tag_no_case("end:"),
        preceded(
            whitespace,
            terminated(
                map_res(digit1, |digits: &[u8]| {
                    u16::from_str_radix(core::str::from_utf8(digits).unwrap(), 10)
                }),
                message_separator,
            ),
        ),
    );
    let operation_mode = preceded(
        tag_no_case("op_mode:"),
        preceded(
            whitespace,
            terminated(
                alt((
                    value(RAMOperationMode::DirectSwitch, tag_no_case("DS")),
                    value(RAMOperationMode::RampUp, tag_no_case("RU")),
                    value(RAMOperationMode::BidirectionalRamp, tag_no_case("BDR")),
                    value(
                        RAMOperationMode::ContinuousBidirectionalRamp,
                        tag_no_case("CBDR"),
                    ),
                    value(RAMOperationMode::ContinuousRecirculate, tag_no_case("CR")),
                )),
                message_separator,
            ),
        ),
    );
    let ramp = preceded(
        tag_no_case("ramp:"),
        preceded(
            whitespace,
            terminated(
                map_res(digit1, |digits: &[u8]| {
                    u16::from_str_radix(core::str::from_utf8(digits).unwrap(), 10)
                }),
                message_separator,
            ),
        ),
    );

    let (rest, (start_addr, end_addr, op_mode, ramp_rate)) =
        all_consuming(permutation((start, end, operation_mode, ramp)))(message)?;
    Ok((
        rest,
        MqttCommand::RAM(channel, profile, start_addr, end_addr, op_mode, ramp_rate),
    ))
}