Adding rework to network module

This commit is contained in:
Ryan Summers 2021-05-04 19:52:41 +02:00
parent 5767973548
commit 4888f18f88
4 changed files with 125 additions and 62 deletions

View File

@ -11,19 +11,23 @@ use core::fmt::Write;
mod messages; mod messages;
mod mqtt_interface; mod mqtt_interface;
mod shared;
mod stack_manager;
use messages::{MqttMessage, SettingsResponse}; use messages::{MqttMessage, SettingsResponse};
pub use mqtt_interface::MqttInterface; pub use mqtt_interface::MiniconfClient;
pub use stack_manager::NetworkProcessor;
pub use shared::NetworkManager;
use crate::hardware::NetworkStack;
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>;
mod telemetry; mod telemetry;
pub use telemetry::{Telemetry, TelemetryBuffer}; pub use telemetry::{Telemetry, TelemetryBuffer};
/// Potential actions for firmware to take. pub enum UpdateState {
pub enum Action { NoChange,
/// Indicates that firmware can sleep for the next event. Updated,
Sleep,
/// Indicates that settings have updated and firmware needs to propogate changes.
UpdateSettings,
} }
/// Get the MQTT prefix of a device. /// Get the MQTT prefix of a device.

View File

@ -1,27 +1,22 @@
use crate::hardware::{ use crate::hardware::design_parameters::MQTT_BROKER;
design_parameters::MQTT_BROKER, CycleCounter, EthernetPhy, NetworkStack,
};
use heapless::{consts, String}; use heapless::{consts, String};
use super::{Action, MqttMessage, SettingsResponse}; use super::{UpdateState, MqttMessage, SettingsResponse, NetworkReference};
/// MQTT settings interface. /// MQTT settings interface.
pub struct MqttInterface<S> pub struct MiniconfClient<S>
where where
S: miniconf::Miniconf + Default + Clone, S: miniconf::Miniconf + Default + Clone,
{ {
default_response_topic: String<consts::U128>, default_response_topic: String<consts::U128>,
mqtt: minimq::MqttClient<minimq::consts::U256, NetworkStack>, mqtt: minimq::MqttClient<minimq::consts::U256, NetworkReference>,
settings: S, settings: S,
clock: CycleCounter,
phy: EthernetPhy,
network_was_reset: bool,
subscribed: bool, subscribed: bool,
settings_prefix: String<consts::U64>, settings_prefix: String<consts::U64>,
} }
impl<S> MqttInterface<S> impl<S> MiniconfClient<S>
where where
S: miniconf::Miniconf + Default + Clone, S: miniconf::Miniconf + Default + Clone,
{ {
@ -31,14 +26,10 @@ where
/// * `stack` - The network stack to use for communication. /// * `stack` - The network stack to use for communication.
/// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning. /// * `client_id` - The ID of the MQTT client. May be an empty string for auto-assigning.
/// * `prefix` - The MQTT device prefix to use for this device. /// * `prefix` - The MQTT device prefix to use for this device.
/// * `phy` - The PHY driver for querying the link state.
/// * `clock` - The clock to utilize for querying the current system time.
pub fn new( pub fn new(
stack: NetworkStack, stack: NetworkReference,
client_id: &str, client_id: &str,
prefix: &str, prefix: &str,
phy: EthernetPhy,
clock: CycleCounter,
) -> Self { ) -> Self {
let mqtt = let mqtt =
minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack) minimq::MqttClient::new(MQTT_BROKER.into(), client_id, stack)
@ -54,10 +45,7 @@ where
mqtt, mqtt,
settings: S::default(), settings: S::default(),
settings_prefix, settings_prefix,
clock,
phy,
default_response_topic: response_topic, default_response_topic: response_topic,
network_was_reset: false,
subscribed: false, subscribed: false,
} }
} }
@ -66,30 +54,7 @@ where
/// ///
/// # Returns /// # Returns
/// An option containing an action that should be completed as a result of network servicing. /// An option containing an action that should be completed as a result of network servicing.
pub fn update(&mut self) -> Option<Action> { pub fn update(&mut self) -> UpdateState {
// First, service the network stack to process any inbound and outbound traffic.
let sleep = match self.mqtt.network_stack.poll(self.clock.current_ms())
{
Ok(updated) => !updated,
Err(err) => {
log::info!("Network error: {:?}", err);
false
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
match self.phy.poll_link() {
true => self.network_was_reset = false,
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
false if !self.network_was_reset => {
self.network_was_reset = true;
self.mqtt.network_stack.handle_link_reset();
}
_ => {},
};
let mqtt_connected = match self.mqtt.is_connected() { let mqtt_connected = match self.mqtt.is_connected() {
Ok(connected) => connected, Ok(connected) => connected,
@ -165,26 +130,19 @@ where
.ok(); .ok();
}) { }) {
// If settings updated, // If settings updated,
Ok(_) => { Ok(_) if update => UpdateState::Updated,
if update { Ok(_) => UpdateState::NoChange,
Some(Action::UpdateSettings)
} else if sleep {
Some(Action::Sleep)
} else {
None
}
}
Err(minimq::Error::Disconnected) => { Err(minimq::Error::Disconnected) => {
self.subscribed = false; self.subscribed = false;
None UpdateState::NoChange
} }
Err(minimq::Error::Network( Err(minimq::Error::Network(
smoltcp_nal::NetworkError::NoIpAddress, smoltcp_nal::NetworkError::NoIpAddress,
)) => None, )) => UpdateState::NoChange,
Err(error) => { Err(error) => {
log::info!("Unexpected error: {:?}", error); log::info!("Unexpected error: {:?}", error);
None UpdateState::NoChange
} }
} }
} }

