From f0e7c153ba852a05a652b6e81aed71e7f211e82e Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 26 Jan 2021 14:28:06 +0100 Subject: [PATCH] Adding WIP refactor for MQTT + settings --- Cargo.toml | 1 + src/bin/dual-iir.rs | 154 ++++------------------------ src/hardware/configuration.rs | 40 +++++--- src/hardware/mod.rs | 5 +- src/hardware/mqtt_interface.rs | 102 +++++++++++++++++++ src/hardware/smoltcp_nal.rs | 181 +++++++++++++++++++++++++++++++++ 6 files changed, 337 insertions(+), 146 deletions(-) create mode 100644 src/hardware/mqtt_interface.rs create mode 100644 src/hardware/smoltcp_nal.rs diff --git a/Cargo.toml b/Cargo.toml index f4f90fa..2789de0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ enum-iterator = "0.6.0" paste = "1" dsp = { path = "dsp" } ad9959 = { path = "ad9959" } +minimq = "0.1.0" [dependencies.mcp23017] git = "https://github.com/mrd0ll4r/mcp23017.git" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index fd8f5e4..53d1924 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -25,6 +25,12 @@ const TCP_TX_BUFFER_SIZE: usize = 8192; // The number of cascaded IIR biquads per channel. Select 1 or 2! const IIR_CASCADE_LENGTH: usize = 1; +#[derive(miniconf::StringSet)] +struct Settings { + afe_gain: [hardware::AfeGain; 2], + iir: [[iir::IIR; IIR_CASCADE_LENGTH]; 2], +} + #[rtic::app(device = stm32h7xx_hal::stm32, peripherals = true, monotonic = rtic::cyccnt::CYCCNT)] const APP: () = { struct Resources { @@ -32,6 +38,7 @@ const APP: () = { adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), net_interface: hardware::Ethernet, + mqtt_interface: hardware::MqttInterface, // Format: iir_state[ch][cascade-no][coeff] #[init([[[0.; 5]; IIR_CASCADE_LENGTH]; 2])] @@ -107,26 +114,8 @@ const APP: () = { } } - #[idle(resources=[net_interface, iir_state, iir_ch, afes])] + #[idle(resources=[mqtt_interface], spawn=[settings_update])] fn idle(mut c: idle::Context) -> ! { - let mut socket_set_entries: [_; 8] = Default::default(); - let mut sockets = - smoltcp::socket::SocketSet::new(&mut socket_set_entries[..]); - - let mut rx_storage = [0; TCP_RX_BUFFER_SIZE]; - let mut tx_storage = [0; TCP_TX_BUFFER_SIZE]; - let tcp_handle = { - let tcp_rx_buffer = - smoltcp::socket::TcpSocketBuffer::new(&mut rx_storage[..]); - let tcp_tx_buffer = - smoltcp::socket::TcpSocketBuffer::new(&mut tx_storage[..]); - let tcp_socket = - smoltcp::socket::TcpSocket::new(tcp_rx_buffer, tcp_tx_buffer); - sockets.add(tcp_socket) - }; - - let mut server = server::Server::new(); - let mut time = 0u32; let mut next_ms = Instant::now(); @@ -141,125 +130,26 @@ const APP: () = { time += 1; } - { - let socket = - &mut *sockets.get::(tcp_handle); - if socket.state() == smoltcp::socket::TcpState::CloseWait { - socket.close(); - } else if !(socket.is_open() || socket.is_listening()) { - socket - .listen(1235) - .unwrap_or_else(|e| warn!("TCP listen error: {:?}", e)); - } else { - server.poll(socket, |req| { - info!("Got request: {:?}", req); - stabilizer::route_request!(req, - readable_attributes: [ - "stabilizer/iir/state": (|| { - let state = c.resources.iir_state.lock(|iir_state| - server::Status { - t: time, - x0: iir_state[0][0][0], - y0: iir_state[0][0][2], - x1: iir_state[1][0][0], - y1: iir_state[1][0][2], - }); - - Ok::(state) - }), - // "_b" means cascades 2nd IIR - "stabilizer/iir_b/state": (|| { let state = c.resources.iir_state.lock(|iir_state| - server::Status { - t: time, - x0: iir_state[0][IIR_CASCADE_LENGTH-1][0], - y0: iir_state[0][IIR_CASCADE_LENGTH-1][2], - x1: iir_state[1][IIR_CASCADE_LENGTH-1][0], - y1: iir_state[1][IIR_CASCADE_LENGTH-1][2], - }); - - Ok::(state) - }), - "stabilizer/afe0/gain": (|| c.resources.afes.0.get_gain()), - "stabilizer/afe1/gain": (|| c.resources.afes.1.get_gain()) - ], - - modifiable_attributes: [ - "stabilizer/iir0/state": server::IirRequest, (|req: server::IirRequest| { - c.resources.iir_ch.lock(|iir_ch| { - if req.channel > 1 { - return Err(()); - } - - iir_ch[req.channel as usize][0] = req.iir; - - Ok::(req) - }) - }), - "stabilizer/iir1/state": server::IirRequest, (|req: server::IirRequest| { - c.resources.iir_ch.lock(|iir_ch| { - if req.channel > 1 { - return Err(()); - } - - iir_ch[req.channel as usize][0] = req.iir; - - Ok::(req) - }) - }), - "stabilizer/iir_b0/state": server::IirRequest, (|req: server::IirRequest| { - c.resources.iir_ch.lock(|iir_ch| { - if req.channel > 1 { - return Err(()); - } - - iir_ch[req.channel as usize][IIR_CASCADE_LENGTH-1] = req.iir; - - Ok::(req) - }) - }), - "stabilizer/iir_b1/state": server::IirRequest,(|req: server::IirRequest| { - c.resources.iir_ch.lock(|iir_ch| { - if req.channel > 1 { - return Err(()); - } - - iir_ch[req.channel as usize][IIR_CASCADE_LENGTH-1] = req.iir; - - Ok::(req) - }) - }), - "stabilizer/afe0/gain": hardware::AfeGain, (|gain| { - c.resources.afes.0.set_gain(gain); - Ok::<(), ()>(()) - }), - "stabilizer/afe1/gain": hardware::AfeGain, (|gain| { - c.resources.afes.1.set_gain(gain); - Ok::<(), ()>(()) - }) - ] - ) - }); - } - } - - let sleep = match c.resources.net_interface.poll( - &mut sockets, + let sleep = c.resources.network_stack.update( smoltcp::time::Instant::from_millis(time as i64), - ) { - Ok(changed) => !changed, - Err(smoltcp::Error::Unrecognized) => true, - Err(e) => { - info!("iface poll error: {:?}", e); - true - } - }; + ); - if sleep { - cortex_m::asm::wfi(); + match c.resources.mqtt_interface.lock(|interface| interface.update(time).unwrap()) { + Action::Sleep => cortex_m::asm::wfi(), + Action::Continue => {}, + Action::CommitSettings => c.spawn.settings_update().unwrap(); } } } + #[task(priority = 1, resources=[mqtt_interface, afes, iir_ch])] + fn settings_update(c: settings_update::Context) { + let settings = c.resources.mqtt_interface.current_settings(); + c.resources.iir_ch.lock(|iir_ch| *iir_ch = settings.iir); + c.resources.afes.0.set_gain(settings.afe_gain[0]); + c.resources.afes.1.set_gain(settings.afe_gain[1]); + } + #[task(binds = ETH, priority = 1)] fn eth(_: eth::Context) { unsafe { hal::ethernet::interrupt_handler() } diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index 18ca67f..925d684 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -21,20 +21,12 @@ use embedded_hal::digital::v2::{InputPin, OutputPin}; use super::{ adc, afe, dac, design_parameters, digital_input_stamper, eeprom, pounder, - timers, DdsOutput, Ethernet, AFE0, AFE1, + smoltcp_nal::NetStorage, timers, DdsOutput, NetworkStack, AFE0, AFE1, }; -// Network storage definition for the ethernet interface. -struct NetStorage { - ip_addrs: [smoltcp::wire::IpCidr; 1], - neighbor_cache: - [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], - routes_storage: [Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 1], -} - /// The available networking devices on Stabilizer. pub struct NetworkDevices { - pub interface: Ethernet, + pub stack: NetworkStack, pub phy: ethernet::phy::LAN8742A, } @@ -71,7 +63,11 @@ static mut NET_STORE: NetStorage = NetStorage { smoltcp::wire::Ipv6Cidr::SOLICITED_NODE_PREFIX, )], neighbor_cache: [None; 8], - routes_storage: [None; 1], + routes_cache: [None; 8], + sockets: [None; 1], + + tx_storage: [0; 4096], + rx_storage: [0; 4096], }; /// Configure the stabilizer hardware for operation. @@ -515,7 +511,7 @@ pub fn setup( ); let default_v4_gw = Ipv4Address::new(10, 0, 16, 1); - let mut routes = Routes::new(&mut store.routes_storage[..]); + let mut routes = Routes::new(&mut store.routes_cache[..]); routes.add_default_ipv4_route(default_v4_gw).unwrap(); let neighbor_cache = @@ -528,8 +524,26 @@ pub fn setup( .routes(routes) .finalize(); + let sockets = { + // Note(unsafe): Configuration is only called once, so we only access the global + // storage a single time. + let socket_storage = unsafe { &mut NET_STORE.sockets }; + let mut sockets = smoltcp::socket::SocketSet::new(socket_storage); + + let tcp_socket = { + // Note(unsafe): Configuration is only called once, so we only access the global + // storage a single time. + let rx_storage = unsafe { &mut NET_STORE.rx_storage[..] }; + let tx_storage = unsafe { &mut NET_STORE.tx_storage[..] }; + smoltcp::socket::TcpSocket::new(rx_storage, tx_storage) + }; + + sockets.add(tcp_socket); + sockets + }; + NetworkDevices { - interface, + stack: NetworkStack::new(interface, sockets), phy: lan8742a, } }; diff --git a/src/hardware/mod.rs b/src/hardware/mod.rs index dc3aa25..2d638c4 100644 --- a/src/hardware/mod.rs +++ b/src/hardware/mod.rs @@ -15,6 +15,7 @@ mod design_parameters; mod digital_input_stamper; mod eeprom; mod pounder; +mod smoltcp_nal; mod timers; pub use adc::{Adc0Input, Adc1Input}; @@ -36,13 +37,15 @@ pub type AFE1 = afe::ProgrammableGainAmplifier< >; // Type alias for the ethernet interface on Stabilizer. -pub type Ethernet = smoltcp::iface::EthernetInterface< +type Ethernet = smoltcp::iface::EthernetInterface< 'static, 'static, 'static, hal::ethernet::EthernetDMA<'static>, >; +pub type NetworkStack = smoltcp_nal::NetworkStack<'static, 'static, 'static>; + pub use configuration::{setup, PounderDevices, StabilizerDevices}; #[inline(never)] diff --git a/src/hardware/mqtt_interface.rs b/src/hardware/mqtt_interface.rs new file mode 100644 index 0000000..7ce6544 --- /dev/null +++ b/src/hardware/mqtt_interface.rs @@ -0,0 +1,102 @@ +use super::NetworkStack; + +use minimq::{QoS, Error, Property, MqttClient}; + +pub enum Action { + Continue, + Sleep, + CommitSettings, +} + +struct MqttInterface { + client: MqttClient, + subscribed: bool, + settings: T, +} + +impl MqttInterface +where + T: miniconf::StringSet +{ + pub fn new(stack: NetworkStack, settings: T) -> Self { + let client: MqttClient = MqttClient::new( + IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), + "stabilizer", + stack).unwrap(); + + Self { + client, + subscribed: false, + settings, + } + } + + pub fn current_settings(&self) -> &T { + &self.settings + } + + pub fn update(&mut self, time: u32) -> Result { + + let sleep = self.client.network_stack.update(smoltcp::time::Instant::from_millis(time as i64)); + + if !self.subscribed && self.client.is_connected().unwrap() { + self.client.subscribe("stabilizer/settings/#", &[]); + self.client.subscribe("stabilizer/commit", &[]); + } + + let mut commit = false; + + match self.client.poll(|client, topic, message, properties| { + let split = topic.split('/').iter(); + // TODO: Verify topic ID against our ID. + let id = split.next().unwrap(); + + // Process the command + let command = split.next().unwrap(); + let response: String = match command { + "settings" => { + // Handle settings failures + let mut response: String = String::new(); + match self.settings.string_set(split.peekable(), message) { + Ok(_) => write!(&mut response, "{} written", topic).unwrap(), + Err(error) => { + write!(&mut response, "Settings failure: {}", error).unwrap(); + } + }; + + response + }, + "commit" => { + commit = true; + String::from("Committing pending settings"); + } + }; + + // Publish the response to the request over MQTT using the ResponseTopic property if + // possible. Otherwise, default to a logging topic. + if let Property::ResponseTopic(topic) = properties.iter().find(|&prop| { + if let Property::ResponseTopic(_) = *prop { + true + } else { + false + } + }).or(Some(&Property::ResponseTopic("stabilizer/log"))).unwrap() { + self.client.publish(topic, &response.into_bytes(), QoS::AtMostOnce, &[]).unwrap(); + } + }) { + Ok(_) => {}, + Err(Error::Disconnected) => self.subscribed = false, + Err(err) => error!("Unexpected error: {:?}", err) + }; + + let action = if commit { + Action::Commit + } else if sleep { + Action::Sleep + } else { + Action::Continue + }; + + Ok(action) + } +} diff --git a/src/hardware/smoltcp_nal.rs b/src/hardware/smoltcp_nal.rs new file mode 100644 index 0000000..8cd7f46 --- /dev/null +++ b/src/hardware/smoltcp_nal.rs @@ -0,0 +1,181 @@ +use core::cell::RefCell; +///! Network abstraction layer for smoltcp. +use heapless::{consts, Vec}; +use minimq::embedded_nal::{self as nal, nb}; + +use super::Ethernet; + +pub struct NetStorage { + pub ip_addrs: [smoltcp::wire::IpCidr; 1], + pub sockets: [Option>; 1], + pub neighbor_cache: + [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], + pub routes_cache: + [Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8], + pub tx_storage: [u8; 4096], + pub rx_storage: [u8; 4096], +} + +#[derive(Debug)] +pub enum NetworkError { + NoSocket, + ConnectionFailure, + ReadFailure, + WriteFailure, + Unsupported, +} + +pub struct NetworkStack<'a, 'b, 'c> { + network_interface: RefCell, + sockets: RefCell>, + next_port: RefCell, + unused_handles: RefCell>, +} + +impl<'a, 'b, 'c> NetworkStack<'a, 'b, 'c> { + pub fn new( + interface: Ethernet, + sockets: smoltcp::socket::SocketSet<'a, 'b, 'c>, + ) -> Self { + let mut unused_handles: Vec< + smoltcp::socket::SocketHandle, + consts::U16, + > = Vec::new(); + for socket in sockets.iter() { + unused_handles.push(socket.handle()).unwrap(); + } + + NetworkStack { + network_interface: RefCell::new(interface), + sockets: RefCell::new(sockets), + next_port: RefCell::new(49152), + unused_handles: RefCell::new(unused_handles), + } + } + + pub fn update(&self, time: u32) -> bool { + match self.network_interface.borrow_mut().poll( + &mut self.sockets.borrow_mut(), + smoltcp::time::Instant::from_millis(time as i64), + ) { + Ok(changed) => changed == false, + Err(e) => { + info!("{:?}", e); + true + } + } + } + + fn get_ephemeral_port(&self) -> u16 { + // Get the next ephemeral port + let current_port = self.next_port.borrow().clone(); + + let (next, wrap) = self.next_port.borrow().overflowing_add(1); + *self.next_port.borrow_mut() = if wrap { 49152 } else { next }; + + return current_port; + } +} + +impl<'a, 'b, 'c> NetworkStack<'a, 'b, 'c> { + fn open( + &self, + _mode: nal::Mode, + ) -> Result { + match self.unused_handles.borrow_mut().pop() { + Some(handle) => { + // Abort any active connections on the handle. + let mut sockets = self.sockets.borrow_mut(); + let internal_socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(handle); + internal_socket.abort(); + + Ok(handle) + } + None => Err(NetworkError::NoSocket), + } + } + + fn connect( + &self, + socket: smoltcp::socket::SocketHandle, + remote: nal::SocketAddr, + ) -> Result { + let mut sockets = self.sockets.borrow_mut(); + let internal_socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(socket); + + // If we're already in the process of connecting, ignore the request silently. + if internal_socket.is_open() { + return Ok(socket); + } + + match remote.ip() { + nal::IpAddr::V4(addr) => { + let octets = addr.octets(); + let address = smoltcp::wire::Ipv4Address::new( + octets[0], octets[1], octets[2], octets[3], + ); + internal_socket + .connect( + (address, remote.port()), + self.get_ephemeral_port(), + ) + .map_err(|_| NetworkError::ConnectionFailure)?; + Ok(socket) + } + + // We only support IPv4. + _ => Err(NetworkError::Unsupported), + } + } + + fn is_connected( + &self, + socket: &smoltcp::socket::SocketHandle, + ) -> Result { + let mut sockets = self.sockets.borrow_mut(); + let socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(*socket); + Ok(socket.may_send() && socket.may_recv()) + } + + fn write( + &self, + socket: &mut smoltcp::socket::SocketHandle, + buffer: &[u8], + ) -> nb::Result { + let mut sockets = self.sockets.borrow_mut(); + let socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(*socket); + socket + .send_slice(buffer) + .map_err(|_| nb::Error::Other(NetworkError::WriteFailure)) + } + + fn read( + &self, + socket: &mut smoltcp::socket::SocketHandle, + buffer: &mut [u8], + ) -> nb::Result { + let mut sockets = self.sockets.borrow_mut(); + let socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(*socket); + socket + .recv_slice(buffer) + .map_err(|_| nb::Error::Other(NetworkError::ReadFailure)) + } + + fn close( + &self, + socket: smoltcp::socket::SocketHandle, + ) -> Result<(), NetworkError> { + let mut sockets = self.sockets.borrow_mut(); + let internal_socket: &mut smoltcp::socket::TcpSocket = + &mut *sockets.get(socket); + internal_socket.close(); + + self.unused_handles.borrow_mut().push(socket).unwrap(); + Ok(()) + } +}