From de63be09e4e21f22c633295e831bff40410f049b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 27 Jul 2021 13:12:57 +0200 Subject: [PATCH] Updating after review --- scripts/stream_throughput.py | 14 +++---- src/bin/dual-iir.rs | 5 ++- src/bin/lockin.rs | 5 ++- src/net/data_stream.rs | 79 +++++++++++++++++------------------- src/net/mod.rs | 5 ++- 5 files changed, 54 insertions(+), 54 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index bbcb411..ca3ccd4 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -22,7 +22,7 @@ HEADER_FORMAT = ' -//! * Format Code -//! * Batch Size -//! * Sequence Number -//! -//! The "Magic word" is a constant field for all packets. The value is alway 0x057B. -//! -//! The "Format Code" is a unique specifier that indicates the serialization format of each batch of -//! data in the frame. Refer to [StreamFormat] for further information. -//! -//! The "Batch size" is the value of [SAMPLE_BUFFER_SIZE]. -//! -//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. -//! This can be used to determine if a stream frame was lost. +//! * **Magic word 0x057B** : a constant to identify Stabilizer streaming data. +//! * **Format Code** : a unique ID that indicates the serialization format of each batch of data +//! in the frame. Refer to [StreamFormat] for further information. +//! * **Batch Size** : the number of samples in each batch of data. +//! * **Sequence Number** : an the sequence number of the first batch in the frame. +//! This can be used to determine if and how many stream batches are lost. //! //! # Example //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception //! of livestreamed data. use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; +use num_enum::IntoPrimitive; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; -use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; - use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; const MAGIC_WORD: u16 = 0x057B; +// The size of the header, calculated in bytes. +// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence +// number, which corresponds to 8 bytes total. const HEADER_SIZE: usize = 8; // The number of frames that can be buffered. @@ -52,6 +45,10 @@ const FRAME_COUNT: usize = 4; // The size of each livestream frame in bytes. const FRAME_SIZE: usize = 1024 + HEADER_SIZE; +// The size of the frame queue must be at least as large as the number of frame buffers. Every +// allocated frame buffer should fit in the queue. +const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; + // Static storage used for a heapless::Pool of frame buffers. static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = [0; FRAME_SIZE * FRAME_COUNT]; @@ -74,7 +71,7 @@ pub struct StreamTarget { /// Specifies the format of streamed data #[repr(u8)] -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, IntoPrimitive)] pub enum StreamFormat { /// Reserved, unused format specifier. Unknown = 0, @@ -89,12 +86,6 @@ pub enum StreamFormat { AdcDacData = 1, } -impl From for u8 { - fn from(format: StreamFormat) -> u8 { - format as u8 - } -} - impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -120,8 +111,10 @@ impl From for SocketAddr { pub fn setup_streaming( stack: NetworkReference, ) -> (FrameGenerator, DataStream) { + // The queue needs to be at least as large as the frame count to ensure that every allocated + // frame can potentially be enqueued for transmission. let queue = - cortex_m::singleton!(: Queue = Queue::new()) + cortex_m::singleton!(: Queue = Queue::new()) .unwrap(); let (producer, consumer) = queue.split(); @@ -167,17 +160,13 @@ impl StreamFrame { where F: FnMut(&mut [u8]), { - assert!(!self.is_full::(), "Batch cannot be added to full frame"); - - let result = f(&mut self.buffer[self.offset..self.offset + T]); + f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; - - result } pub fn is_full(&self) -> bool { - self.offset + T >= self.buffer.len() + self.offset + T > self.buffer.len() } pub fn finish(&mut self) -> &[u8] { @@ -187,37 +176,42 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { - queue: Producer<'static, StreamFrame, FRAME_COUNT>, + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, sequence_number: u32, format: u8, + batch_size: u8, } impl FrameGenerator { fn new( - queue: Producer<'static, StreamFrame, FRAME_COUNT>, + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { queue, pool, + batch_size: 0, format: StreamFormat::Unknown.into(), current_frame: None, sequence_number: 0, } } - /// Specify the format of the stream. + /// Configure the format of the stream. /// /// # Note: - /// This function may only be called once upon initializing streaming + /// This function shall only be called once upon initializing streaming /// /// # Args /// * `format` - The desired format of the stream. + /// * `batch_size` - The number of samples in each data batch. See + /// [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] #[doc(hidden)] - pub(crate) fn set_format(&mut self, format: impl Into) { + pub(crate) fn configure(&mut self, format: impl Into, batch_size: u8) { self.format = format.into(); + self.batch_size = batch_size; } /// Add a batch to the current stream frame. @@ -237,7 +231,7 @@ impl FrameGenerator { self.current_frame.replace(StreamFrame::new( buffer, self.format as u8, - SAMPLE_BUFFER_SIZE as u8, + self.batch_size, sequence_number, )); } else { @@ -245,11 +239,14 @@ impl FrameGenerator { } } + // Note(unwrap): We ensure the frame is present above. let current_frame = self.current_frame.as_mut().unwrap(); current_frame.add_batch::<_, T>(f); if current_frame.is_full::() { + // Note(unwrap): The queue is designed to be at least as large as the frame buffer + // count, so this enqueue should always succeed. self.queue .enqueue(self.current_frame.take().unwrap()) .unwrap(); @@ -264,7 +261,7 @@ impl FrameGenerator { pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, - queue: Consumer<'static, StreamFrame, FRAME_COUNT>, + queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, frame_pool: &'static Pool<[u8; FRAME_SIZE]>, remote: SocketAddr, } @@ -278,7 +275,7 @@ impl DataStream { /// * `frame_pool` - The Pool to return stream frame objects into. fn new( stack: NetworkReference, - consumer: Consumer<'static, StreamFrame, FRAME_COUNT>, + consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, frame_pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { diff --git a/src/net/mod.rs b/src/net/mod.rs index ca1bb3d..da82999 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -116,12 +116,13 @@ where /// /// # Args /// * `format` - A unique u8 code indicating the format of the data. - pub fn enable_streaming( + pub fn configure_streaming( &mut self, format: impl Into, + batch_size: u8, ) -> FrameGenerator { let mut generator = self.generator.take().unwrap(); - generator.set_format(format); + generator.configure(format, batch_size); generator }