Merge branch 'master' into rj/itcm

This commit is contained in:
Robert Jördens 2021-05-10 12:09:01 +02:00 committed by GitHub
commit bd491cf584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 453 additions and 350 deletions

View File

@ -25,8 +25,7 @@ jobs:
- run: > - run: >
zip bin.zip zip bin.zip
target/*/release/dual-iir target/*/release/dual-iir
target/*/release/lockin-external target/*/release/lockin
target/*/release/lockin-internal
- id: create_release - id: create_release
uses: actions/create-release@v1 uses: actions/create-release@v1
env: env:

23
Cargo.lock generated
View File

@ -206,7 +206,7 @@ dependencies = [
[[package]] [[package]]
name = "derive_miniconf" name = "derive_miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -419,11 +419,9 @@ dependencies = [
[[package]] [[package]]
name = "miniconf" name = "miniconf"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/quartiq/miniconf.git?rev=314fa5587d#314fa5587d1aa28e1ad70106f19e30db646e9f28" source = "git+https://github.com/quartiq/miniconf.git?rev=c6f2b28#c6f2b28f735e27b337eaa986846536e904c6f2bd"
dependencies = [ dependencies = [
"derive_miniconf", "derive_miniconf",
"heapless 0.6.1",
"minimq",
"serde", "serde",
"serde-json-core", "serde-json-core",
] ]
@ -431,7 +429,7 @@ dependencies = [
[[package]] [[package]]
name = "minimq" name = "minimq"
version = "0.2.0" version = "0.2.0"
source = "git+https://github.com/quartiq/minimq.git?rev=933687c2e4b#933687c2e4bc8a4d972de9a4d1508b0b554a8b38" source = "git+https://github.com/quartiq/minimq.git?rev=b3f364d#b3f364d55dea35da6572f78ddb91c87bfbb453bf"
dependencies = [ dependencies = [
"bit_field", "bit_field",
"embedded-nal", "embedded-nal",
@ -655,6 +653,12 @@ dependencies = [
"semver", "semver",
] ]
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
[[package]] [[package]]
name = "semver" name = "semver"
version = "0.9.0" version = "0.9.0"
@ -681,10 +685,12 @@ dependencies = [
[[package]] [[package]]
name = "serde-json-core" name = "serde-json-core"
version = "0.2.0" version = "0.3.0"
source = "git+https://github.com/rust-embedded-community/serde-json-core.git?rev=ee06ac91bc#ee06ac91bc43b72450a92198a00d9e5c5b9946d2" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39af17f40c2a28d2c9a7918663ddc8a10f54cc6f109ead5c3f010869761df186"
dependencies = [ dependencies = [
"heapless 0.5.6", "heapless 0.6.1",
"ryu",
"serde", "serde",
] ]
@ -740,6 +746,7 @@ dependencies = [
"log", "log",
"mcp23017", "mcp23017",
"miniconf", "miniconf",
"minimq",
"nb 1.0.0", "nb 1.0.0",
"panic-semihosting", "panic-semihosting",
"paste", "paste",

View File

@ -63,19 +63,15 @@ rev = "a2e3ad5"
[patch.crates-io.miniconf] [patch.crates-io.miniconf]
git = "https://github.com/quartiq/miniconf.git" git = "https://github.com/quartiq/miniconf.git"
rev = "314fa5587d" rev = "c6f2b28"
[dependencies.smoltcp-nal] [dependencies.smoltcp-nal]
git = "https://github.com/quartiq/smoltcp-nal.git" git = "https://github.com/quartiq/smoltcp-nal.git"
rev = "8468f11" rev = "8468f11"
[patch.crates-io.minimq] [dependencies.minimq]
git = "https://github.com/quartiq/minimq.git" git = "https://github.com/quartiq/minimq.git"
rev = "933687c2e4b" rev = "b3f364d"
[patch.crates-io.serde-json-core]
git = "https://github.com/rust-embedded-community/serde-json-core.git"
rev = "ee06ac91bc"
[features] [features]
semihosting = ["panic-semihosting", "cortex-m-log/semihosting"] semihosting = ["panic-semihosting", "cortex-m-log/semihosting"]

View File

@ -29,9 +29,7 @@ to implement different use cases. Several applications are provides by default
* anti-windup * anti-windup
* derivative kick avoidance * derivative kick avoidance
### Lockin external ### Lockin
### Lockin internal
## Minimal bootstrapping documentation ## Minimal bootstrapping documentation

View File

@ -9,11 +9,12 @@ import argparse
import asyncio import asyncio
import json import json
import logging import logging
import sys
import uuid
from gmqtt import Client as MqttClient from gmqtt import Client as MqttClient
logger = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class Miniconf: class Miniconf:
"""An asynchronous API for controlling Miniconf devices using MQTT.""" """An asynchronous API for controlling Miniconf devices using MQTT."""
@ -32,27 +33,33 @@ class Miniconf:
client: A connected MQTT5 client. client: A connected MQTT5 client.
prefix: The MQTT toptic prefix of the device to control. prefix: The MQTT toptic prefix of the device to control.
""" """
self.uuid = uuid.uuid1()
self.request_id = 0
self.client = client self.client = client
self.prefix = prefix self.prefix = prefix
self.inflight = {} self.inflight = {}
self.client.on_message = self._handle_response self.client.on_message = self._handle_response
self.client.subscribe(f'{prefix}/response/#') self.client.subscribe(f'{prefix}/response/{self.uuid.hex}')
def _handle_response(self, _client, topic, payload, *_args, **_kwargs): def _handle_response(self, _client, _topic, payload, _qos, properties):
"""Callback function for when messages are received over MQTT. """Callback function for when messages are received over MQTT.
Args: Args:
_client: The MQTT client. _client: The MQTT client.
topic: The topic that the message was received on. _topic: The topic that the message was received on.
payload: The payload of the message. payload: The payload of the message.
_qos: The quality-of-service level of the received packet
properties: A dictionary of properties associated with the message.
""" """
if topic not in self.inflight: # Extract corrleation data from the properties
# TODO use correlation_data to distinguish clients and requests correlation_data = json.loads(properties['correlation_data'][0].decode('ascii'))
logger.warning('Unexpected response on topic: %s', topic)
return # Get the request ID from the correlation data
request_id = correlation_data['request_id']
self.inflight[request_id].set_result(json.loads(payload))
del self.inflight[request_id]
self.inflight[topic].set_result(payload.decode('ascii'))
del self.inflight[topic]
async def command(self, path, value): async def command(self, path, value):
"""Write the provided data to the specified path. """Write the provided data to the specified path.
@ -62,29 +69,37 @@ class Miniconf:
value: The value to write to the path. value: The value to write to the path.
Returns: Returns:
The received response to the command. The response to the command as a dictionary.
""" """
setting_topic = f'{self.prefix}/settings/{path}' setting_topic = f'{self.prefix}/settings/{path}'
response_topic = f'{self.prefix}/response/{path}' response_topic = f'{self.prefix}/response/{self.uuid.hex}'
if response_topic in self.inflight:
# TODO use correlation_data to distinguish clients and requests # Assign a unique identifier to this update request.
raise NotImplementedError( request_id = self.request_id
'Only one in-flight message per topic is supported') self.request_id += 1
assert request_id not in self.inflight, 'Invalid ID encountered'
correlation_data = json.dumps({
'request_id': request_id,
}).encode('ascii')
value = json.dumps(value) value = json.dumps(value)
logger.info('Sending %s to "%s"', value, setting_topic) LOGGER.info('Sending %s to "%s"', value, setting_topic)
fut = asyncio.get_running_loop().create_future() fut = asyncio.get_running_loop().create_future()
self.inflight[response_topic] = fut
self.inflight[request_id] = fut
self.client.publish(setting_topic, payload=value, qos=0, retain=True, self.client.publish(setting_topic, payload=value, qos=0, retain=True,
response_topic=response_topic) response_topic=response_topic,
correlation_data=correlation_data)
return await fut return await fut
def main(): def main():
""" Main program entry point. """
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description='Miniconf command line interface.', description='Miniconf command line interface.',
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
epilog='''Examples: epilog='''Examples:
%(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\ %(prog)s dt/sinara/stabilizer afe/0='"G2"' iir_ch/0/0=\
'{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}'
''') ''')
@ -100,19 +115,22 @@ def main():
args = parser.parse_args() args = parser.parse_args()
logging.basicConfig( logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(name)s: %(message)s', format='%(asctime)s [%(levelname)s] %(name)s: %(message)s',
level=logging.WARN - 10*args.verbose) level=logging.WARN - 10*args.verbose)
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
async def configure_settings(): async def configure_settings():
interface = await Miniconf.create(args.prefix, args.broker) interface = await Miniconf.create(args.prefix, args.broker)
for kv in args.settings: for key_value in args.settings:
path, value = kv.split("=", 1) path, value = key_value.split("=", 1)
response = await interface.command(path, json.loads(value)) response = await interface.command(path, json.loads(value))
print(response) print(f'{path}: {response}')
if response['code'] != 0:
return response['code']
return 0
loop.run_until_complete(configure_settings()) sys.exit(loop.run_until_complete(configure_settings()))
if __name__ == '__main__': if __name__ == '__main__':

View File

@ -13,7 +13,7 @@ use hardware::{
InputPin, AFE0, AFE1, InputPin, AFE0, AFE1,
}; };
use net::{Action, MiniconfInterface}; use net::{Action, MqttInterface};
const SCALE: f32 = i16::MAX as _; const SCALE: f32 = i16::MAX as _;
@ -54,7 +54,7 @@ const APP: () = {
digital_input1: DigitalInput1, digital_input1: DigitalInput1,
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>, mqtt: MqttInterface<Settings>,
#[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])]
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
@ -66,7 +66,7 @@ const APP: () = {
// Configure the microcontroller // Configure the microcontroller
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);
let mqtt_config = MiniconfInterface::new( let mqtt = MqttInterface::new(
stabilizer.net.stack, stabilizer.net.stack,
"", "",
&net::get_device_prefix( &net::get_device_prefix(
@ -93,7 +93,7 @@ const APP: () = {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
dacs: stabilizer.dacs, dacs: stabilizer.dacs,
mqtt_config, mqtt,
digital_input1: stabilizer.digital_inputs.1, digital_input1: stabilizer.digital_inputs.1,
settings: Settings::default(), settings: Settings::default(),
} }
@ -152,14 +152,10 @@ const APP: () = {
} }
} }
#[idle(resources=[mqtt_config], spawn=[settings_update])] #[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
match c match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => { Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
@ -169,9 +165,9 @@ const APP: () = {
} }
} }
#[task(priority = 1, resources=[mqtt_config, afes, settings])] #[task(priority = 1, resources=[mqtt, afes, settings])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings; let settings = c.resources.mqtt.settings();
// Update the IIR channels. // Update the IIR channels.
c.resources.settings.lock(|current| *current = *settings); c.resources.settings.lock(|current| *current = *settings);

View File

@ -1,143 +0,0 @@
#![deny(warnings)]
#![no_std]
#![no_main]
use dsp::{Accu, Complex, ComplexExt, Lockin};
use generic_array::typenum::U2;
use hardware::{Adc1Input, Dac0Output, Dac1Output, AFE0, AFE1};
use stabilizer::{hardware, hardware::design_parameters};
// A constant sinusoid to send on the DAC output.
// Full-scale gives a +/- 10V amplitude waveform. Scale it down to give +/- 1V.
const ONE: i16 = (0.1 * u16::MAX as f32) as _;
const SQRT2: i16 = (ONE as f32 * 0.707) as _;
const DAC_SEQUENCE: [i16; design_parameters::SAMPLE_BUFFER_SIZE] =
[ONE, SQRT2, 0, -SQRT2, -ONE, -SQRT2, 0, SQRT2];
#[rtic::app(device = stm32h7xx_hal::stm32, peripherals = true, monotonic = rtic::cyccnt::CYCCNT)]
const APP: () = {
struct Resources {
afes: (AFE0, AFE1),
adc: Adc1Input,
dacs: (Dac0Output, Dac1Output),
lockin: Lockin<U2>,
}
#[init]
fn init(c: init::Context) -> init::LateResources {
// Configure the microcontroller
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);
// Enable ADC/DAC events
stabilizer.adcs.1.start();
stabilizer.dacs.0.start();
stabilizer.dacs.1.start();
// Start sampling ADCs.
stabilizer.adc_dac_timer.start();
init::LateResources {
lockin: Lockin::default(),
afes: stabilizer.afes,
adc: stabilizer.adcs.1,
dacs: stabilizer.dacs,
}
}
/// Main DSP processing routine.
///
/// See `dual-iir` for general notes on processing time and timing.
///
/// This is an implementation of an internal-reference lockin on the ADC1 signal.
/// The reference at f_sample/8 is output on DAC0 and the phase of the demodulated
/// signal on DAC1.
#[task(binds=DMA1_STR4, resources=[adc, dacs, lockin], priority=2)]
fn process(c: process::Context) {
let lockin = c.resources.lockin;
let adc_samples = c.resources.adc.acquire_buffer();
let dac_samples = [
c.resources.dacs.0.acquire_buffer(),
c.resources.dacs.1.acquire_buffer(),
];
// Reference phase and frequency are known.
let pll_phase = 0i32;
let pll_frequency =
1i32 << (32 - design_parameters::SAMPLE_BUFFER_SIZE_LOG2);
// Harmonic index of the LO: -1 to _de_modulate the fundamental (complex conjugate)
let harmonic: i32 = -1;
// Demodulation LO phase offset
let phase_offset: i32 = 1 << 30;
// Log2 lowpass time constant.
let time_constant: u8 = 8;
let sample_frequency = (pll_frequency as i32).wrapping_mul(harmonic);
let sample_phase =
phase_offset.wrapping_add(pll_phase.wrapping_mul(harmonic));
let output: Complex<i32> = adc_samples
.iter()
// Zip in the LO phase.
.zip(Accu::new(sample_phase, sample_frequency))
// Convert to signed, MSB align the ADC sample, update the Lockin (demodulate, filter)
.map(|(&sample, phase)| {
let s = (sample as i16 as i32) << 16;
lockin.update(s, phase, time_constant)
})
// Decimate
.last()
.unwrap()
* 2; // Full scale assuming the 2f component is gone.
// Convert to DAC data.
for (i, data) in DAC_SEQUENCE.iter().enumerate() {
// DAC0 always generates a fixed sinusoidal output.
dac_samples[0][i] = *data as u16 ^ 0x8000;
dac_samples[1][i] = (output.arg() >> 16) as u16 ^ 0x8000;
}
}
#[idle(resources=[afes])]
fn idle(_: idle::Context) -> ! {
loop {
cortex_m::asm::wfi();
}
}
#[task(binds = ETH, priority = 1)]
fn eth(_: eth::Context) {
unsafe { stm32h7xx_hal::ethernet::interrupt_handler() }
}
#[task(binds = SPI2, priority = 3)]
fn spi2(_: spi2::Context) {
panic!("ADC0 input overrun");
}
#[task(binds = SPI3, priority = 3)]
fn spi3(_: spi3::Context) {
panic!("ADC1 input overrun");
}
#[task(binds = SPI4, priority = 3)]
fn spi4(_: spi4::Context) {
panic!("DAC0 output error");
}
#[task(binds = SPI5, priority = 3)]
fn spi5(_: spi5::Context) {
panic!("DAC1 output error");
}
extern "C" {
// hw interrupt handlers for RTIC to use for scheduling tasks
// one per priority
fn DCMI();
fn JPEG();
fn SDMMC();
}
};

View File

@ -16,18 +16,36 @@ use stabilizer::hardware::{
}; };
use miniconf::Miniconf; use miniconf::Miniconf;
use stabilizer::net::{Action, MiniconfInterface}; use stabilizer::net::{Action, MqttInterface};
// A constant sinusoid to send on the DAC output.
// Full-scale gives a +/- 10.24V amplitude waveform. Scale it down to give +/- 1V.
const ONE: i16 = ((1.0 / 10.24) * i16::MAX as f32) as _;
const SQRT2: i16 = (ONE as f32 * 0.707) as _;
const DAC_SEQUENCE: [i16; design_parameters::SAMPLE_BUFFER_SIZE] =
[ONE, SQRT2, 0, -SQRT2, -ONE, -SQRT2, 0, SQRT2];
#[derive(Copy, Clone, Debug, Deserialize, Miniconf)] #[derive(Copy, Clone, Debug, Deserialize, Miniconf)]
enum Conf { enum Conf {
PowerPhase, Magnitude,
FrequencyDiscriminator, Phase,
ReferenceFrequency,
LogPower,
InPhase,
Quadrature, Quadrature,
Modulation,
}
#[derive(Copy, Clone, Debug, Miniconf, Deserialize, PartialEq)]
enum LockinMode {
Internal,
External,
} }
#[derive(Copy, Clone, Debug, Deserialize, Miniconf)] #[derive(Copy, Clone, Debug, Deserialize, Miniconf)]
pub struct Settings { pub struct Settings {
afe: [AfeGain; 2], afe: [AfeGain; 2],
lockin_mode: LockinMode,
pll_tc: [u8; 2], pll_tc: [u8; 2],
@ -43,13 +61,15 @@ impl Default for Settings {
Self { Self {
afe: [AfeGain::G1; 2], afe: [AfeGain::G1; 2],
lockin_mode: LockinMode::External,
pll_tc: [21, 21], // frequency and phase settling time (log2 counter cycles) pll_tc: [21, 21], // frequency and phase settling time (log2 counter cycles)
lockin_tc: 6, // lockin lowpass time constant lockin_tc: 6, // lockin lowpass time constant
lockin_harmonic: -1, // Harmonic index of the LO: -1 to _de_modulate the fundamental (complex conjugate) lockin_harmonic: -1, // Harmonic index of the LO: -1 to _de_modulate the fundamental (complex conjugate)
lockin_phase: 0, // Demodulation LO phase offset lockin_phase: 0, // Demodulation LO phase offset
output_conf: [Conf::Quadrature; 2], output_conf: [Conf::InPhase, Conf::Quadrature],
} }
} }
} }
@ -60,7 +80,7 @@ const APP: () = {
afes: (AFE0, AFE1), afes: (AFE0, AFE1),
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
mqtt_config: MiniconfInterface<Settings>, mqtt: MqttInterface<Settings>,
settings: Settings, settings: Settings,
timestamper: InputStamper, timestamper: InputStamper,
@ -73,7 +93,7 @@ const APP: () = {
// Configure the microcontroller // Configure the microcontroller
let (mut stabilizer, _pounder) = setup(c.core, c.device); let (mut stabilizer, _pounder) = setup(c.core, c.device);
let mqtt_config = MiniconfInterface::new( let mqtt = MqttInterface::new(
stabilizer.net.stack, stabilizer.net.stack,
"", "",
&net::get_device_prefix( &net::get_device_prefix(
@ -113,7 +133,7 @@ const APP: () = {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
dacs: stabilizer.dacs, dacs: stabilizer.dacs,
mqtt_config, mqtt,
timestamper: stabilizer.timestamper, timestamper: stabilizer.timestamper,
settings, settings,
@ -137,7 +157,7 @@ const APP: () = {
c.resources.adcs.1.acquire_buffer(), c.resources.adcs.1.acquire_buffer(),
]; ];
let dac_samples = [ let mut dac_samples = [
c.resources.dacs.0.acquire_buffer(), c.resources.dacs.0.acquire_buffer(),
c.resources.dacs.1.acquire_buffer(), c.resources.dacs.1.acquire_buffer(),
]; ];
@ -145,21 +165,37 @@ const APP: () = {
let lockin = c.resources.lockin; let lockin = c.resources.lockin;
let settings = c.resources.settings; let settings = c.resources.settings;
let timestamp = let (reference_phase, reference_frequency) = match settings.lockin_mode
c.resources.timestamper.latest_timestamp().unwrap_or(None); // Ignore data from timer capture overflows. {
let (pll_phase, pll_frequency) = c.resources.pll.update( LockinMode::External => {
timestamp.map(|t| t as i32), let timestamp =
settings.pll_tc[0], c.resources.timestamper.latest_timestamp().unwrap_or(None); // Ignore data from timer capture overflows.
settings.pll_tc[1], let (pll_phase, pll_frequency) = c.resources.pll.update(
); timestamp.map(|t| t as i32),
settings.pll_tc[0],
settings.pll_tc[1],
);
(
pll_phase,
(pll_frequency
>> design_parameters::SAMPLE_BUFFER_SIZE_LOG2)
as i32,
)
}
LockinMode::Internal => {
// Reference phase and frequency are known.
(
1i32 << 30,
1i32 << (32 - design_parameters::SAMPLE_BUFFER_SIZE_LOG2),
)
}
};
let sample_frequency = ((pll_frequency let sample_frequency =
>> design_parameters::SAMPLE_BUFFER_SIZE_LOG2) reference_frequency.wrapping_mul(settings.lockin_harmonic);
as i32) let sample_phase = settings.lockin_phase.wrapping_add(
.wrapping_mul(settings.lockin_harmonic); reference_phase.wrapping_mul(settings.lockin_harmonic),
let sample_phase = settings );
.lockin_phase
.wrapping_add(pll_phase.wrapping_mul(settings.lockin_harmonic));
let output: Complex<i32> = adc_samples[0] let output: Complex<i32> = adc_samples[0]
.iter() .iter()
@ -175,34 +211,30 @@ const APP: () = {
.unwrap() .unwrap()
* 2; // Full scale assuming the 2f component is gone. * 2; // Full scale assuming the 2f component is gone.
let output = [
match settings.output_conf[0] {
Conf::PowerPhase => output.abs_sqr() as _,
Conf::FrequencyDiscriminator => (output.log2() << 24) as _,
Conf::Quadrature => output.re,
},
match settings.output_conf[1] {
Conf::PowerPhase => output.arg(),
Conf::FrequencyDiscriminator => pll_frequency as _,
Conf::Quadrature => output.im,
},
];
// Convert to DAC data. // Convert to DAC data.
for i in 0..dac_samples[0].len() { for (channel, samples) in dac_samples.iter_mut().enumerate() {
dac_samples[0][i] = (output[0] >> 16) as u16 ^ 0x8000; for (i, sample) in samples.iter_mut().enumerate() {
dac_samples[1][i] = (output[1] >> 16) as u16 ^ 0x8000; let value = match settings.output_conf[channel] {
Conf::Magnitude => output.abs_sqr() as i32 >> 16,
Conf::Phase => output.arg() >> 16,
Conf::LogPower => (output.log2() << 24) as i32 >> 16,
Conf::ReferenceFrequency => {
reference_frequency as i32 >> 16
}
Conf::InPhase => output.re >> 16,
Conf::Quadrature => output.im >> 16,
Conf::Modulation => DAC_SEQUENCE[i] as i32,
};
*sample = value as u16 ^ 0x8000;
}
} }
} }
#[idle(resources=[mqtt_config], spawn=[settings_update])] #[idle(resources=[mqtt], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
match c match c.resources.mqtt.lock(|mqtt| mqtt.update()) {
.resources
.mqtt_config
.lock(|config_interface| config_interface.update())
{
Some(Action::Sleep) => cortex_m::asm::wfi(), Some(Action::Sleep) => cortex_m::asm::wfi(),
Some(Action::UpdateSettings) => { Some(Action::UpdateSettings) => {
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
@ -212,9 +244,9 @@ const APP: () = {
} }
} }
#[task(priority = 1, resources=[mqtt_config, settings, afes])] #[task(priority = 1, resources=[mqtt, settings, afes])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
let settings = &c.resources.mqtt_config.mqtt.settings; let settings = c.resources.mqtt.settings();
c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.0.set_gain(settings.afe[0]);
c.resources.afes.1.set_gain(settings.afe[1]); c.resources.afes.1.set_gain(settings.afe[1]);

91
src/net/messages.rs Normal file
View File

@ -0,0 +1,91 @@
use heapless::{consts, String, Vec};
use serde::Serialize;
use core::fmt::Write;
#[derive(Debug, Copy, Clone)]
pub enum SettingsResponseCode {
NoError = 0,
MiniconfError = 1,
}
/// Represents a generic MQTT message.
pub struct MqttMessage<'a> {
pub topic: &'a str,
pub message: Vec<u8, consts::U128>,
pub properties: Vec<minimq::Property<'a>, consts::U1>,
}
/// The payload of the MQTT response message to a settings update request.
#[derive(Serialize)]
pub struct SettingsResponse {
code: u8,
msg: String<heapless::consts::U64>,
}
impl<'a> MqttMessage<'a> {
/// Construct a new MQTT message from an incoming message.
///
/// # Args
/// * `properties` - A list of properties associated with the inbound message.
/// * `default_response` - The default response topic for the message
/// * `msg` - The response associated with the message. Must fit within 128 bytes.
pub fn new<'b: 'a>(
properties: &[minimq::Property<'a>],
default_response: &'b str,
msg: &impl Serialize,
) -> Self {
// Extract the MQTT response topic.
let topic = properties
.iter()
.find_map(|prop| {
if let minimq::Property::ResponseTopic(topic) = prop {
Some(topic)
} else {
None
}
})
.unwrap_or(&default_response);
// Associate any provided correlation data with the response.
let mut correlation_data: Vec<minimq::Property<'a>, consts::U1> =
Vec::new();
if let Some(data) = properties
.iter()
.find(|prop| matches!(prop, minimq::Property::CorrelationData(_)))
{
// Note(unwrap): Unwrap can not fail, as we only ever push one value.
correlation_data.push(*data).unwrap();
}
Self {
topic,
// Note(unwrap): All SettingsResponse objects are guaranteed to fit in the vector.
message: miniconf::serde_json_core::to_vec(msg).unwrap(),
properties: correlation_data,
}
}
}
impl From<Result<(), miniconf::Error>> for SettingsResponse {
fn from(result: Result<(), miniconf::Error>) -> Self {
match result {
Ok(_) => Self {
msg: String::from("OK"),
code: SettingsResponseCode::NoError as u8,
},
Err(error) => {
let mut msg = String::new();
if write!(&mut msg, "{:?}", error).is_err() {
msg = String::from("Miniconf Error");
}
Self {
code: SettingsResponseCode::MiniconfError as u8,
msg,
}
}
}
}
}

View File

@ -1,11 +1,18 @@
use crate::hardware::{ ///! Stabilizer network management module
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, ///!
}; ///! # Design
///! The stabilizer network architecture supports numerous layers to permit transmission of
///! telemetry (via MQTT), configuration of run-time settings (via MQTT + Miniconf), and live data
///! streaming over raw UDP/TCP sockets. This module encompasses the main processing routines
///! related to Stabilizer networking operations.
use heapless::{consts, String};
use core::fmt::Write; use core::fmt::Write;
use heapless::{consts, String}; mod messages;
use miniconf::minimq; mod mqtt_interface;
use messages::{MqttMessage, SettingsResponse};
pub use mqtt_interface::MqttInterface;
/// Potential actions for firmware to take. /// Potential actions for firmware to take.
pub enum Action { pub enum Action {
@ -16,101 +23,6 @@ pub enum Action {
UpdateSettings, UpdateSettings,
} }
/// MQTT settings interface.
pub struct MiniconfInterface<S>
where
S: miniconf::Miniconf + Default,
{
pub mqtt: miniconf::MqttInterface<S, NetworkStack, minimq::consts::U256>,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
}
impl<S> MiniconfInterface<S>
where
S: miniconf::Miniconf + Default,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt = {
let mqtt_client = {
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap()
};
miniconf::MqttInterface::new(mqtt_client, prefix, S::default())
.unwrap()
};
Self {
mqtt,
clock,
phy,
network_was_reset: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
let now = self.clock.current_ms();
// First, service the network stack to process and inbound and outbound traffic.
let sleep = match self.mqtt.network_stack().poll(now) {
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
if self.phy.poll_link() == false {
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
if !self.network_was_reset {
self.network_was_reset = true;
self.mqtt.network_stack().handle_link_reset();
}
} else {
self.network_was_reset = false;
}
// Finally, service the MQTT interface and handle any necessary messages.
match self.mqtt.update() {
Ok(true) => Some(Action::UpdateSettings),
Ok(false) if sleep => Some(Action::Sleep),
Ok(_) => None,
Err(miniconf::MqttError::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
}
/// Get the MQTT prefix of a device. /// Get the MQTT prefix of a device.
/// ///
/// # Args /// # Args

197
src/net/mqtt_interface.rs Normal file
View File

@ -0,0 +1,197 @@
use crate::hardware::{
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack,
};
use heapless::{consts, String};
use super::{Action, MqttMessage, SettingsResponse};
/// MQTT settings interface.
pub struct MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
default_response_topic: String<consts::U128>,
mqtt: minimq::MqttClient<minimq::consts::U256, NetworkStack>,
settings: S,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
subscribed: bool,
settings_prefix: String<consts::U64>,
}
impl<S> MqttInterface<S>
where
S: miniconf::Miniconf + Default + Clone,
{
/// Construct a new MQTT settings interface.
///
/// # Args
/// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new(
stack: NetworkStack,
client_id: &str,
prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
let mqtt =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap();
let mut response_topic: String<consts::U128> = String::from(prefix);
response_topic.push_str("/log").unwrap();
let mut settings_prefix: String<consts::U64> = String::from(prefix);
settings_prefix.push_str("/settings").unwrap();
Self {
mqtt,
settings: S::default(),
settings_prefix,
clock,
phy,
default_response_topic: response_topic,
network_was_reset: false,
subscribed: false,
}
}
/// Update the MQTT interface and service the network
///
/// # Returns
/// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> {
// First, service the network stack to process any inbound and outbound traffic.
let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms())
{
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
match self.phy.poll_link() {
true => self.network_was_reset = false,
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
false if !self.network_was_reset => {
self.network_was_reset = true;
self.mqtt.network_stack.handle_link_reset();
}
_ => {}
};
let mqtt_connected = match self.mqtt.is_connected() {
Ok(connected) => connected,
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => false,
Err(minimq::Error::Network(error)) => {
log::info!("Unexpected network error: {:?}", error);
false
}
Err(error) => {
log::warn!("Unexpected MQTT error: {:?}", error);
false
}
};
// If we're no longer subscribed to the settings topic, but we are connected to the broker,
// resubscribe.
if !self.subscribed && mqtt_connected {
// Note(unwrap): We construct a string with two more characters than the prefix
// strucutre, so we are guaranteed to have space for storage.
let mut settings_topic: String<consts::U66> =
String::from(self.settings_prefix.as_str());
settings_topic.push_str("/#").unwrap();
// We do not currently handle or process potential subscription failures. Instead, this
// failure will be logged through the stabilizer logging interface.
self.mqtt.subscribe(&settings_topic, &[]).unwrap();
self.subscribed = true;
}
// Handle any MQTT traffic.
let settings = &mut self.settings;
let mqtt = &mut self.mqtt;
let prefix = self.settings_prefix.as_str();
let default_response_topic = self.default_response_topic.as_str();
let mut update = false;
match mqtt.poll(|client, topic, message, properties| {
let path = match topic.strip_prefix(prefix) {
// For paths, we do not want to include the leading slash.
Some(path) => {
if path.len() > 0 {
&path[1..]
} else {
path
}
}
None => {
info!("Unexpected MQTT topic: {}", topic);
return;
}
};
let message: SettingsResponse = settings
.string_set(path.split('/').peekable(), message)
.and_then(|_| {
update = true;
Ok(())
})
.into();
let response =
MqttMessage::new(properties, default_response_topic, &message);
client
.publish(
response.topic,
&response.message,
// TODO: When Minimq supports more QoS levels, this should be increased to
// ensure that the client has received it at least once.
minimq::QoS::AtMostOnce,
&response.properties,
)
.ok();
}) {
// If settings updated,
Ok(_) => {
if update {
Some(Action::UpdateSettings)
} else if sleep {
Some(Action::Sleep)
} else {
None
}
}
Err(minimq::Error::Disconnected) => {
self.subscribed = false;
None
}
Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress,
)) => None,
Err(error) => {
log::info!("Unexpected error: {:?}", error);
None
}
}
}
pub fn settings(&self) -> &S {
&self.settings
}
}