From 8d4193ed62948af74588b6c35d1dedb4e986efb8 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 12:24:36 +0200 Subject: [PATCH] Updating after refactor --- src/bin/dual-iir.rs | 56 ++++++++----------------- src/bin/lockin.rs | 56 ++++++++----------------- src/net/data_stream.rs | 93 +++++++++++++++++++++++------------------- src/net/mod.rs | 9 +++- 4 files changed, 95 insertions(+), 119 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 57521ff..b77091c 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -194,7 +194,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.enable_streaming(StreamFormat::AdcDacData); // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); @@ -308,43 +308,23 @@ const APP: () = { } // Stream the data. - generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( - StreamFormat::AdcDacData, - |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; - - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc1, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac1, - dst, - SAMPLE_BUFFER_SIZE, - ); - }, - ); + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 7e09506..c0bc924 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -231,7 +231,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.enable_streaming(StreamFormat::AdcDacData); let settings = Settings::default(); @@ -396,43 +396,23 @@ const APP: () = { } // Stream the data. - generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( - StreamFormat::AdcDacData, - |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; - - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc1, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac1, - dst, - SAMPLE_BUFFER_SIZE, - ); - }, - ); + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 519f3fb..199360c 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -12,22 +12,21 @@ //! ## Header Format //! The header of each stream frame consists of 7 bytes. All data is stored in little-endian format. //! -//! Elements appear sequentiall as follows: -//! * Sequence Number -//! * Format Code -//! * Batch Count <16> -//! * Batch size +//! Elements appear sequentially as follows: +//! * Magic word 0x057B +//! * Format Code +//! * Batch Size +//! * Sequence Number //! -//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. -//! This can be used to determine if a stream frame was lost. +//! The "Magic word" is a constant field for all packets. The value is alway 0x057B. //! //! The "Format Code" is a unique specifier that indicates the serialization format of each batch of //! data in the frame. Refer to [StreamFormat] for further information. //! -//! The "Batch Count" indicates how many batches are present in the current frame. +//! The "Batch size" is the value of [SAMPLE_BUFFER_SIZE]. //! -//! The "Batch Size" specifies the [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] -//! parameter, which can be used to determine the number of samples per batch. +//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. +//! This can be used to determine if a stream frame was lost. //! //! # Example //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception @@ -43,6 +42,10 @@ use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; +const MAGIC_WORD: u16 = 0x057B; + +const HEADER_SIZE: usize = 8; + // The number of frames that can be buffered. const FRAME_COUNT: usize = 4; @@ -70,9 +73,12 @@ pub struct StreamTarget { } /// Specifies the format of streamed data -#[repr(u16)] +#[repr(u8)] #[derive(Debug, Copy, Clone, PartialEq)] pub enum StreamFormat { + /// Reserved, unused format specifier. + Unknown = 0, + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. /// /// # Example @@ -80,7 +86,7 @@ pub enum StreamFormat { /// ``` /// /// ``` - AdcDacData = 0, + AdcDacData = 1, } impl From for SocketAddr { @@ -128,27 +134,25 @@ pub fn setup_streaming( } struct StreamFrame { - format: StreamFormat, - sequence_number: u16, buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, - batch_count: u16, - batch_size: u8, } impl StreamFrame { pub fn new( buffer: Box<[u8; FRAME_SIZE], Uninit>, - format: StreamFormat, - sequence_number: u16, + format: u8, + buffer_size: u8, + sequence_number: u32, ) -> Self { + let mut buffer = unsafe { buffer.assume_init() }; + buffer[0..2].copy_from_slice(&MAGIC_WORD.to_ne_bytes()); + buffer[2] = format; + buffer[3] = buffer_size; + buffer[4..8].copy_from_slice(&sequence_number.to_ne_bytes()); Self { - format, - offset: 7, - sequence_number, - buffer: unsafe { buffer.assume_init() }, - batch_size: SAMPLE_BUFFER_SIZE as u8, - batch_count: 0, + buffer, + offset: HEADER_SIZE, } } @@ -161,7 +165,6 @@ impl StreamFrame { let result = f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; - self.batch_count = self.batch_count.checked_add(1).unwrap(); result } @@ -171,12 +174,7 @@ impl StreamFrame { } pub fn finish(&mut self) -> &[u8] { - let offset = self.offset; - self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes()); - self.buffer[2..4].copy_from_slice(&(self.format as u16).to_ne_bytes()); - self.buffer[4..6].copy_from_slice(&self.batch_count.to_ne_bytes()); - self.buffer[6] = self.batch_size; - &self.buffer[..offset] + &self.buffer[..self.offset] } } @@ -185,7 +183,8 @@ pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_COUNT>, pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, - sequence_number: u16, + sequence_number: u32, + format: StreamFormat, } impl FrameGenerator { @@ -196,18 +195,32 @@ impl FrameGenerator { Self { queue, pool, + format: StreamFormat::Unknown, current_frame: None, sequence_number: 0, } } + /// Specify the format of the stream. + /// + /// # Note: + /// This function may only be called once upon initializing streaming + /// + /// # Args + /// * `format` - The desired format of the stream. + #[doc(hidden)] + pub(crate) fn set_format(&mut self, format: StreamFormat) { + assert!(self.format == StreamFormat::Unknown); + assert!(format != StreamFormat::Unknown); + self.format = format; + } + /// Add a batch to the current stream frame. /// /// # Args - /// * `format` - The format of the stream. This must be the same for each execution. /// * `f` - A closure that will be provided the buffer to write batch data into. The buffer will /// be the size of the `T` template argument. - pub fn add(&mut self, format: StreamFormat, f: F) + pub fn add(&mut self, f: F) where F: FnMut(&mut [u8]), { @@ -218,7 +231,8 @@ impl FrameGenerator { if let Some(buffer) = self.pool.alloc() { self.current_frame.replace(StreamFrame::new( buffer, - format, + self.format as u8, + SAMPLE_BUFFER_SIZE as u8, sequence_number, )); } else { @@ -226,14 +240,11 @@ impl FrameGenerator { } } - assert!( - format == self.current_frame.as_ref().unwrap().format, - "Unexpected stream format encountered" - ); + let current_frame = self.current_frame.as_mut().unwrap(); - self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); + current_frame.add_batch::<_, T>(f); - if self.current_frame.as_ref().unwrap().is_full::() { + if current_frame.is_full::() { if self .queue .enqueue(self.current_frame.take().unwrap()) diff --git a/src/net/mod.rs b/src/net/mod.rs index 65541ed..fe0a7e9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -113,8 +113,13 @@ where } /// Enable live data streaming. - pub fn enable_streaming(&mut self) -> FrameGenerator { - self.generator.take().unwrap() + pub fn enable_streaming( + &mut self, + format: data_stream::StreamFormat, + ) -> FrameGenerator { + let mut generator = self.generator.take().unwrap(); + generator.set_format(format); + generator } /// Direct the stream to the provided remote target.