Adding refactor for telemetry

This commit is contained in:
Ryan Summers 2021-05-05 15:39:33 +02:00
parent 06b328ff52
commit 4a656eedd2
6 changed files with 122 additions and 78 deletions

View File

@ -13,7 +13,10 @@ use hardware::{
DigitalInput1, InputPin, SystemTimer, AFE0, AFE1, DigitalInput1, InputPin, SystemTimer, AFE0, AFE1,
}; };
use net::{UpdateState, MiniconfClient, NetworkProcessor}; use net::{
MiniconfClient, NetworkManager, NetworkProcessor, NetworkUsers,
TelemetryBuffer, UpdateState,
};
const SCALE: f32 = i16::MAX as _; const SCALE: f32 = i16::MAX as _;
@ -49,11 +52,6 @@ impl Default for Settings {
} }
} }
struct NetworkUsers {
miniconf: MiniconfClient<Settings>,
processor: NetworkProcessor,
}
#[rtic::app(device = stm32h7xx_hal::stm32, peripherals = true, monotonic = stabilizer::hardware::SystemTimer)] #[rtic::app(device = stm32h7xx_hal::stm32, peripherals = true, monotonic = stabilizer::hardware::SystemTimer)]
const APP: () = { const APP: () = {
struct Resources { struct Resources {
@ -61,9 +59,10 @@ const APP: () = {
digital_inputs: (DigitalInput0, DigitalInput1), digital_inputs: (DigitalInput0, DigitalInput1),
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
network: NetworkUsers, network: NetworkUsers<Settings>,
settings: Settings, settings: Settings,
telemetry: TelemetryBuffer,
#[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])]
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
@ -75,7 +74,8 @@ const APP: () = {
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);
let network = { let network = {
let stack_manager = cortex_m::singleton!(: NetworkManager = NetworkManager::new(stabilizer.net.stack)).unwrap(); let stack = stabilizer.net.stack;
let stack_manager = cortex_m::singleton!(: NetworkManager = NetworkManager::new(stack)).unwrap();
let processor = NetworkProcessor::new( let processor = NetworkProcessor::new(
stack_manager.acquire_stack(), stack_manager.acquire_stack(),
@ -83,7 +83,7 @@ const APP: () = {
stabilizer.cycle_counter, stabilizer.cycle_counter,
); );
let settings = MqttInterface::new( let settings = MiniconfClient::new(
stack_manager.acquire_stack(), stack_manager.acquire_stack(),
"", "",
&net::get_device_prefix( &net::get_device_prefix(
@ -113,7 +113,6 @@ const APP: () = {
// Start sampling ADCs. // Start sampling ADCs.
stabilizer.adc_dac_timer.start(); stabilizer.adc_dac_timer.start();
init::LateResources { init::LateResources {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
@ -193,12 +192,19 @@ const APP: () = {
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
// Update the smoltcp network stack. // Update the smoltcp network stack.
let poll_result = c.resources.network.lock(|network| network.processor.poll()); let poll_result = c
.resources
.network
.lock(|network| network.processor.update());
// Service the MQTT configuration client. // Service the MQTT configuration client.
if c.resources.miniconf_client.lock(|client| client.update()) == UpdateStatus::Updated { if c.resources
.network
.lock(|network| network.miniconf.update())
== UpdateState::Updated
{
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
} else if poll_result == UpdateStatus::NoChange { } else if poll_result == UpdateState::NoChange {
cortex_m::asm::wfi(); cortex_m::asm::wfi();
} }
} }
@ -217,10 +223,10 @@ const APP: () = {
#[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])] #[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])]
fn telemetry(mut c: telemetry::Context) { fn telemetry(mut c: telemetry::Context) {
let telemetry = let _telemetry =
c.resources.telemetry.lock(|telemetry| telemetry.clone()); c.resources.telemetry.lock(|telemetry| telemetry.clone());
let gains = c.resources.settings.lock(|settings| settings.afe.clone()); let _gains = c.resources.settings.lock(|settings| settings.afe.clone());
// TODO: Publish telemetry through the telemetry client here. // TODO: Publish telemetry through the telemetry client here.
//c.resources //c.resources

View File

@ -18,7 +18,9 @@ use stabilizer::hardware::{
}; };
use miniconf::Miniconf; use miniconf::Miniconf;
use stabilizer::net::{Action, MqttInterface}; use stabilizer::net::{
MiniconfClient, NetworkManager, NetworkProcessor, NetworkUsers, UpdateState,
};
#[derive(Copy, Clone, Debug, Deserialize, Miniconf)] #[derive(Copy, Clone, Debug, Deserialize, Miniconf)]
enum Conf { enum Conf {
@ -64,7 +66,7 @@ const APP: () = {
afes: (AFE0, AFE1), afes: (AFE0, AFE1),
adcs: (Adc0Input, Adc1Input), adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output), dacs: (Dac0Output, Dac1Output),
mqtt: MqttInterface<Settings>, network: NetworkUsers<Settings>,
settings: Settings, settings: Settings,
telemetry: net::TelemetryBuffer, telemetry: net::TelemetryBuffer,
digital_inputs: (DigitalInput0, DigitalInput1), digital_inputs: (DigitalInput0, DigitalInput1),
@ -79,17 +81,33 @@ const APP: () = {
// Configure the microcontroller // Configure the microcontroller
let (mut stabilizer, _pounder) = setup(c.core, c.device); let (mut stabilizer, _pounder) = setup(c.core, c.device);
let mqtt = MqttInterface::new( let network = {
stabilizer.net.stack, let stack = stabilizer.net.stack;
let stack_manager = cortex_m::singleton!(: NetworkManager = NetworkManager::new(stack)).unwrap();
let processor = NetworkProcessor::new(
stack_manager.acquire_stack(),
stabilizer.net.phy,
stabilizer.cycle_counter,
);
let settings = MiniconfClient::new(
stack_manager.acquire_stack(),
"", "",
&net::get_device_prefix( &net::get_device_prefix(
env!("CARGO_BIN_NAME"), env!("CARGO_BIN_NAME"),
stabilizer.net.mac_address, stabilizer.net.mac_address,
), ),
stabilizer.net.phy,
stabilizer.cycle_counter,
); );
// TODO: Add telemetry client
NetworkUsers {
miniconf: settings,
processor,
}
};
let settings = Settings::default(); let settings = Settings::default();
let pll = RPLL::new( let pll = RPLL::new(
@ -120,7 +138,7 @@ const APP: () = {
afes: stabilizer.afes, afes: stabilizer.afes,
adcs: stabilizer.adcs, adcs: stabilizer.adcs,
dacs: stabilizer.dacs, dacs: stabilizer.dacs,
mqtt, network,
digital_inputs: stabilizer.digital_inputs, digital_inputs: stabilizer.digital_inputs,
timestamper: stabilizer.timestamper, timestamper: stabilizer.timestamper,
telemetry: net::TelemetryBuffer::default(), telemetry: net::TelemetryBuffer::default(),
@ -211,22 +229,31 @@ const APP: () = {
[dac_samples[0][0], dac_samples[1][0]]; [dac_samples[0][0], dac_samples[1][0]];
} }
#[idle(resources=[mqtt], spawn=[settings_update])] #[idle(resources=[network], spawn=[settings_update])]
fn idle(mut c: idle::Context) -> ! { fn idle(mut c: idle::Context) -> ! {
loop { loop {
match c.resources.mqtt.lock(|mqtt| mqtt.update()) { // Update the smoltcp network stack.
Some(Action::Sleep) => cortex_m::asm::wfi(), let poll_result = c
Some(Action::UpdateSettings) => { .resources
.network
.lock(|network| network.processor.update());
// Service the MQTT configuration client.
if c.resources
.network
.lock(|network| network.miniconf.update())
== UpdateState::Updated
{
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
} } else if poll_result == UpdateState::NoChange {
_ => {} cortex_m::asm::wfi();
} }
} }
} }
#[task(priority = 1, resources=[mqtt, settings, afes])] #[task(priority = 1, resources=[network, settings, afes])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
let settings = c.resources.mqtt.settings(); let settings = c.resources.network.miniconf.settings();
c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.0.set_gain(settings.afe[0]);
c.resources.afes.1.set_gain(settings.afe[1]); c.resources.afes.1.set_gain(settings.afe[1]);
@ -234,7 +261,7 @@ const APP: () = {
c.resources.settings.lock(|current| *current = *settings); c.resources.settings.lock(|current| *current = *settings);
} }
#[task(priority = 1, resources=[mqtt, digital_inputs, settings, telemetry], schedule=[telemetry])] #[task(priority = 1, resources=[network, digital_inputs, settings, telemetry], schedule=[telemetry])]
fn telemetry(mut c: telemetry::Context) { fn telemetry(mut c: telemetry::Context) {
let mut telemetry = let mut telemetry =
c.resources.telemetry.lock(|telemetry| telemetry.clone()); c.resources.telemetry.lock(|telemetry| telemetry.clone());
@ -244,10 +271,12 @@ const APP: () = {
c.resources.digital_inputs.1.is_high().unwrap(), c.resources.digital_inputs.1.is_high().unwrap(),
]; ];
let gains = c.resources.settings.lock(|settings| settings.afe.clone()); let _gains = c.resources.settings.lock(|settings| settings.afe.clone());
c.resources
.mqtt // TODO: Publish telemetry.
.publish_telemetry(&telemetry.to_telemetry(gains[0], gains[1])); //c.resources
// .mqtt
// .publish_telemetry(&telemetry.to_telemetry(gains[0], gains[1]));
let telemetry_period = c let telemetry_period = c
.resources .resources

View File

@ -13,23 +13,30 @@ mod messages;
mod mqtt_interface; mod mqtt_interface;
mod shared; mod shared;
mod stack_manager; mod stack_manager;
use messages::{MqttMessage, SettingsResponse}; mod telemetry;
pub use mqtt_interface::MiniconfClient;
pub use stack_manager::NetworkProcessor;
pub use shared::NetworkManager;
use crate::hardware::NetworkStack; use crate::hardware::NetworkStack;
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; use messages::{MqttMessage, SettingsResponse};
use miniconf::Miniconf;
mod telemetry; pub use mqtt_interface::MiniconfClient;
pub use shared::NetworkManager;
pub use stack_manager::NetworkProcessor;
pub use telemetry::{Telemetry, TelemetryBuffer}; pub use telemetry::{Telemetry, TelemetryBuffer};
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>;
#[derive(Copy, Clone, PartialEq)]
pub enum UpdateState { pub enum UpdateState {
NoChange, NoChange,
Updated, Updated,
} }
pub struct NetworkUsers<S: Default + Clone + Miniconf> {
pub miniconf: MiniconfClient<S>,
pub processor: NetworkProcessor,
}
/// Get the MQTT prefix of a device. /// Get the MQTT prefix of a device.
/// ///
/// # Args /// # Args

View File

@ -2,7 +2,7 @@ use crate::hardware::design_parameters::MQTT_BROKER;
use heapless::{consts, String}; use heapless::{consts, String};
use super::{UpdateState, MqttMessage, SettingsResponse, NetworkReference}; use super::{MqttMessage, NetworkReference, SettingsResponse, UpdateState};
/// MQTT settings interface. /// MQTT settings interface.
pub struct MiniconfClient<S> pub struct MiniconfClient<S>
@ -26,11 +26,7 @@ where
/// * `stack` - The network stack to use for communication. /// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device. /// * `prefix` - The MQTT device prefix to use for this device.
pub fn new( pub fn new(stack: NetworkReference, client_id: &str, prefix: &str) -> Self {
stack: NetworkReference,
client_id: &str,
prefix: &str,
) -> Self {
let mqtt = let mqtt =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
.unwrap(); .unwrap();
@ -55,7 +51,6 @@ where
/// # Returns /// # Returns
/// An option containing an action that should be completed as a result of network servicing. /// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> UpdateState { pub fn update(&mut self) -> UpdateState {
let mqtt_connected = match self.mqtt.is_connected() { let mqtt_connected = match self.mqtt.is_connected() {
Ok(connected) => connected, Ok(connected) => connected,
Err(minimq::Error::Network( Err(minimq::Error::Network(

View File

@ -1,19 +1,15 @@
use shared_bus::{AtomicCheckMutex, BusMutex};
use minimq::embedded_nal; use minimq::embedded_nal;
use smoltcp_nal::smoltcp; use shared_bus::{AtomicCheckMutex, BusMutex};
use crate::hardware::NetworkStack; use crate::hardware::NetworkStack;
pub struct NetworkStackProxy<'a, S> { pub struct NetworkStackProxy<'a, S> {
mutex: &'a AtomicCheckMutex<S> mutex: &'a AtomicCheckMutex<S>,
} }
impl<'a> NetworkStackProxy<'a, NetworkStack> { impl<'a, S> NetworkStackProxy<'a, S> {
pub fn poll(&mut self, now: u32) -> Result<bool, smoltcp::Error> { pub fn lock<R, F: FnOnce(&mut S) -> R>(&mut self, f: F) -> R {
self.mutex.lock(|stack| stack.poll(now)) self.mutex.lock(|stack| f(stack))
}
pub fn handle_link_reset(&mut self) {
self.mutex.lock(|stack| stack.handle_link_reset())
} }
} }
@ -27,7 +23,7 @@ macro_rules! forward {
impl<'a, S> embedded_nal::TcpStack for NetworkStackProxy<'a, S> impl<'a, S> embedded_nal::TcpStack for NetworkStackProxy<'a, S>
where where
S: embedded_nal::TcpStack S: embedded_nal::TcpStack,
{ {
type TcpSocket = S::TcpSocket; type TcpSocket = S::TcpSocket;
type Error = S::Error; type Error = S::Error;
@ -41,17 +37,17 @@ where
} }
pub struct NetworkManager { pub struct NetworkManager {
mutex: AtomicCheckMutex<NetworkStack> mutex: AtomicCheckMutex<NetworkStack>,
} }
impl NetworkManager { impl NetworkManager {
pub fn new(stack: NetworkStack) -> Self { pub fn new(stack: NetworkStack) -> Self {
Self { mutex: AtomicCheckMutex::create(stack) } Self {
mutex: AtomicCheckMutex::create(stack),
}
} }
pub fn acquire_stack<'a>(&'a self) -> NetworkStackProxy<'a, NetworkStack> { pub fn acquire_stack<'a>(&'a self) -> NetworkStackProxy<'a, NetworkStack> {
NetworkStackProxy { NetworkStackProxy { mutex: &self.mutex }
mutex: &self.mutex
}
} }
} }

View File

@ -1,6 +1,6 @@
use super::{UpdateState, NetworkReference}; use super::{NetworkReference, UpdateState};
use crate::hardware::{EthernetPhy, CycleCounter}; use crate::hardware::{CycleCounter, EthernetPhy};
pub struct NetworkProcessor { pub struct NetworkProcessor {
stack: NetworkReference, stack: NetworkReference,
@ -10,13 +10,24 @@ pub struct NetworkProcessor {
} }
impl NetworkProcessor { impl NetworkProcessor {
pub fn new(stack: NetworkReference, phy: EthernetPhy, clock: CycleCounter) -> Self { pub fn new(
Self { stack, phy, clock, network_was_reset: false } stack: NetworkReference,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self {
Self {
stack,
phy,
clock,
network_was_reset: false,
}
} }
pub fn update(&mut self) -> UpdateState { pub fn update(&mut self) -> UpdateState {
// Service the network stack to process any inbound and outbound traffic. // Service the network stack to process any inbound and outbound traffic.
let result = match self.stack.poll(self.clock.current_ms()) { let now = self.clock.current_ms();
let result = match self.stack.lock(|stack| stack.poll(now)) {
Ok(true) => UpdateState::Updated, Ok(true) => UpdateState::Updated,
Ok(false) => UpdateState::NoChange, Ok(false) => UpdateState::NoChange,
Err(err) => { Err(err) => {
@ -34,9 +45,9 @@ impl NetworkProcessor {
// sending an excessive number of DHCP requests. // sending an excessive number of DHCP requests.
false if !self.network_was_reset => { false if !self.network_was_reset => {
self.network_was_reset = true; self.network_was_reset = true;
self.stack.handle_link_reset(); self.stack.lock(|stack| stack.handle_link_reset());
} }
_ => {}, _ => {}
}; };
result result