57
src/net/shared.rs Normal file
View File

@ -0,0 +1,57 @@
use shared_bus::{AtomicCheckMutex, BusMutex};
use minimq::embedded_nal;
use smoltcp_nal::smoltcp;
use crate::hardware::NetworkStack;
pub struct NetworkStackProxy<'a, S> {
mutex: &'a AtomicCheckMutex<S>
}
impl<'a> NetworkStackProxy<'a, NetworkStack> {
pub fn poll(&mut self, now: u32) -> Result<bool, smoltcp::Error> {
self.mutex.lock(|stack| stack.poll(now))
}
pub fn handle_link_reset(&mut self) {
self.mutex.lock(|stack| stack.handle_link_reset())
}
}
macro_rules! forward {
($func:ident($($v:ident: $IT:ty),*) -> $T:ty) => {
fn $func(&self, $($v: $IT),*) -> $T {
self.mutex.lock(|stack| stack.$func($($v),*))
}
}
}
impl<'a, S> embedded_nal::TcpStack for NetworkStackProxy<'a, S>
where
S: embedded_nal::TcpStack
{
type TcpSocket = S::TcpSocket;
type Error = S::Error;
forward! {open(mode: embedded_nal::Mode) -> Result<S::TcpSocket, S::Error>}
forward! {connect(socket: S::TcpSocket, remote: embedded_nal::SocketAddr) -> Result<S::TcpSocket, S::Error>}
forward! {is_connected(socket: &S::TcpSocket) -> Result<bool, S::Error>}
forward! {write(socket: &mut S::TcpSocket, buffer: &[u8]) -> embedded_nal::nb::Result<usize, S::Error>}
forward! {read(socket: &mut S::TcpSocket, buffer: &mut [u8]) -> embedded_nal::nb::Result<usize, S::Error>}
forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>}
}
pub struct NetworkManager {
mutex: AtomicCheckMutex<NetworkStack>
}
impl NetworkManager {
pub fn new(stack: NetworkStack) -> Self {
Self { mutex: AtomicCheckMutex::create(stack) }
}
pub fn acquire_stack<'a>(&'a self) -> NetworkStackProxy<'a, NetworkStack> {
NetworkStackProxy {
mutex: &self.mutex
}
}
}

44
src/net/stack_manager.rs Normal file
View File

@ -0,0 +1,44 @@
use super::{UpdateState, NetworkReference};
use crate::hardware::{EthernetPhy, CycleCounter};
pub struct NetworkProcessor {
stack: NetworkReference,
phy: EthernetPhy,
clock: CycleCounter,
network_was_reset: bool,
}
impl NetworkProcessor {
pub fn new(stack: NetworkReference, phy: EthernetPhy, clock: CycleCounter) -> Self {
Self { stack, phy, clock, network_was_reset: false }
}
pub fn update(&mut self) -> UpdateState {
// Service the network stack to process any inbound and outbound traffic.
let result = match self.stack.poll(self.clock.current_ms()) {
Ok(true) => UpdateState::Updated,
Ok(false) => UpdateState::NoChange,
Err(err) => {
log::info!("Network error: {:?}", err);
UpdateState::Updated
}
};
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack.
match self.phy.poll_link() {
true => self.network_was_reset = false,
// Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests.
false if !self.network_was_reset => {
self.network_was_reset = true;
self.stack.handle_link_reset();
}
_ => {},
};
result
}
}