diff --git a/Cargo.lock b/Cargo.lock index 9cffcb5..9f13afc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9a69a963b70ddacfcd382524f72a4576f359af9334b3bf48a79566590bb8bfa" dependencies = [ "bitrate", - "cortex-m 0.6.7", + "cortex-m 0.7.2", "embedded-hal", ] @@ -432,6 +432,12 @@ dependencies = [ "heapless 0.7.1", ] +[[package]] +name = "mutex-trait" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4bb1638d419e12f8b1c43d9e639abd0d1424285bdea2f76aa231e233c63cd3a" + [[package]] name = "nanorand" version = "0.5.2" @@ -778,6 +784,7 @@ dependencies = [ "mcp23017", "miniconf", "minimq", + "mutex-trait", "nb 1.0.0", "num_enum", "paste", @@ -803,7 +810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b672c837e0ee8158ecc7fce0f9a948dd0693a9c588338e728d14b73307a0b7d" dependencies = [ "bare-metal 0.2.5", - "cortex-m 0.6.7", + "cortex-m 0.7.2", "cortex-m-rt", "vcell", ] diff --git a/Cargo.toml b/Cargo.toml index 44f96c7..6f52210 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ miniconf = "0.1.0" shared-bus = {version = "0.2.2", features = ["cortex-m"] } serde-json-core = "0.4" mcp23017 = "1.0" +mutex-trait = "0.2" # rtt-target bump [dependencies.rtt-logger] diff --git a/dsp/src/iir_int.rs b/dsp/src/iir_int.rs index 5ac75f5..c7dfe56 100644 --- a/dsp/src/iir_int.rs +++ b/dsp/src/iir_int.rs @@ -49,9 +49,9 @@ impl Coeff for Vec5 { #[derive(Copy, Clone, Default, Debug, MiniconfAtomic, Deserialize)] pub struct IIR { pub ba: Vec5, - // pub y_offset: i32, - // pub y_min: i32, - // pub y_max: i32, + pub y_offset: i32, + pub y_min: i32, + pub y_max: i32, } impl IIR { @@ -76,9 +76,9 @@ impl IIR { // Store x0 x0 x1 x2 y1 y2 xy[0] = x0; // Compute y0 by multiply-accumulate - let y0 = macc_i32(0, xy, &self.ba, IIR::SHIFT); + let y0 = macc_i32(self.y_offset, xy, &self.ba, IIR::SHIFT); // Limit y0 - // let y0 = y0.max(self.y_min).min(self.y_max); + let y0 = y0.max(self.y_min).min(self.y_max); // Store y0 x0 x1 y0 y1 y2 xy[n / 2] = y0; y0 diff --git a/dsp/src/lowpass.rs b/dsp/src/lowpass.rs index 265929b..7573e28 100644 --- a/dsp/src/lowpass.rs +++ b/dsp/src/lowpass.rs @@ -25,7 +25,6 @@ impl Lowpass { /// Filtered output y. pub fn update(&mut self, x: i32, k: u8) -> i32 { debug_assert!(k & 31 == k); - debug_assert!((k - 1) & 31 == k - 1); // This is an unrolled and optimized first-order IIR loop // that works for all possible time constants. // Note T-DF-I and the zeros at Nyquist. @@ -35,6 +34,6 @@ impl Lowpass { *y += dy; x = *y - (dy >> 1); } - x.saturating_add((self.y.len() as i32) << (k - 1)) + x.saturating_add((self.y.len() as i32) << (k - 1).max(0)) } } diff --git a/dsp/src/unwrap.rs b/dsp/src/unwrap.rs index 68ddd0c..b50c874 100644 --- a/dsp/src/unwrap.rs +++ b/dsp/src/unwrap.rs @@ -70,7 +70,7 @@ impl Unwrapper { mod tests { use super::*; #[test] - fn mini() { + fn overflowing_sub_correctness() { for (x0, x1, v) in [ (0i32, 0i32, 0i8), (0, 1, 0), @@ -101,4 +101,42 @@ mod tests { assert_eq!(dx, dx0); } } + + #[test] + fn saturating_scale_correctness() { + let shift = 8; + for (lo, hi, res) in [ + (0i32, 0i32, 0i32), + (0, 1, 0x0100_0000), + (0, -1, -0x0100_0000), + (0x100, 0, 1), + (-1 << 31, 0, -1 << 23), + (0x7fffffff, 0, 0x007f_ffff), + (0x7fffffff, 1, 0x0017f_ffff), + (-0x7fffffff, -1, -0x0180_0000), + (0x1234_5600, 0x7f, 0x7f12_3456), + (0x1234_5600, -0x7f, -0x7f00_0000 + 0x12_3456), + (0, 0x7f, 0x7f00_0000), + (0, 0x80, 0x7fff_ff80), + (0, -0x7f, -0x7f00_0000), + (0, -0x80, -0x7fff_ff80), + (0x7fff_ffff, 0x7f, 0x7f7f_ffff), + (-0x8000_0000, 0x7f, 0x7e80_0000), + (-0x8000_0000, -0x7f, -0x7f80_0000), + (0x7fff_ffff, -0x7f, -0x7e80_0001), + (0x100, 0x7f, 0x7f00_0001), + (0, -0x80, -0x7fff_ff80), + (-1 << 31, 0x80, 0x7fff_ff80), + (-1 << 31, -0x80, -0x7fff_ff80), + ] + .iter() + { + let s = saturating_scale(*lo, *hi, shift); + assert_eq!( + *res, s, + "{:#x} != {:#x} = saturating_scale({:#x}, {:#x}, {:#x})", + *res, s, *lo, *hi, shift + ); + } + } } diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index a27fc3d..2e45be6 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -4,9 +4,10 @@ use core::sync::atomic::{fence, Ordering}; +use mutex_trait::prelude::*; + use dsp::iir; use stabilizer::{ - flatten_closures, hardware::{ self, adc::{Adc0Input, Adc1Input, AdcCode}, @@ -176,7 +177,7 @@ const APP: () = { let hold = settings.force_hold || (digital_inputs[1] && settings.allow_hold); - flatten_closures!(with_buffer, adc0, adc1, dac0, dac1, { + (adc0, adc1, dac0, dac1).lock(|adc0, adc1, dac0, dac1| { let adc_samples = [adc0, adc1]; let dac_samples = [dac0, dac1]; diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index d0af31e..6b7510b 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -4,9 +4,10 @@ use core::sync::atomic::{fence, Ordering}; +use mutex_trait::prelude::*; + use dsp::{Accu, Complex, ComplexExt, Lockin, RPLL}; use stabilizer::{ - flatten_closures, hardware::{ self, adc::{Adc0Input, Adc1Input, AdcCode}, @@ -234,7 +235,7 @@ const APP: () = { reference_phase.wrapping_mul(settings.lockin_harmonic), ); - flatten_closures!(with_buffer, adc0, adc1, dac0, dac1, { + (adc0, adc1, dac0, dac1).lock(|adc0, adc1, dac0, dac1| { let adc_samples = [adc0, adc1]; let mut dac_samples = [dac0, dac1]; diff --git a/src/hardware/adc.rs b/src/hardware/adc.rs index 26752f7..9b1a833 100644 --- a/src/hardware/adc.rs +++ b/src/hardware/adc.rs @@ -67,6 +67,8 @@ ///! buffer mode DMA disable/enable and buffer update sequence is slow. use stm32h7xx_hal as hal; +use mutex_trait::Mutex; + use super::design_parameters::{SampleBuffer, SAMPLE_BUFFER_SIZE}; use super::timers; @@ -367,6 +369,15 @@ macro_rules! adc_input { unsafe { self.transfer.next_dbm_transfer_with(|buf, _current| f(buf)) } } } + + // This is not actually a Mutex. It only re-uses the semantics and macros of mutex-trait + // to reduce rightward drift when jointly calling `with_buffer(f)` on multiple DAC/ADCs. + impl Mutex for $name { + type Data = SampleBuffer; + fn lock(&mut self, f: impl FnOnce(&mut Self::Data) -> R) -> R { + self.with_buffer(f).unwrap() + } + } } }; } diff --git a/src/hardware/dac.rs b/src/hardware/dac.rs index 9f9a330..90b75c7 100644 --- a/src/hardware/dac.rs +++ b/src/hardware/dac.rs @@ -52,6 +52,8 @@ ///! served promptly after the transfer completes. use stm32h7xx_hal as hal; +use mutex_trait::Mutex; + use super::design_parameters::{SampleBuffer, SAMPLE_BUFFER_SIZE}; use super::timers; @@ -242,6 +244,15 @@ macro_rules! dac_output { } } } + + // This is not actually a Mutex. It only re-uses the semantics and macros of mutex-trait + // to reduce rightward drift when jointly calling `with_buffer(f)` on multiple DAC/ADCs. + impl Mutex for $name { + type Data = SampleBuffer; + fn lock(&mut self, f: impl FnOnce(&mut Self::Data) -> R) -> R { + self.with_buffer(f).unwrap() + } + } }; } diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index 20842ae..f647047 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -1,6 +1,8 @@ ///! Stabilizer hardware configuration ///! ///! This file contains all of the hardware-specific configuration of Stabilizer. +use core::sync::atomic::{self, AtomicBool, Ordering}; +use core::{ptr, slice}; use stm32h7xx_hal::{ self as hal, ethernet::{self, PHY}, @@ -149,8 +151,6 @@ fn load_itcm() { static mut __eitcm: u32; static mut __siitcm: u32; } - use core::{ptr, slice, sync::atomic}; - // NOTE(unsafe): Assuming the address symbols from the linker as well as // the source instruction data are all valid, this is safe as it only // copies linker-prepared data to where the code expects it to be. @@ -163,7 +163,7 @@ fn load_itcm() { ptr::write_volatile(ITCMCR, ptr::read_volatile(ITCMCR) | 1); // Ensure ITCM is enabled before loading. - atomic::fence(atomic::Ordering::SeqCst); + atomic::fence(Ordering::SeqCst); let len = (&__eitcm as *const u32).offset_from(&__sitcm as *const _) as usize; @@ -174,7 +174,7 @@ fn load_itcm() { } // Ensure ITCM is loaded before potentially executing any instructions from it. - atomic::fence(atomic::Ordering::SeqCst); + atomic::fence(Ordering::SeqCst); cortex_m::asm::dsb(); cortex_m::asm::isb(); } @@ -224,10 +224,38 @@ pub fn setup( // Enable debug during WFE/WFI-induced sleep device.DBGMCU.cr.modify(|_, w| w.dbgsleep_d1().set_bit()); - use rtt_logger::RTTLogger; + // Set up RTT channel to use for `rprintln!()` as "best effort". + // This removes a critical section around the logging and thus allows + // high-prio tasks to always interrupt at low latency. + // It comes at a cost: + // If a high-priority tasks preempts while we are logging something, + // and if we then also want to log from within that high-preiority task, + // the high-prio log message will be lost. - static LOGGER: RTTLogger = RTTLogger::new(log::LevelFilter::Info); - rtt_target::rtt_init_print!(); + let channels = rtt_target::rtt_init_default!(); + // Note(unsafe): The closure we pass does not establish a critical section + // as demanded but it does ensure synchronization and implements a lock. + unsafe { + rtt_target::set_print_channel_cs( + channels.up.0, + &((|arg, f| { + static LOCKED: AtomicBool = AtomicBool::new(false); + if LOCKED.compare_exchange_weak( + false, + true, + Ordering::Acquire, + Ordering::Relaxed, + ) == Ok(false) + { + f(arg); + LOCKED.store(false, Ordering::Release); + } + }) as rtt_target::CriticalSectionFunc), + ); + } + + static LOGGER: rtt_logger::RTTLogger = + rtt_logger::RTTLogger::new(log::LevelFilter::Info); log::set_logger(&LOGGER) .map(|()| log::set_max_level(log::LevelFilter::Trace)) .unwrap(); diff --git a/src/lib.rs b/src/lib.rs index e1fdecf..85964a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,16 +3,3 @@ pub mod hardware; pub mod net; - -/// Macro to reduce rightward drift when calling the same closure-based API -/// on multiple structs simultaneously, e.g. when accessing DMA buffers. -/// This could be improved a bit using the tuple-based style from `mutex-trait`. -#[macro_export] -macro_rules! flatten_closures { - ($fn:ident, $e:ident, $fun:block) => { - $e.$fn(|$e| $fun ).unwrap() - }; - ($fn:ident, $e:ident, $($es:ident),+, $fun:block) => { - $e.$fn(|$e| flatten_closures!($fn, $($es),*, $fun)).unwrap() - }; -} diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 61bceba..4b926ed 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -30,21 +30,12 @@ const BLOCK_BUFFER_SIZE: usize = 30; const SUBSAMPLE_RATE: usize = 1; /// Represents the destination for the UDP stream to send data to. -#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)] +#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)] pub struct StreamTarget { pub ip: [u8; 4], pub port: u16, } -impl Default for StreamTarget { - fn default() -> Self { - Self { - ip: [0; 4], - port: 0, - } - } -} - impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -271,30 +262,28 @@ impl DataStream { } fn close(&mut self) { - // Note(unwrap): We guarantee that the socket is available above. - let socket = self.socket.take().unwrap(); - self.stack.close(socket).unwrap(); + if let Some(socket) = self.socket.take() { + log::info!("Closing stream"); + // Note(unwrap): We guarantee that the socket is available above. + self.stack.close(socket).unwrap(); + } } - fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { - if self.socket.is_some() { - self.close(); - } - - // If the remote address is unspecified, just close the existing socket. - if remote.ip().is_unspecified() { - if self.socket.is_some() { - self.close(); - } - + // Open new socket. + fn open(&mut self) -> Result<(), ()> { + // If there is already a socket of if remote address is unspecified, + // do not open a new socket. + if self.socket.is_some() || self.remote.ip().is_unspecified() { return Err(()); } - let mut socket = self.stack.socket().map_err(|_| ())?; + log::info!("Opening stream"); + + let mut socket = self.stack.socket().or(Err(()))?; // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be // bound. - self.stack.connect(&mut socket, remote).unwrap(); + self.stack.connect(&mut socket, self.remote).unwrap(); self.socket.replace(socket); @@ -306,48 +295,43 @@ impl DataStream { /// # Args /// * `remote` - The destination to send stream data to. pub fn set_remote(&mut self, remote: SocketAddr) { - // If the remote is identical to what we already have, do nothing. - if remote == self.remote { - return; + // Close socket to be reopened if the remote has changed. + if remote != self.remote { + self.close(); } - - // Open the new remote connection. - self.open(remote).ok(); self.remote = remote; } /// Process any data for transmission. pub fn process(&mut self) { - // If there's no socket available, try to connect to our remote. - if self.socket.is_none() { - // If we can't open the socket (e.g. we do not have an IP address yet), clear data from - // the queue. - if self.open(self.remote).is_err() { - while self.queue.ready() { - self.queue.dequeue(); + match self.socket.as_mut() { + None => { + // If there's no socket available, try to connect to our remote. + if self.open().is_ok() { + // If we just successfully opened the socket, flush old data from queue. + while self.queue.dequeue().is_some() {} } - return; } - } + Some(handle) => { + if self.queue.ready() { + // Dequeue data from the queue into a larger block structure. + let mut packet = + DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); + while self + .queue + .peek() + .and_then(|batch| packet.add_batch(batch).ok()) + .is_some() + { + // Dequeue the batch that we just added to the packet. + self.queue.dequeue(); + } - if self.queue.ready() { - // Dequeue data from the queue into a larger block structure. - let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); - while self.queue.ready() { - // Note(unwrap): We check above that the queue is ready before calling this. - if packet.add_batch(self.queue.peek().unwrap()).is_err() { - // If we cannot add another batch, break out of the loop and send the packet. - break; + // Transmit the data packet. + let size = packet.finish(); + self.stack.send(handle, &self.buffer[..size]).ok(); } - - // Remove the batch that we just added. - self.queue.dequeue(); } - - // Transmit the data block. - let mut handle = self.socket.as_mut().unwrap(); - let size = packet.finish(); - self.stack.send(&mut handle, &self.buffer[..size]).ok(); } } } diff --git a/src/net/miniconf_client.rs b/src/net/miniconf_client.rs index 4eaab07..4bdd2b1 100644 --- a/src/net/miniconf_client.rs +++ b/src/net/miniconf_client.rs @@ -80,6 +80,7 @@ where // If we're no longer subscribed to the settings topic, but we are connected to the broker, // resubscribe. if !self.subscribed && mqtt_connected { + log::info!("MQTT connected, subscribing to settings"); // Note(unwrap): We construct a string with two more characters than the prefix // strucutre, so we are guaranteed to have space for storage. let mut settings_topic: String<66> = @@ -115,6 +116,8 @@ where } }; + log::info!("Settings update: `{}`", path); + let message: SettingsResponse = settings .string_set(path.split('/').peekable(), message) .map(|_| { @@ -140,13 +143,13 @@ where Ok(_) if update => UpdateState::Updated, Ok(_) => UpdateState::NoChange, Err(minimq::Error::SessionReset) => { + log::warn!("Settings MQTT session reset"); self.subscribed = false; UpdateState::NoChange } Err(minimq::Error::Network( smoltcp_nal::NetworkError::NoIpAddress, )) => UpdateState::NoChange, - Err(error) => { log::info!("Unexpected error: {:?}", error); UpdateState::NoChange diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index 13f7035..eeb4f06 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -45,12 +45,16 @@ impl NetworkProcessor { pub fn handle_link(&mut self) { // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network // stack. - match self.phy.poll_link() { - true => self.network_was_reset = false, - + let link_up = self.phy.poll_link(); + match (link_up, self.network_was_reset) { + (true, true) => { + log::warn!("Network link UP"); + self.network_was_reset = false; + } // Only reset the network stack once per link reconnection. This prevents us from // sending an excessive number of DHCP requests. - false if !self.network_was_reset => { + (false, false) => { + log::warn!("Network link DOWN"); self.network_was_reset = true; self.stack.lock(|stack| stack.handle_link_reset()); }