diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 50e0dd8..0463aa6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -25,8 +25,7 @@ jobs: - run: > zip bin.zip target/*/release/dual-iir - target/*/release/lockin-external - target/*/release/lockin-internal + target/*/release/lockin - id: create_release uses: actions/create-release@v1 env: diff --git a/Cargo.lock b/Cargo.lock index 4aa5c80..35e1aa8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -206,7 +206,7 @@ dependencies = [ [[package]] name = "derive_miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd" dependencies = [ "proc-macro2", "quote", @@ -419,11 +419,9 @@ dependencies = [ [[package]] name = "miniconf" version = "0.1.0" -source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" +source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd" dependencies = [ "derive_miniconf", - "heapless 0.6.1", - "minimq", "serde", "serde-json-core", ] @@ -431,7 +429,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" +source = "git+https://github.com/quartiq/minimq.git?rev=b3f364d#b3f364d55dea35da6572f78ddb91c87bfbb453bf" dependencies = [ "bit_field", "embedded-nal", @@ -655,6 +653,12 @@ dependencies = [ "semver", ] +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" + [[package]] name = "semver" version = "0.9.0" @@ -681,10 +685,12 @@ dependencies = [ [[package]] name = "serde-json-core" -version = "0.2.0" -source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39af17f40c2a28d2c9a7918663ddc8a10f54cc6f109ead5c3f010869761df186" dependencies = [ - "heapless 0.5.6", + "heapless 0.6.1", + "ryu", "serde", ] @@ -740,6 +746,7 @@ dependencies = [ "log", "mcp23017", "miniconf", + "minimq", "nb 1.0.0", "panic-semihosting", "paste", diff --git a/Cargo.toml b/Cargo.toml index 806600c..bfddf5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,19 +63,15 @@ rev = "a2e3ad5" [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" -rev = "314fa5587d" +rev = "c6f2b28" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" rev = "8468f11" -[patch.crates-io.minimq] +[dependencies.minimq] git = "https://github.com/quartiq/minimq.git" -rev = "933687c2e4b" - -[patch.crates-io.serde-json-core] -git = "https://github.com/rust-embedded-community/serde-json-core.git" -rev = "ee06ac91bc" +rev = "b3f364d" [features] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] diff --git a/README.md b/README.md index f91976a..80cda1d 100644 --- a/README.md +++ b/README.md @@ -29,9 +29,7 @@ to implement different use cases. Several applications are provides by default * anti-windup * derivative kick avoidance -### Lockin external - -### Lockin internal +### Lockin ## Minimal bootstrapping documentation diff --git a/miniconf.py b/miniconf.py index b059f78..7f88f65 100644 --- a/miniconf.py +++ b/miniconf.py @@ -9,11 +9,12 @@ import argparse import asyncio import json import logging +import sys +import uuid from gmqtt import Client as MqttClient -logger = logging.getLogger(__name__) - +LOGGER = logging.getLogger(__name__) class Miniconf: """An asynchronous API for controlling Miniconf devices using MQTT.""" @@ -32,27 +33,33 @@ class Miniconf: client: A connected MQTT5 client. prefix: The MQTT toptic prefix of the device to control. """ + self.uuid = uuid.uuid1() + self.request_id = 0 self.client = client self.prefix = prefix self.inflight = {} self.client.on_message = self._handle_response - self.client.subscribe(f'{prefix}/response/#') + self.client.subscribe(f'{prefix}/response/{self.uuid.hex}') - def _handle_response(self, _client, topic, payload, *_args, **_kwargs): + def _handle_response(self, _client, _topic, payload, _qos, properties): """Callback function for when messages are received over MQTT. Args: _client: The MQTT client. - topic: The topic that the message was received on. + _topic: The topic that the message was received on. payload: The payload of the message. + _qos: The quality-of-service level of the received packet + properties: A dictionary of properties associated with the message. """ - if topic not in self.inflight: - # TODO use correlation_data to distinguish clients and requests - logger.warning('Unexpected response on topic: %s', topic) - return + # Extract corrleation data from the properties + correlation_data = json.loads(properties['correlation_data'][0].decode('ascii')) + + # Get the request ID from the correlation data + request_id = correlation_data['request_id'] + + self.inflight[request_id].set_result(json.loads(payload)) + del self.inflight[request_id] - self.inflight[topic].set_result(payload.decode('ascii')) - del self.inflight[topic] async def command(self, path, value): """Write the provided data to the specified path. @@ -62,29 +69,37 @@ class Miniconf: value: The value to write to the path. Returns: - The received response to the command. + The response to the command as a dictionary. """ setting_topic = f'{self.prefix}/settings/{path}' - response_topic = f'{self.prefix}/response/{path}' - if response_topic in self.inflight: - # TODO use correlation_data to distinguish clients and requests - raise NotImplementedError( - 'Only one in-flight message per topic is supported') + response_topic = f'{self.prefix}/response/{self.uuid.hex}' + + # Assign a unique identifier to this update request. + request_id = self.request_id + self.request_id += 1 + assert request_id not in self.inflight, 'Invalid ID encountered' + + correlation_data = json.dumps({ + 'request_id': request_id, + }).encode('ascii') value = json.dumps(value) - logger.info('Sending %s to "%s"', value, setting_topic) + LOGGER.info('Sending %s to "%s"', value, setting_topic) fut = asyncio.get_running_loop().create_future() - self.inflight[response_topic] = fut + + self.inflight[request_id] = fut self.client.publish(setting_topic, payload=value, qos=0, retain=True, - response_topic=response_topic) + response_topic=response_topic, + correlation_data=correlation_data) return await fut def main(): + """ Main program entry point. """ parser = argparse.ArgumentParser( - description='Miniconf command line interface.', - formatter_class=argparse.RawDescriptionHelpFormatter, - epilog='''Examples: + description='Miniconf command line interface.', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog='''Examples: %(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' ''') @@ -100,19 +115,22 @@ def main(): args = parser.parse_args() logging.basicConfig( - format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', - level=logging.WARN - 10*args.verbose) + format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', + level=logging.WARN - 10*args.verbose) loop = asyncio.get_event_loop() async def configure_settings(): interface = await Miniconf.create(args.prefix, args.broker) - for kv in args.settings: - path, value = kv.split("=", 1) + for key_value in args.settings: + path, value = key_value.split("=", 1) response = await interface.command(path, json.loads(value)) - print(response) + print(f'{path}: {response}') + if response['code'] != 0: + return response['code'] + return 0 - loop.run_until_complete(configure_settings()) + sys.exit(loop.run_until_complete(configure_settings())) if __name__ == '__main__': diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 0225175..5c0b0f1 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -13,7 +13,7 @@ use hardware::{ InputPin, AFE0, AFE1, }; -use net::{Action, MiniconfInterface}; +use net::{Action, MqttInterface}; const SCALE: f32 = i16::MAX as _; @@ -54,7 +54,7 @@ const APP: () = { digital_input1: DigitalInput1, adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], @@ -66,7 +66,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -93,7 +93,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, digital_input1: stabilizer.digital_inputs.1, settings: Settings::default(), } @@ -152,14 +152,10 @@ const APP: () = { } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -169,9 +165,9 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, afes, settings])] + #[task(priority = 1, resources=[mqtt, afes, settings])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); // Update the IIR channels. c.resources.settings.lock(|current| *current = *settings); diff --git a/src/bin/lockin-internal.rs b/src/bin/lockin-internal.rs deleted file mode 100644 index ee9da2f..0000000 --- a/src/bin/lockin-internal.rs +++ /dev/null @@ -1,143 +0,0 @@ -#![deny(warnings)] -#![no_std] -#![no_main] - -use dsp::{Accu, Complex, ComplexExt, Lockin}; -use generic_array::typenum::U2; -use hardware::{Adc1Input, Dac0Output, Dac1Output, AFE0, AFE1}; -use stabilizer::{hardware, hardware::design_parameters}; - -// A constant sinusoid to send on the DAC output. -// Full-scale gives a +/- 10V amplitude waveform. Scale it down to give +/- 1V. -const ONE: i16 = (0.1 * u16::MAX as f32) as _; -const SQRT2: i16 = (ONE as f32 * 0.707) as _; -const DAC_SEQUENCE: [i16; design_parameters::SAMPLE_BUFFER_SIZE] = - [ONE, SQRT2, 0, -SQRT2, -ONE, -SQRT2, 0, SQRT2]; - -#[rtic::app(device = stm32h7xx_hal::stm32, peripherals = true, monotonic = rtic::cyccnt::CYCCNT)] -const APP: () = { - struct Resources { - afes: (AFE0, AFE1), - adc: Adc1Input, - dacs: (Dac0Output, Dac1Output), - - lockin: Lockin, - } - - #[init] - fn init(c: init::Context) -> init::LateResources { - // Configure the microcontroller - let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - - // Enable ADC/DAC events - stabilizer.adcs.1.start(); - stabilizer.dacs.0.start(); - stabilizer.dacs.1.start(); - - // Start sampling ADCs. - stabilizer.adc_dac_timer.start(); - - init::LateResources { - lockin: Lockin::default(), - afes: stabilizer.afes, - adc: stabilizer.adcs.1, - dacs: stabilizer.dacs, - } - } - - /// Main DSP processing routine. - /// - /// See `dual-iir` for general notes on processing time and timing. - /// - /// This is an implementation of an internal-reference lockin on the ADC1 signal. - /// The reference at f_sample/8 is output on DAC0 and the phase of the demodulated - /// signal on DAC1. - #[task(binds=DMA1_STR4, resources=[adc, dacs, lockin], priority=2)] - fn process(c: process::Context) { - let lockin = c.resources.lockin; - let adc_samples = c.resources.adc.acquire_buffer(); - let dac_samples = [ - c.resources.dacs.0.acquire_buffer(), - c.resources.dacs.1.acquire_buffer(), - ]; - - // Reference phase and frequency are known. - let pll_phase = 0i32; - let pll_frequency = - 1i32 << (32 - design_parameters::SAMPLE_BUFFER_SIZE_LOG2); - - // Harmonic index of the LO: -1 to _de_modulate the fundamental (complex conjugate) - let harmonic: i32 = -1; - - // Demodulation LO phase offset - let phase_offset: i32 = 1 << 30; - - // Log2 lowpass time constant. - let time_constant: u8 = 8; - - let sample_frequency = (pll_frequency as i32).wrapping_mul(harmonic); - let sample_phase = - phase_offset.wrapping_add(pll_phase.wrapping_mul(harmonic)); - - let output: Complex = adc_samples - .iter() - // Zip in the LO phase. - .zip(Accu::new(sample_phase, sample_frequency)) - // Convert to signed, MSB align the ADC sample, update the Lockin (demodulate, filter) - .map(|(&sample, phase)| { - let s = (sample as i16 as i32) << 16; - lockin.update(s, phase, time_constant) - }) - // Decimate - .last() - .unwrap() - * 2; // Full scale assuming the 2f component is gone. - - // Convert to DAC data. - for (i, data) in DAC_SEQUENCE.iter().enumerate() { - // DAC0 always generates a fixed sinusoidal output. - dac_samples[0][i] = *data as u16 ^ 0x8000; - dac_samples[1][i] = (output.arg() >> 16) as u16 ^ 0x8000; - } - } - - #[idle(resources=[afes])] - fn idle(_: idle::Context) -> ! { - loop { - cortex_m::asm::wfi(); - } - } - - #[task(binds = ETH, priority = 1)] - fn eth(_: eth::Context) { - unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } - } - - #[task(binds = SPI2, priority = 3)] - fn spi2(_: spi2::Context) { - panic!("ADC0 input overrun"); - } - - #[task(binds = SPI3, priority = 3)] - fn spi3(_: spi3::Context) { - panic!("ADC1 input overrun"); - } - - #[task(binds = SPI4, priority = 3)] - fn spi4(_: spi4::Context) { - panic!("DAC0 output error"); - } - - #[task(binds = SPI5, priority = 3)] - fn spi5(_: spi5::Context) { - panic!("DAC1 output error"); - } - - extern "C" { - // hw interrupt handlers for RTIC to use for scheduling tasks - // one per priority - fn DCMI(); - fn JPEG(); - fn SDMMC(); - } -}; diff --git a/src/bin/lockin-external.rs b/src/bin/lockin.rs similarity index 65% rename from src/bin/lockin-external.rs rename to src/bin/lockin.rs index 082b0d2..cf9529e 100644 --- a/src/bin/lockin-external.rs +++ b/src/bin/lockin.rs @@ -16,18 +16,36 @@ use stabilizer::hardware::{ }; use miniconf::Miniconf; -use stabilizer::net::{Action, MiniconfInterface}; +use stabilizer::net::{Action, MqttInterface}; + +// A constant sinusoid to send on the DAC output. +// Full-scale gives a +/- 10.24V amplitude waveform. Scale it down to give +/- 1V. +const ONE: i16 = ((1.0 / 10.24) * i16::MAX as f32) as _; +const SQRT2: i16 = (ONE as f32 * 0.707) as _; +const DAC_SEQUENCE: [i16; design_parameters::SAMPLE_BUFFER_SIZE] = + [ONE, SQRT2, 0, -SQRT2, -ONE, -SQRT2, 0, SQRT2]; #[derive(Copy, Clone, Debug, Deserialize, Miniconf)] enum Conf { - PowerPhase, - FrequencyDiscriminator, + Magnitude, + Phase, + ReferenceFrequency, + LogPower, + InPhase, Quadrature, + Modulation, +} + +#[derive(Copy, Clone, Debug, Miniconf, Deserialize, PartialEq)] +enum LockinMode { + Internal, + External, } #[derive(Copy, Clone, Debug, Deserialize, Miniconf)] pub struct Settings { afe: [AfeGain; 2], + lockin_mode: LockinMode, pll_tc: [u8; 2], @@ -43,13 +61,15 @@ impl Default for Settings { Self { afe: [AfeGain::G1; 2], + lockin_mode: LockinMode::External, + pll_tc: [21, 21], // frequency and phase settling time (log2 counter cycles) lockin_tc: 6, // lockin lowpass time constant lockin_harmonic: -1, // Harmonic index of the LO: -1 to _de_modulate the fundamental (complex conjugate) lockin_phase: 0, // Demodulation LO phase offset - output_conf: [Conf::Quadrature; 2], + output_conf: [Conf::InPhase, Conf::Quadrature], } } } @@ -60,7 +80,7 @@ const APP: () = { afes: (AFE0, AFE1), adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), - mqtt_config: MiniconfInterface, + mqtt: MqttInterface, settings: Settings, timestamper: InputStamper, @@ -73,7 +93,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = setup(c.core, c.device); - let mqtt_config = MiniconfInterface::new( + let mqtt = MqttInterface::new( stabilizer.net.stack, "", &net::get_device_prefix( @@ -113,7 +133,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, - mqtt_config, + mqtt, timestamper: stabilizer.timestamper, settings, @@ -137,7 +157,7 @@ const APP: () = { c.resources.adcs.1.acquire_buffer(), ]; - let dac_samples = [ + let mut dac_samples = [ c.resources.dacs.0.acquire_buffer(), c.resources.dacs.1.acquire_buffer(), ]; @@ -145,21 +165,37 @@ const APP: () = { let lockin = c.resources.lockin; let settings = c.resources.settings; - let timestamp = - c.resources.timestamper.latest_timestamp().unwrap_or(None); // Ignore data from timer capture overflows. - let (pll_phase, pll_frequency) = c.resources.pll.update( - timestamp.map(|t| t as i32), - settings.pll_tc[0], - settings.pll_tc[1], - ); + let (reference_phase, reference_frequency) = match settings.lockin_mode + { + LockinMode::External => { + let timestamp = + c.resources.timestamper.latest_timestamp().unwrap_or(None); // Ignore data from timer capture overflows. + let (pll_phase, pll_frequency) = c.resources.pll.update( + timestamp.map(|t| t as i32), + settings.pll_tc[0], + settings.pll_tc[1], + ); + ( + pll_phase, + (pll_frequency + >> design_parameters::SAMPLE_BUFFER_SIZE_LOG2) + as i32, + ) + } + LockinMode::Internal => { + // Reference phase and frequency are known. + ( + 1i32 << 30, + 1i32 << (32 - design_parameters::SAMPLE_BUFFER_SIZE_LOG2), + ) + } + }; - let sample_frequency = ((pll_frequency - >> design_parameters::SAMPLE_BUFFER_SIZE_LOG2) - as i32) - .wrapping_mul(settings.lockin_harmonic); - let sample_phase = settings - .lockin_phase - .wrapping_add(pll_phase.wrapping_mul(settings.lockin_harmonic)); + let sample_frequency = + reference_frequency.wrapping_mul(settings.lockin_harmonic); + let sample_phase = settings.lockin_phase.wrapping_add( + reference_phase.wrapping_mul(settings.lockin_harmonic), + ); let output: Complex = adc_samples[0] .iter() @@ -175,34 +211,30 @@ const APP: () = { .unwrap() * 2; // Full scale assuming the 2f component is gone. - let output = [ - match settings.output_conf[0] { - Conf::PowerPhase => output.abs_sqr() as _, - Conf::FrequencyDiscriminator => (output.log2() << 24) as _, - Conf::Quadrature => output.re, - }, - match settings.output_conf[1] { - Conf::PowerPhase => output.arg(), - Conf::FrequencyDiscriminator => pll_frequency as _, - Conf::Quadrature => output.im, - }, - ]; - // Convert to DAC data. - for i in 0..dac_samples[0].len() { - dac_samples[0][i] = (output[0] >> 16) as u16 ^ 0x8000; - dac_samples[1][i] = (output[1] >> 16) as u16 ^ 0x8000; + for (channel, samples) in dac_samples.iter_mut().enumerate() { + for (i, sample) in samples.iter_mut().enumerate() { + let value = match settings.output_conf[channel] { + Conf::Magnitude => output.abs_sqr() as i32 >> 16, + Conf::Phase => output.arg() >> 16, + Conf::LogPower => (output.log2() << 24) as i32 >> 16, + Conf::ReferenceFrequency => { + reference_frequency as i32 >> 16 + } + Conf::InPhase => output.re >> 16, + Conf::Quadrature => output.im >> 16, + Conf::Modulation => DAC_SEQUENCE[i] as i32, + }; + + *sample = value as u16 ^ 0x8000; + } } } - #[idle(resources=[mqtt_config], spawn=[settings_update])] + #[idle(resources=[mqtt], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { loop { - match c - .resources - .mqtt_config - .lock(|config_interface| config_interface.update()) - { + match c.resources.mqtt.lock(|mqtt| mqtt.update()) { Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::UpdateSettings) => { c.spawn.settings_update().unwrap() @@ -212,9 +244,9 @@ const APP: () = { } } - #[task(priority = 1, resources=[mqtt_config, settings, afes])] + #[task(priority = 1, resources=[mqtt, settings, afes])] fn settings_update(mut c: settings_update::Context) { - let settings = &c.resources.mqtt_config.mqtt.settings; + let settings = c.resources.mqtt.settings(); c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); diff --git a/src/net/messages.rs b/src/net/messages.rs new file mode 100644 index 0000000..167e440 --- /dev/null +++ b/src/net/messages.rs @@ -0,0 +1,91 @@ +use heapless::{consts, String, Vec}; +use serde::Serialize; + +use core::fmt::Write; + +#[derive(Debug, Copy, Clone)] +pub enum SettingsResponseCode { + NoError = 0, + MiniconfError = 1, +} + +/// Represents a generic MQTT message. +pub struct MqttMessage<'a> { + pub topic: &'a str, + pub message: Vec, + pub properties: Vec, consts::U1>, +} + +/// The payload of the MQTT response message to a settings update request. +#[derive(Serialize)] +pub struct SettingsResponse { + code: u8, + msg: String, +} + +impl<'a> MqttMessage<'a> { + /// Construct a new MQTT message from an incoming message. + /// + /// # Args + /// * `properties` - A list of properties associated with the inbound message. + /// * `default_response` - The default response topic for the message + /// * `msg` - The response associated with the message. Must fit within 128 bytes. + pub fn new<'b: 'a>( + properties: &[minimq::Property<'a>], + default_response: &'b str, + msg: &impl Serialize, + ) -> Self { + // Extract the MQTT response topic. + let topic = properties + .iter() + .find_map(|prop| { + if let minimq::Property::ResponseTopic(topic) = prop { + Some(topic) + } else { + None + } + }) + .unwrap_or(&default_response); + + // Associate any provided correlation data with the response. + let mut correlation_data: Vec, consts::U1> = + Vec::new(); + if let Some(data) = properties + .iter() + .find(|prop| matches!(prop, minimq::Property::CorrelationData(_))) + { + // Note(unwrap): Unwrap can not fail, as we only ever push one value. + correlation_data.push(*data).unwrap(); + } + + Self { + topic, + // Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector. + message: miniconf::serde_json_core::to_vec(msg).unwrap(), + properties: correlation_data, + } + } +} + +impl From> for SettingsResponse { + fn from(result: Result<(), miniconf::Error>) -> Self { + match result { + Ok(_) => Self { + msg: String::from("OK"), + code: SettingsResponseCode::NoError as u8, + }, + + Err(error) => { + let mut msg = String::new(); + if write!(&mut msg, "{:?}", error).is_err() { + msg = String::from("Miniconf Error"); + } + + Self { + code: SettingsResponseCode::MiniconfError as u8, + msg, + } + } + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index 13b514c..75f92d6 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,11 +1,18 @@ -use crate::hardware::{ - design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, -}; +///! Stabilizer network management module +///! +///! # Design +///! The stabilizer network architecture supports numerous layers to permit transmission of +///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data +///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines +///! related to Stabilizer networking operations. +use heapless::{consts, String}; use core::fmt::Write; -use heapless::{consts, String}; -use miniconf::minimq; +mod messages; +mod mqtt_interface; +use messages::{MqttMessage, SettingsResponse}; +pub use mqtt_interface::MqttInterface; /// Potential actions for firmware to take. pub enum Action { @@ -16,101 +23,6 @@ pub enum Action { UpdateSettings, } -/// MQTT settings interface. -pub struct MiniconfInterface -where - S: miniconf::Miniconf + Default, -{ - pub mqtt: miniconf::MqttInterface, - clock: CycleCounter, - phy: EthernetPhy, - network_was_reset: bool, -} - -impl MiniconfInterface -where - S: miniconf::Miniconf + Default, -{ - /// Construct a new MQTT settings interface. - /// - /// # Args - /// * `stack` - The network stack to use for communication. - /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. - /// * `prefix` - The MQTT device prefix to use for this device. - /// * `phy` - The PHY driver for querying the link state. - /// * `clock` - The clock to utilize for querying the current system time. - pub fn new( - stack: NetworkStack, - client_id: &str, - prefix: &str, - phy: EthernetPhy, - clock: CycleCounter, - ) -> Self { - let mqtt = { - let mqtt_client = { - minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) - .unwrap() - }; - - miniconf::MqttInterface::new(mqtt_client, prefix, S::default()) - .unwrap() - }; - - Self { - mqtt, - clock, - phy, - network_was_reset: false, - } - } - - /// Update the MQTT interface and service the network - /// - /// # Returns - /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> Option { - let now = self.clock.current_ms(); - - // First, service the network stack to process and inbound and outbound traffic. - let sleep = match self.mqtt.network_stack().poll(now) { - Ok(updated) => !updated, - Err(err) => { - log::info!("Network error: {:?}", err); - false - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - if self.phy.poll_link() == false { - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - if !self.network_was_reset { - self.network_was_reset = true; - self.mqtt.network_stack().handle_link_reset(); - } - } else { - self.network_was_reset = false; - } - - // Finally, service the MQTT interface and handle any necessary messages. - match self.mqtt.update() { - Ok(true) => Some(Action::UpdateSettings), - Ok(false) if sleep => Some(Action::Sleep), - Ok(_) => None, - - Err(miniconf::MqttError::Network( - smoltcp_nal::NetworkError::NoIpAddress, - )) => None, - - Err(error) => { - log::info!("Unexpected error: {:?}", error); - None - } - } - } -} - /// Get the MQTT prefix of a device. /// /// # Args diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs new file mode 100644 index 0000000..2a55b90 --- /dev/null +++ b/src/net/mqtt_interface.rs @@ -0,0 +1,197 @@ +use crate::hardware::{ + design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, +}; + +use heapless::{consts, String}; + +use super::{Action, MqttMessage, SettingsResponse}; + +/// MQTT settings interface. +pub struct MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + default_response_topic: String, + mqtt: minimq::MqttClient, + settings: S, + clock: CycleCounter, + phy: EthernetPhy, + network_was_reset: bool, + subscribed: bool, + settings_prefix: String, +} + +impl MqttInterface +where + S: miniconf::Miniconf + Default + Clone, +{ + /// Construct a new MQTT settings interface. + /// + /// # Args + /// * `stack` - The network stack to use for communication. + /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. + /// * `prefix` - The MQTT device prefix to use for this device. + /// * `phy` - The PHY driver for querying the link state. + /// * `clock` - The clock to utilize for querying the current system time. + pub fn new( + stack: NetworkStack, + client_id: &str, + prefix: &str, + phy: EthernetPhy, + clock: CycleCounter, + ) -> Self { + let mqtt = + minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) + .unwrap(); + + let mut response_topic: String = String::from(prefix); + response_topic.push_str("/log").unwrap(); + + let mut settings_prefix: String = String::from(prefix); + settings_prefix.push_str("/settings").unwrap(); + + Self { + mqtt, + settings: S::default(), + settings_prefix, + clock, + phy, + default_response_topic: response_topic, + network_was_reset: false, + subscribed: false, + } + } + + /// Update the MQTT interface and service the network + /// + /// # Returns + /// An option containing an action that should be completed as a result of network servicing. + pub fn update(&mut self) -> Option { + // First, service the network stack to process any inbound and outbound traffic. + let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms()) + { + Ok(updated) => !updated, + Err(err) => { + log::info!("Network error: {:?}", err); + false + } + }; + + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + match self.phy.poll_link() { + true => self.network_was_reset = false, + + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + false if !self.network_was_reset => { + self.network_was_reset = true; + self.mqtt.network_stack.handle_link_reset(); + } + _ => {} + }; + + let mqtt_connected = match self.mqtt.is_connected() { + Ok(connected) => connected, + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => false, + Err(minimq::Error::Network(error)) => { + log::info!("Unexpected network error: {:?}", error); + false + } + Err(error) => { + log::warn!("Unexpected MQTT error: {:?}", error); + false + } + }; + + // If we're no longer subscribed to the settings topic, but we are connected to the broker, + // resubscribe. + if !self.subscribed && mqtt_connected { + // Note(unwrap): We construct a string with two more characters than the prefix + // strucutre, so we are guaranteed to have space for storage. + let mut settings_topic: String = + String::from(self.settings_prefix.as_str()); + settings_topic.push_str("/#").unwrap(); + + // We do not currently handle or process potential subscription failures. Instead, this + // failure will be logged through the stabilizer logging interface. + self.mqtt.subscribe(&settings_topic, &[]).unwrap(); + self.subscribed = true; + } + + // Handle any MQTT traffic. + let settings = &mut self.settings; + let mqtt = &mut self.mqtt; + let prefix = self.settings_prefix.as_str(); + let default_response_topic = self.default_response_topic.as_str(); + + let mut update = false; + match mqtt.poll(|client, topic, message, properties| { + let path = match topic.strip_prefix(prefix) { + // For paths, we do not want to include the leading slash. + Some(path) => { + if path.len() > 0 { + &path[1..] + } else { + path + } + } + None => { + info!("Unexpected MQTT topic: {}", topic); + return; + } + }; + + let message: SettingsResponse = settings + .string_set(path.split('/').peekable(), message) + .and_then(|_| { + update = true; + Ok(()) + }) + .into(); + + let response = + MqttMessage::new(properties, default_response_topic, &message); + + client + .publish( + response.topic, + &response.message, + // TODO: When Minimq supports more QoS levels, this should be increased to + // ensure that the client has received it at least once. + minimq::QoS::AtMostOnce, + &response.properties, + ) + .ok(); + }) { + // If settings updated, + Ok(_) => { + if update { + Some(Action::UpdateSettings) + } else if sleep { + Some(Action::Sleep) + } else { + None + } + } + Err(minimq::Error::Disconnected) => { + self.subscribed = false; + None + } + Err(minimq::Error::Network( + smoltcp_nal::NetworkError::NoIpAddress, + )) => None, + + Err(error) => { + log::info!("Unexpected error: {:?}", error); + None + } + } + } + + pub fn settings(&self) -> &S { + &self.settings + } +}