From 4888f18f880bf5b3e2673c5120ffae9f237d82db Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 4 May 2021 19:52:41 +0200 Subject: [PATCH] Adding rework to network module --- src/net/mod.rs | 20 +++++++----- src/net/mqtt_interface.rs | 66 +++++++-------------------------------- src/net/shared.rs | 57 +++++++++++++++++++++++++++++++++ src/net/stack_manager.rs | 44 ++++++++++++++++++++++++++ 4 files changed, 125 insertions(+), 62 deletions(-) create mode 100644 src/net/shared.rs create mode 100644 src/net/stack_manager.rs diff --git a/src/net/mod.rs b/src/net/mod.rs index 9b4d22d..0127017 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,19 +11,23 @@ use core::fmt::Write; mod messages; mod mqtt_interface; +mod shared; +mod stack_manager; use messages::{MqttMessage, SettingsResponse}; -pub use mqtt_interface::MqttInterface; +pub use mqtt_interface::MiniconfClient; +pub use stack_manager::NetworkProcessor; + +pub use shared::NetworkManager; + +use crate::hardware::NetworkStack; +pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; mod telemetry; pub use telemetry::{Telemetry, TelemetryBuffer}; -/// Potential actions for firmware to take. -pub enum Action { - /// Indicates that firmware can sleep for the next event. - Sleep, - - /// Indicates that settings have updated and firmware needs to propogate changes. - UpdateSettings, +pub enum UpdateState { + NoChange, + Updated, } /// Get the MQTT prefix of a device. diff --git a/src/net/mqtt_interface.rs b/src/net/mqtt_interface.rs index cef26dd..02a6ba6 100644 --- a/src/net/mqtt_interface.rs +++ b/src/net/mqtt_interface.rs @@ -1,27 +1,22 @@ -use crate::hardware::{ - design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack, -}; +use crate::hardware::design_parameters::MQTT_BROKER; use heapless::{consts, String}; -use super::{Action, MqttMessage, SettingsResponse}; +use super::{UpdateState, MqttMessage, SettingsResponse, NetworkReference}; /// MQTT settings interface. -pub struct MqttInterface +pub struct MiniconfClient where S: miniconf::Miniconf + Default + Clone, { default_response_topic: String, - mqtt: minimq::MqttClient, + mqtt: minimq::MqttClient, settings: S, - clock: CycleCounter, - phy: EthernetPhy, - network_was_reset: bool, subscribed: bool, settings_prefix: String, } -impl MqttInterface +impl MiniconfClient where S: miniconf::Miniconf + Default + Clone, { @@ -31,14 +26,10 @@ where /// * `stack` - The network stack to use for communication. /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. /// * `prefix` - The MQTT device prefix to use for this device. - /// * `phy` - The PHY driver for querying the link state. - /// * `clock` - The clock to utilize for querying the current system time. pub fn new( - stack: NetworkStack, + stack: NetworkReference, client_id: &str, prefix: &str, - phy: EthernetPhy, - clock: CycleCounter, ) -> Self { let mqtt = minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) @@ -54,10 +45,7 @@ where mqtt, settings: S::default(), settings_prefix, - clock, - phy, default_response_topic: response_topic, - network_was_reset: false, subscribed: false, } } @@ -66,30 +54,7 @@ where /// /// # Returns /// An option containing an action that should be completed as a result of network servicing. - pub fn update(&mut self) -> Option { - // First, service the network stack to process any inbound and outbound traffic. - let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms()) - { - Ok(updated) => !updated, - Err(err) => { - log::info!("Network error: {:?}", err); - false - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - match self.phy.poll_link() { - true => self.network_was_reset = false, - - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - false if !self.network_was_reset => { - self.network_was_reset = true; - self.mqtt.network_stack.handle_link_reset(); - } - _ => {}, - }; + pub fn update(&mut self) -> UpdateState { let mqtt_connected = match self.mqtt.is_connected() { Ok(connected) => connected, @@ -165,26 +130,19 @@ where .ok(); }) { // If settings updated, - Ok(_) => { - if update { - Some(Action::UpdateSettings) - } else if sleep { - Some(Action::Sleep) - } else { - None - } - } + Ok(_) if update => UpdateState::Updated, + Ok(_) => UpdateState::NoChange, Err(minimq::Error::Disconnected) => { self.subscribed = false; - None + UpdateState::NoChange } Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, - )) => None, + )) => UpdateState::NoChange, Err(error) => { log::info!("Unexpected error: {:?}", error); - None + UpdateState::NoChange } } } diff --git a/src/net/shared.rs b/src/net/shared.rs new file mode 100644 index 0000000..c8f0b77 --- /dev/null +++ b/src/net/shared.rs @@ -0,0 +1,57 @@ +use shared_bus::{AtomicCheckMutex, BusMutex}; +use minimq::embedded_nal; +use smoltcp_nal::smoltcp; + +use crate::hardware::NetworkStack; + +pub struct NetworkStackProxy<'a, S> { + mutex: &'a AtomicCheckMutex +} + +impl<'a> NetworkStackProxy<'a, NetworkStack> { + pub fn poll(&mut self, now: u32) -> Result { + self.mutex.lock(|stack| stack.poll(now)) + } + pub fn handle_link_reset(&mut self) { + self.mutex.lock(|stack| stack.handle_link_reset()) + } +} + +macro_rules! forward { + ($func:ident($($v:ident: $IT:ty),*) -> $T:ty) => { + fn $func(&self, $($v: $IT),*) -> $T { + self.mutex.lock(|stack| stack.$func($($v),*)) + } + } +} + +impl<'a, S> embedded_nal::TcpStack for NetworkStackProxy<'a, S> +where + S: embedded_nal::TcpStack +{ + type TcpSocket = S::TcpSocket; + type Error = S::Error; + + forward! {open(mode: embedded_nal::Mode) -> Result} + forward! {connect(socket: S::TcpSocket, remote: embedded_nal::SocketAddr) -> Result} + forward! {is_connected(socket: &S::TcpSocket) -> Result} + forward! {write(socket: &mut S::TcpSocket, buffer: &[u8]) -> embedded_nal::nb::Result} + forward! {read(socket: &mut S::TcpSocket, buffer: &mut [u8]) -> embedded_nal::nb::Result} + forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>} +} + +pub struct NetworkManager { + mutex: AtomicCheckMutex +} + +impl NetworkManager { + pub fn new(stack: NetworkStack) -> Self { + Self { mutex: AtomicCheckMutex::create(stack) } + } + + pub fn acquire_stack<'a>(&'a self) -> NetworkStackProxy<'a, NetworkStack> { + NetworkStackProxy { + mutex: &self.mutex + } + } +} diff --git a/src/net/stack_manager.rs b/src/net/stack_manager.rs new file mode 100644 index 0000000..3284188 --- /dev/null +++ b/src/net/stack_manager.rs @@ -0,0 +1,44 @@ +use super::{UpdateState, NetworkReference}; + +use crate::hardware::{EthernetPhy, CycleCounter}; + +pub struct NetworkProcessor { + stack: NetworkReference, + phy: EthernetPhy, + clock: CycleCounter, + network_was_reset: bool, +} + +impl NetworkProcessor { + pub fn new(stack: NetworkReference, phy: EthernetPhy, clock: CycleCounter) -> Self { + Self { stack, phy, clock, network_was_reset: false } + } + + pub fn update(&mut self) -> UpdateState { + // Service the network stack to process any inbound and outbound traffic. + let result = match self.stack.poll(self.clock.current_ms()) { + Ok(true) => UpdateState::Updated, + Ok(false) => UpdateState::NoChange, + Err(err) => { + log::info!("Network error: {:?}", err); + UpdateState::Updated + } + }; + + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + match self.phy.poll_link() { + true => self.network_was_reset = false, + + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + false if !self.network_was_reset => { + self.network_was_reset = true; + self.stack.handle_link_reset(); + } + _ => {}, + }; + + result + } +}