From 9b3bb62811a69a9eb9cc8a24fc4330f4081edb1d Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 22 Jul 2021 14:45:58 +0200 Subject: [PATCH 01/13] Adding WIP refactored streaming API --- Cargo.lock | 3 +- Cargo.toml | 4 + scripts/stream_throughput.py | 11 +- src/bin/dual-iir.rs | 18 +- src/bin/lockin.rs | 20 ++- src/net/data_stream.rs | 340 ++++++++++++----------------------- src/net/mod.rs | 6 +- 7 files changed, 161 insertions(+), 241 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4959e1f..40b6751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,8 +353,7 @@ dependencies = [ [[package]] name = "heapless" version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34e26526e7168021f34243a3c8faac4dc4f938cde75a0f9b8e373cca5eb4e7ce" +source = "git+https://github.com/quartiq/heapless.git?branch=feature/assume-init#0139ab11d55c6924dafd5d99ac9eda92bd0df77b" dependencies = [ "atomic-polyfill", "hash32 0.2.1", diff --git a/Cargo.toml b/Cargo.toml index 407d568..64bb296 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,10 @@ rev = "33aa67d" git = "https://github.com/rust-embedded/cortex-m-rt.git" rev = "a2e3ad5" +[patch.crates-io.heapless] +git = "https://github.com/quartiq/heapless.git" +branch = "feature/assume-init" + [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" rev = "9c826f8" diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 2981b0c..d7ce9b8 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -13,6 +13,12 @@ import logging # Representation of a single UDP packet transmitted by Stabilizer. Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) +Format = collections.namedtuple('Format', ['batch_size', 'batches_per_frame']) + +FORMAT = { + 0: Format(8, 255) +} + class Timer: """ A basic timer for measuring elapsed time periods. """ @@ -88,9 +94,10 @@ class PacketParser: if len(self.buf) < 4: return None - start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf) + start_id, format_id = struct.unpack_from('!HH', self.buf) - packet_size = 4 + data_size * num_blocks * 8 + frame_format = FORMAT[format_id] + packet_size = 4 + frame_format.batch_size * frame_format.batches_per_frame * 8 if len(self.buf) < packet_size: return None diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 39cdefe..73fd897 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -43,6 +43,7 @@ use stabilizer::{ adc::{Adc0Input, Adc1Input, AdcCode}, afe::Gain, dac::{Dac0Output, Dac1Output, DacCode}, + design_parameters::SAMPLE_BUFFER_SIZE, embedded_hal::digital::v2::InputPin, hal, signal_generator::{self, SignalGenerator}, @@ -50,7 +51,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::{BlockGenerator, StreamTarget}, + data_stream::{FrameGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -169,7 +170,7 @@ const APP: () = { adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), network: NetworkUsers, - generator: BlockGenerator, + generator: FrameGenerator, signal_generator: [SignalGenerator; 2], settings: Settings, @@ -307,7 +308,18 @@ const APP: () = { } // Stream the data. 
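The closure-based `FrameGenerator::add` call introduced in the hunk below reserves `SAMPLE_BUFFER_SIZE * 8` bytes per batch: two ADC and two DAC channels, each holding `SAMPLE_BUFFER_SIZE` two-byte `u16` codes. A standalone sketch of that layout (illustrative only; the batch size of 8 matches the value in the Python `FORMAT` table above, but is hard-coded here for the example):

```rust
// Sketch: why one batch occupies SAMPLE_BUFFER_SIZE * 8 bytes.
const SAMPLE_BUFFER_SIZE: usize = 8; // assumed batch size for illustration
const BATCH_BYTES: usize = SAMPLE_BUFFER_SIZE * 4 * core::mem::size_of::<u16>();

/// Serialize ADC0, ADC1, DAC0 and DAC1 sequentially, two bytes per sample, native-endian.
fn serialize_batch(channels: &[[u16; SAMPLE_BUFFER_SIZE]; 4], buf: &mut [u8; BATCH_BYTES]) {
    let mut offset = 0;
    for channel in channels {
        for sample in channel {
            buf[offset..offset + 2].copy_from_slice(&sample.to_ne_bytes());
            offset += 2;
        }
    }
}

fn main() {
    let channels = [[0u16; SAMPLE_BUFFER_SIZE]; 4];
    let mut buf = [0u8; BATCH_BYTES];
    serialize_batch(&channels, &mut buf);
    // 4 channels * 2 bytes per sample == SAMPLE_BUFFER_SIZE * 8 bytes per batch.
    assert_eq!(BATCH_BYTES, SAMPLE_BUFFER_SIZE * 8);
}
```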
- generator.send(&adc_samples, &dac_samples); + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| { + let mut offset = 0; + for device in [adc_samples.iter(), dac_samples.iter()] { + for channel in device { + for sample in channel.iter() { + buf[offset..offset + 2] + .copy_from_slice(&sample.to_ne_bytes()); + offset += 2; + } + } + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index af7a747..c21b4f8 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -43,6 +43,7 @@ use stabilizer::{ adc::{Adc0Input, Adc1Input, AdcCode}, afe::Gain, dac::{Dac0Output, Dac1Output, DacCode}, + design_parameters::SAMPLE_BUFFER_SIZE, embedded_hal::digital::v2::InputPin, hal, input_stamper::InputStamper, @@ -51,7 +52,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::{BlockGenerator, StreamTarget}, + data_stream::{FrameGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -208,7 +209,7 @@ const APP: () = { settings: Settings, telemetry: TelemetryBuffer, digital_inputs: (DigitalInput0, DigitalInput1), - generator: BlockGenerator, + generator: FrameGenerator, signal_generator: signal_generator::SignalGenerator, timestamper: InputStamper, @@ -394,8 +395,19 @@ const APP: () = { } } - // Stream data - generator.send(&adc_samples, &dac_samples); + // Stream the data. + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| { + let mut offset = 0; + for device in [adc_samples.iter(), dac_samples.iter()] { + for channel in device { + for sample in channel.iter() { + buf[offset..offset + 2] + .copy_from_slice(&sample.to_ne_bytes()); + offset += 2; + } + } + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index e5b69ce..0e37bea 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -15,14 +15,13 @@ use miniconf::MiniconfAtomic; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; +use heapless::pool::{Box, Init, Pool, Uninit}; + use super::NetworkReference; -use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; -// The number of data blocks that we will buffer in the queue. -const BLOCK_BUFFER_SIZE: usize = 30; +const FRAME_COUNT: usize = 4; -// A factor that data may be subsampled at. -const SUBSAMPLE_RATE: usize = 1; +static mut FRAME_DATA: [u8; 5200] = [0; 5200]; /// Represents the destination for the UDP stream to send data to. /// @@ -54,15 +53,6 @@ impl From for SocketAddr { } } -/// A basic "batch" of data. -// Note: In the future, the stream may be generic over this type. -#[derive(Debug, Copy, Clone)] -pub struct AdcDacData { - block_id: u16, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], -} - /// Configure streaming on a device. /// /// # Args @@ -73,216 +63,121 @@ pub struct AdcDacData { /// `stream` is the logically consumer (UDP transmitter) of the enqueued data. 
pub fn setup_streaming( stack: NetworkReference, -) -> (BlockGenerator, DataStream) { - let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); - +) -> (FrameGenerator, DataStream) { + let queue = + cortex_m::singleton!(: Queue = Queue::new()) + .unwrap(); let (producer, consumer) = queue.split(); - let generator = BlockGenerator::new(producer); + let frame_pool = + cortex_m::singleton!(: Pool<[u8; 1024]>= Pool::new()).unwrap(); - let stream = DataStream::new(stack, consumer); + // Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function. + let memory = unsafe { &mut FRAME_DATA }; + frame_pool.grow(memory); + + let generator = FrameGenerator::new(producer, frame_pool); + + let stream = DataStream::new(stack, consumer, frame_pool); (generator, stream) } -/// The data generator for a stream. -pub struct BlockGenerator { - queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_id: u16, +struct StreamFrame { + format: u16, + sequence_number: u16, + buffer: Box<[u8; 1024], Init>, + offset: usize, } -impl BlockGenerator { - /// Construct a new generator. - /// # Args - /// * `queue` - The producer portion of the SPSC queue to enqueue data into. - /// - /// # Returns - /// The generator to use. - fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { +impl StreamFrame { + pub fn new( + buffer: Box<[u8; 1024], Uninit>, + format: u16, + sequence_number: u16, + ) -> Self { + Self { + format, + offset: 4, + sequence_number, + buffer: unsafe { buffer.assume_init() }, + } + } + + pub fn add_batch(&mut self, mut f: F) + where + F: FnMut(&mut [u8]), + { + assert!(!self.is_full::(), "Batch cannot be added to full frame"); + + let result = f(&mut self.buffer[self.offset..self.offset + T]); + + self.offset += T; + + result + } + + pub fn is_full(&self) -> bool { + self.offset + T >= self.buffer.len() + } + + pub fn finish(&mut self) -> &[u8] { + let offset = self.offset; + self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes()); + self.buffer[2..4].copy_from_slice(&self.format.to_ne_bytes()); + &self.buffer[..offset] + } +} + +/// The data generator for a stream. +pub struct FrameGenerator { + queue: Producer<'static, StreamFrame, FRAME_COUNT>, + pool: &'static Pool<[u8; 1024]>, + current_frame: Option, + sequence_number: u16, +} + +impl FrameGenerator { + fn new( + queue: Producer<'static, StreamFrame, FRAME_COUNT>, + pool: &'static Pool<[u8; 1024]>, + ) -> Self { Self { queue, - current_id: 0, + pool, + current_frame: None, + sequence_number: 0, } } - /// Schedule data to be sent by the generator. - /// - /// # Note - /// If no space is available, the data batch may be silently dropped. - /// - /// # Args - /// * `adcs` - The ADC data to transmit. - /// * `dacs` - The DAC data to transmit. - pub fn send( - &mut self, - adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], - ) { - let block = AdcDacData { - block_id: self.current_id, - adcs: [*adcs[0], *adcs[1]], - dacs: [*dacs[0], *dacs[1]], - }; + pub fn add(&mut self, format: u16, f: F) + where + F: FnMut(&mut [u8]), + { + let sequence_number = self.sequence_number; + self.sequence_number = self.sequence_number.wrapping_add(1); - self.current_id = self.current_id.wrapping_add(1); - self.queue.enqueue(block).ok(); - } -} - -/// # Stream Packet -/// Represents a single UDP packet sent by the stream. -/// -/// A "batch" of data is defined to be the data collected for a single invocation of the DSP -/// routine. 
A packet is composed of as many sequential batches as can fit. -/// -/// The packet is given a header indicating the starting batch sequence number and the number of -/// batches present. If the UDP transmitter encounters a non-sequential batch, it does not enqueue -/// it into the packet and instead transmits any staged data. The non-sequential batch is then -/// transmitted in a new UDP packet. This method allows a receiver to detect dropped batches (e.g. -/// due to processing overhead). -/// -/// ## Data Format -/// -/// Data sent via UDP is sent in "blocks". Each block is a single batch of ADC/DAC codes from an -/// individual DSP processing routine. Each block is assigned a unique 16-bit identifier. The identifier -/// increments by one for each block and rolls over. All blocks in a single packet are guaranteed to -/// contain sequential identifiers. -/// -/// All data is transmitted in network-endian (big-endian) format. -/// -/// ### Quick Reference -/// -/// In the reference below, any values enclosed in parentheses represents the number of bytes used for -/// that value. E.g. "Batch size (1)" indicates 1 byte is used to represent the batch size. -/// ``` -/// # UDP packets take the following form -///
<header>,<batch 1>,[<batch 2>, ...]
-///
-/// # The header takes the following form
-/// <header> = <starting block ID (2)>,<number of blocks (1)>,<batch size (1)>
-///
-/// # Each batch takes the following form
-/// <batch N> = <ADC0>,<ADC1>,<DAC0>,<DAC1>
-///
-/// # Where
-/// <ADCx/DACx> = <sample 0>, ...
-/// ```
-///
-/// ### Packet Format
-/// Multiple blocks are sent in a single UDP packet simultaneously. Each UDP packet transmitted
-/// contains a header followed by the serialized data blocks.
-/// ```
-///
,,[, ...] -/// ``` -/// -/// ### Header -/// A header takes the following form: -/// * The starting block ID (2 bytes) -/// * The number of blocks present in the packet (1 byte) -/// * The size of each bach in samples (1 byte) -/// -/// ``` -/// ,, -/// ``` -/// -/// ### Data Blocks -/// Following the header, each block is sequentially serialized. Each block takes the following form: -/// ``` -/// ,,, -/// ``` -/// -/// Where `` is an array of N 16-bit ADC/DAC samples. The number of samples is provided in the -/// header. -/// -/// ADC and DAC codes are transmitted in raw machine-code format. Please refer to the datasheet for the -/// ADC and DAC if you need to convert these to voltages. -pub struct DataPacket<'a> { - buf: &'a mut [u8], - subsample_rate: usize, - start_id: Option, - num_blocks: u8, - write_index: usize, -} - -impl<'a> DataPacket<'a> { - /// Construct a new packet. - /// - /// # Args - /// * `buf` - The location to serialize the data packet into. - /// * `subsample_rate` - The factor at which to subsample data from batches. - pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { - Self { - buf, - start_id: None, - num_blocks: 0, - subsample_rate, - write_index: 4, - } - } - - /// Add a batch of data to the packet. - /// - /// # Note - /// Serialization occurs as the packet is added. - /// - /// # Args - /// * `batch` - The batch to add to the packet. - pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { - // Check that the block is sequential. - if let Some(id) = &self.start_id { - if batch.block_id != id.wrapping_add(self.num_blocks.into()) { - return Err(()); - } - } else { - // Otherwise, this is the first block. Record the strt ID. - self.start_id = Some(batch.block_id); - } - - // Check that there is space for the block. - let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; - if self.buf.len() - self.get_packet_size() < block_size_bytes { - return Err(()); - } - - // Copy the samples into the buffer. - for device in &[batch.adcs, batch.dacs] { - for channel in device { - for sample in channel.iter().step_by(self.subsample_rate) { - self.buf[self.write_index..self.write_index + 2] - .copy_from_slice(&sample.to_be_bytes()); - self.write_index += 2; - } + if self.current_frame.is_none() { + if let Some(buffer) = self.pool.alloc() { + self.current_frame.replace(StreamFrame::new( + buffer, + format, + sequence_number, + )); + } else { + return; } } - self.num_blocks += 1; + self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); - Ok(()) - } - - fn get_packet_size(&self) -> usize { - let header_length = 4; - let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; - let block_size_bytes = block_sample_size * 2 * 4; - - block_size_bytes * self.num_blocks as usize + header_length - } - - /// Complete the packet and prepare it for transmission. - /// - /// # Returns - /// The size of the packet. The user should utilize the original buffer provided for packet - /// construction to access the packet. - pub fn finish(self) -> usize { - let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; - - // Write the header into the block. - self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); - self.buf[2] = self.num_blocks; - self.buf[3] = block_sample_size as u8; - - // Return the length of the packet to transmit. - self.get_packet_size() + if self.current_frame.as_ref().unwrap().is_full::() { + // If we fail to enqueue the frame, free the underlying buffer. 
+ match self.queue.enqueue(self.current_frame.take().unwrap()) { + Err(frame) => self.pool.free(frame.buffer), + _ => {} + }; + } } } @@ -293,9 +188,9 @@ impl<'a> DataPacket<'a> { pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, - queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + queue: Consumer<'static, StreamFrame, FRAME_COUNT>, + frame_pool: &'static Pool<[u8; 1024]>, remote: SocketAddr, - buffer: [u8; 1024], } impl DataStream { @@ -304,16 +199,18 @@ impl DataStream { /// # Args /// * `stack` - A reference to the shared network stack. /// * `consumer` - The read side of the queue containing data to transmit. + /// * `frame_pool` - The Pool to return stream frame objects into. fn new( stack: NetworkReference, - consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + consumer: Consumer<'static, StreamFrame, FRAME_COUNT>, + frame_pool: &'static Pool<[u8; 1024]>, ) -> Self { Self { stack, socket: None, remote: StreamTarget::default().into(), queue: consumer, - buffer: [0; 1024], + frame_pool, } } @@ -365,27 +262,16 @@ impl DataStream { // If there's no socket available, try to connect to our remote. if self.open().is_ok() { // If we just successfully opened the socket, flush old data from queue. - while self.queue.dequeue().is_some() {} + while let Some(frame) = self.queue.dequeue() { + self.frame_pool.free(frame.buffer); + } } } Some(handle) => { - if self.queue.ready() { - // Dequeue data from the queue into a larger block structure. - let mut packet = - DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); - while self - .queue - .peek() - .and_then(|batch| packet.add_batch(batch).ok()) - .is_some() - { - // Dequeue the batch that we just added to the packet. - self.queue.dequeue(); - } - - // Transmit the data packet. - let size = packet.finish(); - self.stack.send(handle, &self.buffer[..size]).ok(); + if let Some(mut frame) = self.queue.dequeue() { + // Transmit the frame and return it to the pool. + self.stack.send(handle, frame.finish()).ok(); + self.frame_pool.free(frame.buffer) } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 115ba3b..65541ed 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -17,7 +17,7 @@ pub mod shared; pub mod telemetry; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; -use data_stream::{BlockGenerator, DataStream}; +use data_stream::{DataStream, FrameGenerator}; use messages::{MqttMessage, SettingsResponse}; use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; @@ -49,7 +49,7 @@ pub struct NetworkUsers { pub miniconf: MiniconfClient, pub processor: NetworkProcessor, stream: DataStream, - generator: Option, + generator: Option, pub telemetry: TelemetryClient, } @@ -113,7 +113,7 @@ where } /// Enable live data streaming. 
- pub fn enable_streaming(&mut self) -> BlockGenerator { + pub fn enable_streaming(&mut self) -> FrameGenerator { self.generator.take().unwrap() } From 6c87db377865a9c6403f301fd1e21ed2893b3dcd Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 23 Jul 2021 14:12:59 +0200 Subject: [PATCH 02/13] Updating comments --- scripts/stream_throughput.py | 29 +++++++++++++++-------------- src/bin/dual-iir.rs | 28 +++++++++++++++++----------- src/hardware/setup.rs | 9 --------- src/net/data_stream.rs | 34 ++++++++++++++++++++-------------- 4 files changed, 52 insertions(+), 48 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index d7ce9b8..3624333 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -13,10 +13,10 @@ import logging # Representation of a single UDP packet transmitted by Stabilizer. Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) -Format = collections.namedtuple('Format', ['batch_size', 'batches_per_frame']) +Format = collections.namedtuple('Format', ['batch_size', 'length']) FORMAT = { - 0: Format(8, 255) + 0: Format(8, 964) } class Timer: @@ -94,30 +94,31 @@ class PacketParser: if len(self.buf) < 4: return None - start_id, format_id = struct.unpack_from('!HH', self.buf) + start_id, format_id = struct.unpack_from('(0, |buf| { - let mut offset = 0; - for device in [adc_samples.iter(), dac_samples.iter()] { - for channel in device { - for sample in channel.iter() { - buf[offset..offset + 2] - .copy_from_slice(&sample.to_ne_bytes()); - offset += 2; - } - } - } + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| unsafe { + let dst = buf.as_ptr() as usize as *mut u16; + + let adc0 = &adc_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping(adc0, dst, SAMPLE_BUFFER_SIZE); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let adc1 = &adc_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping(adc1, dst, SAMPLE_BUFFER_SIZE); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac0 = &dac_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping(dac0, dst, SAMPLE_BUFFER_SIZE); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac1 = &dac_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping(dac1, dst, SAMPLE_BUFFER_SIZE); }); // Update telemetry measurements. 
diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index 2b270d5..61b9a2d 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -36,11 +36,6 @@ pub struct NetStorage { [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], pub routes_cache: [Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8], - - pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1], - pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1], - pub dhcp_tx_storage: [u8; 600], - pub dhcp_rx_storage: [u8; 600], } pub struct UdpSocketStorage { @@ -94,10 +89,6 @@ impl Default for NetStorage { sockets: [None, None, None, None, None, None], tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS], udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS], - dhcp_tx_storage: [0; 600], - dhcp_rx_storage: [0; 600], - dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], - dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], } } } diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 0e37bea..5766721 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -19,9 +19,11 @@ use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; -const FRAME_COUNT: usize = 4; +const FRAME_COUNT: usize = 6; +const FRAME_SIZE: usize = 1024; -static mut FRAME_DATA: [u8; 5200] = [0; 5200]; +static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = + [0; FRAME_SIZE * FRAME_COUNT]; /// Represents the destination for the UDP stream to send data to. /// @@ -70,7 +72,7 @@ pub fn setup_streaming( let (producer, consumer) = queue.split(); let frame_pool = - cortex_m::singleton!(: Pool<[u8; 1024]>= Pool::new()).unwrap(); + cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]>= Pool::new()).unwrap(); // Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function. let memory = unsafe { &mut FRAME_DATA }; @@ -86,13 +88,13 @@ pub fn setup_streaming( struct StreamFrame { format: u16, sequence_number: u16, - buffer: Box<[u8; 1024], Init>, + buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, } impl StreamFrame { pub fn new( - buffer: Box<[u8; 1024], Uninit>, + buffer: Box<[u8; FRAME_SIZE], Uninit>, format: u16, sequence_number: u16, ) -> Self { @@ -132,7 +134,7 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_COUNT>, - pool: &'static Pool<[u8; 1024]>, + pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, sequence_number: u16, } @@ -140,7 +142,7 @@ pub struct FrameGenerator { impl FrameGenerator { fn new( queue: Producer<'static, StreamFrame, FRAME_COUNT>, - pool: &'static Pool<[u8; 1024]>, + pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { queue, @@ -172,11 +174,15 @@ impl FrameGenerator { self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); if self.current_frame.as_ref().unwrap().is_full::() { - // If we fail to enqueue the frame, free the underlying buffer. - match self.queue.enqueue(self.current_frame.take().unwrap()) { - Err(frame) => self.pool.free(frame.buffer), - _ => {} - }; + if self + .queue + .enqueue(self.current_frame.take().unwrap()) + .is_err() + { + // Given that the queue is the same size as the number of frames available, this + // should never occur. 
+ panic!("Frame enqueue failure") + } } } } @@ -189,7 +195,7 @@ pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, queue: Consumer<'static, StreamFrame, FRAME_COUNT>, - frame_pool: &'static Pool<[u8; 1024]>, + frame_pool: &'static Pool<[u8; FRAME_SIZE]>, remote: SocketAddr, } @@ -203,7 +209,7 @@ impl DataStream { fn new( stack: NetworkReference, consumer: Consumer<'static, StreamFrame, FRAME_COUNT>, - frame_pool: &'static Pool<[u8; 1024]>, + frame_pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { stack, From 8a143a3f5822c8fda9bd91c629e7c368200ca9ff Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 23 Jul 2021 15:08:07 +0200 Subject: [PATCH 03/13] Adding stream format, updating header format --- scripts/stream_throughput.py | 41 +++++++++++++---------------- src/bin/dual-iir.rs | 49 +++++++++++++++++++++++----------- src/bin/lockin.rs | 51 +++++++++++++++++++++++++++--------- src/configuration.rs | 2 +- src/net/data_stream.rs | 33 +++++++++++++++++++---- 5 files changed, 119 insertions(+), 57 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 3624333..2e2b33c 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -10,13 +10,15 @@ import struct import time import logging -# Representation of a single UDP packet transmitted by Stabilizer. -Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) +# Representation of a single data batch transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'data']) -Format = collections.namedtuple('Format', ['batch_size', 'length']) +Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format']) +# All supported formats by this reception script. FORMAT = { - 0: Format(8, 964) + 0: Format(sample_size_bytes=8, + batch_format='<{batch_size}H{batch_size}H{batch_size}H{batch_size}H') } class Timer: @@ -94,32 +96,25 @@ class PacketParser: if len(self.buf) < 4: return None - start_id, format_id = struct.unpack_from('(0, |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( + StreamFormat::AdcDacData, + |buf| unsafe { + let dst = buf.as_ptr() as usize as *mut u16; - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping(adc0, dst, SAMPLE_BUFFER_SIZE); + let adc0 = &adc_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc0, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping(adc1, dst, SAMPLE_BUFFER_SIZE); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let adc1 = &adc_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc1, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping(dac0, dst, SAMPLE_BUFFER_SIZE); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac0 = &dac_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac0, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping(dac1, dst, SAMPLE_BUFFER_SIZE); - }); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac1 = &dac_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac1, + dst, + SAMPLE_BUFFER_SIZE, + ); + }, + ); // Update telemetry measurements. 
telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index c21b4f8..7e09506 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -52,7 +52,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::{FrameGenerator, StreamTarget}, + data_stream::{FrameGenerator, StreamFormat, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -396,18 +396,43 @@ const APP: () = { } // Stream the data. - generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| { - let mut offset = 0; - for device in [adc_samples.iter(), dac_samples.iter()] { - for channel in device { - for sample in channel.iter() { - buf[offset..offset + 2] - .copy_from_slice(&sample.to_ne_bytes()); - offset += 2; - } - } - } - }); + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( + StreamFormat::AdcDacData, + |buf| unsafe { + let dst = buf.as_ptr() as usize as *mut u16; + + let adc0 = &adc_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc0, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let adc1 = &adc_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc1, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac0 = &dac_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac0, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac1 = &dac_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac1, + dst, + SAMPLE_BUFFER_SIZE, + ); + }, + ); // Update telemetry measurements. telemetry.adcs = diff --git a/src/configuration.rs b/src/configuration.rs index 01e410a..228f40e 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -3,7 +3,7 @@ /// MQTT broker IPv4 address /// /// In the default configuration, the IP address is defined as 10.34.16.10. -pub const MQTT_BROKER: [u8; 4] = [10, 34, 16, 10]; +pub const MQTT_BROKER: [u8; 4] = [10, 35, 16, 10]; /// Sampling Frequency /// diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 5766721..08bf352 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -15,6 +15,8 @@ use miniconf::MiniconfAtomic; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; +use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; + use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; @@ -41,6 +43,15 @@ pub struct StreamTarget { pub port: u16, } +/// Specifies the format of streamed data +#[repr(u16)] +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum StreamFormat { + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. Each + /// batch is loaded into the stream frame sequentially until the frame is full. 
+ AdcDacData = 0, +} + impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -86,23 +97,27 @@ pub fn setup_streaming( } struct StreamFrame { - format: u16, + format: StreamFormat, sequence_number: u16, buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, + batch_count: u16, + batch_size: u8, } impl StreamFrame { pub fn new( buffer: Box<[u8; FRAME_SIZE], Uninit>, - format: u16, + format: StreamFormat, sequence_number: u16, ) -> Self { Self { format, - offset: 4, + offset: 7, sequence_number, buffer: unsafe { buffer.assume_init() }, + batch_size: SAMPLE_BUFFER_SIZE as u8, + batch_count: 0, } } @@ -115,6 +130,7 @@ impl StreamFrame { let result = f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; + self.batch_count = self.batch_count.checked_add(1).unwrap(); result } @@ -126,7 +142,9 @@ impl StreamFrame { pub fn finish(&mut self) -> &[u8] { let offset = self.offset; self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes()); - self.buffer[2..4].copy_from_slice(&self.format.to_ne_bytes()); + self.buffer[2..4].copy_from_slice(&(self.format as u16).to_ne_bytes()); + self.buffer[4..6].copy_from_slice(&self.batch_count.to_ne_bytes()); + self.buffer[6] = self.batch_size; &self.buffer[..offset] } } @@ -152,7 +170,7 @@ impl FrameGenerator { } } - pub fn add(&mut self, format: u16, f: F) + pub fn add(&mut self, format: StreamFormat, f: F) where F: FnMut(&mut [u8]), { @@ -171,6 +189,11 @@ impl FrameGenerator { } } + assert!( + format == self.current_frame.as_ref().unwrap().format, + "Unexpected stream format encountered" + ); + self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); if self.current_frame.as_ref().unwrap().is_full::() { From 26b1f3422faabb34604a464fefb6c3ca8628fb2b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 23 Jul 2021 15:10:28 +0200 Subject: [PATCH 04/13] Reverting IP config --- src/configuration.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/configuration.rs b/src/configuration.rs index 228f40e..01e410a 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -3,7 +3,7 @@ /// MQTT broker IPv4 address /// /// In the default configuration, the IP address is defined as 10.34.16.10. -pub const MQTT_BROKER: [u8; 4] = [10, 35, 16, 10]; +pub const MQTT_BROKER: [u8; 4] = [10, 34, 16, 10]; /// Sampling Frequency /// From be6066e97974beebc434486d3c116e4112abc482 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 23 Jul 2021 15:14:30 +0200 Subject: [PATCH 05/13] Cleaning up test script --- scripts/stream_throughput.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 2e2b33c..4ba58bb 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -4,6 +4,7 @@ Author: Ryan Summers Description: Provides a mechanism for measuring Stabilizer stream data throughput. """ +import argparse import socket import collections import struct @@ -93,9 +94,10 @@ class PacketParser: def _parse(self): """ Attempt to parse packets from the received buffer. """ # Attempt to parse a block from the buffer. 
- if len(self.buf) < 4: + if len(self.buf) < 7: return None + # Parse out the packet header start_id, format_id, batch_count, batch_size = struct.unpack_from(' Date: Fri, 23 Jul 2021 15:16:36 +0200 Subject: [PATCH 06/13] Adding docs --- scripts/stream_throughput.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 4ba58bb..5d6aa16 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -14,6 +14,11 @@ import logging # Representation of a single data batch transmitted by Stabilizer. Packet = collections.namedtuple('Packet', ['index', 'data']) +# Specifies a known format for incoming packets. +# +# * `sample_size_bytes` is the number of bytes required for each sample in the batch. +# * `batch_format` is a `struct` format string that will be provided the `batch_size` as an named +# argument. This format string will be used to deserialize each batch of data from the frame. Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format']) # All supported formats by this reception script. @@ -158,7 +163,7 @@ def main(): while True: # Receive any data over UDP and parse it. - data = connection.recv(4096 * 4) + data = connection.recv(4096) if data and not timer.is_started(): timer.start() From 4c27100c233c7bdacb2dbc2085bb5b3084fdc2db Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 23 Jul 2021 15:30:38 +0200 Subject: [PATCH 07/13] Updating docs for streaming --- src/net/data_stream.rs | 49 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 43 insertions(+), 6 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 08bf352..519f3fb 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -2,10 +2,32 @@ //! //! # Design //! Data streamining utilizes UDP packets to send live data streams at high throughput. -//! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains -//! an identifier that can be used to detect dropped data. +//! Packets are always sent in a best-effort fashion, and data may be dropped. //! -//! Refer to [DataPacket] for information about the serialization format of each UDP packet. +//! Stabilizer organizes livestreamed data into batches within a "Frame" that will be sent as a UDP +//! packet. Each frame consits of a header followed by sequential batch serializations. The packet +//! header is constant for all streaming capabilities, but the serialization format after the header +//! is application-defined. +//! +//! ## Header Format +//! The header of each stream frame consists of 7 bytes. All data is stored in little-endian format. +//! +//! Elements appear sequentiall as follows: +//! * Sequence Number +//! * Format Code +//! * Batch Count <16> +//! * Batch size +//! +//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. +//! This can be used to determine if a stream frame was lost. +//! +//! The "Format Code" is a unique specifier that indicates the serialization format of each batch of +//! data in the frame. Refer to [StreamFormat] for further information. +//! +//! The "Batch Count" indicates how many batches are present in the current frame. +//! +//! The "Batch Size" specifies the [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] +//! parameter, which can be used to determine the number of samples per batch. //! //! # Example //! 
A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception @@ -21,9 +43,13 @@ use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; -const FRAME_COUNT: usize = 6; +// The number of frames that can be buffered. +const FRAME_COUNT: usize = 4; + +// The size of each livestream frame in bytes. const FRAME_SIZE: usize = 1024; +// Static storage used for a heapless::Pool of frame buffers. static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = [0; FRAME_SIZE * FRAME_COUNT]; @@ -47,8 +73,13 @@ pub struct StreamTarget { #[repr(u16)] #[derive(Debug, Copy, Clone, PartialEq)] pub enum StreamFormat { - /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. Each - /// batch is loaded into the stream frame sequentially until the frame is full. + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. + /// + /// # Example + /// With a batch size of 2, the serialization would take the following form: + /// ``` + /// + /// ``` AdcDacData = 0, } @@ -170,6 +201,12 @@ impl FrameGenerator { } } + /// Add a batch to the current stream frame. + /// + /// # Args + /// * `format` - The format of the stream. This must be the same for each execution. + /// * `f` - A closure that will be provided the buffer to write batch data into. The buffer will + /// be the size of the `T` template argument. pub fn add(&mut self, format: StreamFormat, f: F) where F: FnMut(&mut [u8]), From 8d4193ed62948af74588b6c35d1dedb4e986efb8 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 12:24:36 +0200 Subject: [PATCH 08/13] Updating after refactor --- src/bin/dual-iir.rs | 56 ++++++++----------------- src/bin/lockin.rs | 56 ++++++++----------------- src/net/data_stream.rs | 93 +++++++++++++++++++++++------------------- src/net/mod.rs | 9 +++- 4 files changed, 95 insertions(+), 119 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 57521ff..b77091c 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -194,7 +194,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.enable_streaming(StreamFormat::AdcDacData); // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); @@ -308,43 +308,23 @@ const APP: () = { } // Stream the data. 
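The rewritten hunk below copies each channel with safe slice operations and asserts that every channel occupies exactly `N` bytes. Because the iterator items are themselves references to the channel buffers, the assertion has to measure the pointee rather than the reference — the later commit "Fixing issues after testing" in this series changes `size_of_val(data)` to `size_of_val(*data)` for exactly this reason. A standalone sketch of the distinction:

```rust
fn main() {
    let samples = [0u16; 8];
    let channel: &[u16; 8] = &samples;

    // Measuring a reference to the reference yields the size of a pointer...
    assert_eq!(core::mem::size_of_val(&channel), core::mem::size_of::<usize>());
    // ...while measuring the buffer it points at yields the expected byte count.
    assert_eq!(core::mem::size_of_val(channel), 8 * core::mem::size_of::<u16>());
}
```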
- generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( - StreamFormat::AdcDacData, - |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; - - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc1, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac1, - dst, - SAMPLE_BUFFER_SIZE, - ); - }, - ); + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 7e09506..c0bc924 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -231,7 +231,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.enable_streaming(StreamFormat::AdcDacData); let settings = Settings::default(); @@ -396,43 +396,23 @@ const APP: () = { } // Stream the data. - generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( - StreamFormat::AdcDacData, - |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; - - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - adc1, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac0, - dst, - SAMPLE_BUFFER_SIZE, - ); - - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping( - dac1, - dst, - SAMPLE_BUFFER_SIZE, - ); - }, - ); + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 519f3fb..199360c 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -12,22 +12,21 @@ //! ## Header Format //! The header of each stream frame consists of 7 bytes. All data is stored in little-endian format. //! -//! Elements appear sequentiall as follows: -//! * Sequence Number -//! * Format Code -//! * Batch Count <16> -//! * Batch size +//! Elements appear sequentially as follows: +//! * Magic word 0x057B +//! * Format Code +//! * Batch Size +//! * Sequence Number //! -//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. -//! 
This can be used to determine if a stream frame was lost. +//! The "Magic word" is a constant field for all packets. The value is alway 0x057B. //! //! The "Format Code" is a unique specifier that indicates the serialization format of each batch of //! data in the frame. Refer to [StreamFormat] for further information. //! -//! The "Batch Count" indicates how many batches are present in the current frame. +//! The "Batch size" is the value of [SAMPLE_BUFFER_SIZE]. //! -//! The "Batch Size" specifies the [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] -//! parameter, which can be used to determine the number of samples per batch. +//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. +//! This can be used to determine if a stream frame was lost. //! //! # Example //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception @@ -43,6 +42,10 @@ use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; +const MAGIC_WORD: u16 = 0x057B; + +const HEADER_SIZE: usize = 8; + // The number of frames that can be buffered. const FRAME_COUNT: usize = 4; @@ -70,9 +73,12 @@ pub struct StreamTarget { } /// Specifies the format of streamed data -#[repr(u16)] +#[repr(u8)] #[derive(Debug, Copy, Clone, PartialEq)] pub enum StreamFormat { + /// Reserved, unused format specifier. + Unknown = 0, + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. /// /// # Example @@ -80,7 +86,7 @@ pub enum StreamFormat { /// ``` /// /// ``` - AdcDacData = 0, + AdcDacData = 1, } impl From for SocketAddr { @@ -128,27 +134,25 @@ pub fn setup_streaming( } struct StreamFrame { - format: StreamFormat, - sequence_number: u16, buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, - batch_count: u16, - batch_size: u8, } impl StreamFrame { pub fn new( buffer: Box<[u8; FRAME_SIZE], Uninit>, - format: StreamFormat, - sequence_number: u16, + format: u8, + buffer_size: u8, + sequence_number: u32, ) -> Self { + let mut buffer = unsafe { buffer.assume_init() }; + buffer[0..2].copy_from_slice(&MAGIC_WORD.to_ne_bytes()); + buffer[2] = format; + buffer[3] = buffer_size; + buffer[4..8].copy_from_slice(&sequence_number.to_ne_bytes()); Self { - format, - offset: 7, - sequence_number, - buffer: unsafe { buffer.assume_init() }, - batch_size: SAMPLE_BUFFER_SIZE as u8, - batch_count: 0, + buffer, + offset: HEADER_SIZE, } } @@ -161,7 +165,6 @@ impl StreamFrame { let result = f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; - self.batch_count = self.batch_count.checked_add(1).unwrap(); result } @@ -171,12 +174,7 @@ impl StreamFrame { } pub fn finish(&mut self) -> &[u8] { - let offset = self.offset; - self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes()); - self.buffer[2..4].copy_from_slice(&(self.format as u16).to_ne_bytes()); - self.buffer[4..6].copy_from_slice(&self.batch_count.to_ne_bytes()); - self.buffer[6] = self.batch_size; - &self.buffer[..offset] + &self.buffer[..self.offset] } } @@ -185,7 +183,8 @@ pub struct FrameGenerator { queue: Producer<'static, StreamFrame, FRAME_COUNT>, pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, - sequence_number: u16, + sequence_number: u32, + format: StreamFormat, } impl FrameGenerator { @@ -196,18 +195,32 @@ impl FrameGenerator { Self { queue, pool, + format: StreamFormat::Unknown, current_frame: None, sequence_number: 0, } } + /// Specify the format of the stream. 
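`StreamFrame::new` above writes the 8-byte header — magic word, format code, batch size, and 32-bit sequence number — in native-endian order, which is little-endian on Stabilizer's Cortex-M7. A minimal receiver-side sketch of validating and unpacking that header (standalone, not part of this patch):

```rust
/// Parse the 8-byte frame header, returning (format, batch_size, sequence_number).
fn parse_header(frame: &[u8]) -> Option<(u8, u8, u32)> {
    if frame.len() < 8 {
        return None;
    }
    // The magic word identifies a Stabilizer stream frame.
    if u16::from_le_bytes([frame[0], frame[1]]) != 0x057B {
        return None;
    }
    let format = frame[2];
    let batch_size = frame[3];
    let sequence = u32::from_le_bytes([frame[4], frame[5], frame[6], frame[7]]);
    Some((format, batch_size, sequence))
}
```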
+ /// + /// # Note: + /// This function may only be called once upon initializing streaming + /// + /// # Args + /// * `format` - The desired format of the stream. + #[doc(hidden)] + pub(crate) fn set_format(&mut self, format: StreamFormat) { + assert!(self.format == StreamFormat::Unknown); + assert!(format != StreamFormat::Unknown); + self.format = format; + } + /// Add a batch to the current stream frame. /// /// # Args - /// * `format` - The format of the stream. This must be the same for each execution. /// * `f` - A closure that will be provided the buffer to write batch data into. The buffer will /// be the size of the `T` template argument. - pub fn add(&mut self, format: StreamFormat, f: F) + pub fn add(&mut self, f: F) where F: FnMut(&mut [u8]), { @@ -218,7 +231,8 @@ impl FrameGenerator { if let Some(buffer) = self.pool.alloc() { self.current_frame.replace(StreamFrame::new( buffer, - format, + self.format as u8, + SAMPLE_BUFFER_SIZE as u8, sequence_number, )); } else { @@ -226,14 +240,11 @@ impl FrameGenerator { } } - assert!( - format == self.current_frame.as_ref().unwrap().format, - "Unexpected stream format encountered" - ); + let current_frame = self.current_frame.as_mut().unwrap(); - self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); + current_frame.add_batch::<_, T>(f); - if self.current_frame.as_ref().unwrap().is_full::() { + if current_frame.is_full::() { if self .queue .enqueue(self.current_frame.take().unwrap()) diff --git a/src/net/mod.rs b/src/net/mod.rs index 65541ed..fe0a7e9 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -113,8 +113,13 @@ where } /// Enable live data streaming. - pub fn enable_streaming(&mut self) -> FrameGenerator { - self.generator.take().unwrap() + pub fn enable_streaming( + &mut self, + format: data_stream::StreamFormat, + ) -> FrameGenerator { + let mut generator = self.generator.take().unwrap(); + generator.set_format(format); + generator } /// Direct the stream to the provided remote target. From c0157b70956f7a85b553d738ced6c5bf363514e2 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 12:26:10 +0200 Subject: [PATCH 09/13] Including header size in frame size --- src/net/data_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 199360c..b86410b 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -50,7 +50,7 @@ const HEADER_SIZE: usize = 8; const FRAME_COUNT: usize = 4; // The size of each livestream frame in bytes. -const FRAME_SIZE: usize = 1024; +const FRAME_SIZE: usize = 1024 + HEADER_SIZE; // Static storage used for a heapless::Pool of frame buffers. static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = From 11d279a1b60cff60c3be150bb8dd1213d8b3946b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 13:07:07 +0200 Subject: [PATCH 10/13] StreamFormat -> Into --- scripts/stream_throughput.py | 150 +++++++++++++---------------------- src/net/data_stream.rs | 16 ++-- src/net/mod.rs | 5 +- 3 files changed, 68 insertions(+), 103 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 5d6aa16..94bcf66 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -14,19 +14,50 @@ import logging # Representation of a single data batch transmitted by Stabilizer. Packet = collections.namedtuple('Packet', ['index', 'data']) -# Specifies a known format for incoming packets. -# -# * `sample_size_bytes` is the number of bytes required for each sample in the batch. 
-# * `batch_format` is a `struct` format string that will be provided the `batch_size` as an named -# argument. This format string will be used to deserialize each batch of data from the frame. -Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format']) +# The magic header half-word at the start of each packet. +MAGIC_HEADER = 0x057B + +# The struct format of the header. +HEADER_FORMAT = ' for u8 { + fn from(format: StreamFormat) -> u8 { + format as u8 + } +} + impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -184,7 +190,7 @@ pub struct FrameGenerator { pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, sequence_number: u32, - format: StreamFormat, + format: u8, } impl FrameGenerator { @@ -195,7 +201,7 @@ impl FrameGenerator { Self { queue, pool, - format: StreamFormat::Unknown, + format: StreamFormat::Unknown.into(), current_frame: None, sequence_number: 0, } @@ -209,10 +215,8 @@ impl FrameGenerator { /// # Args /// * `format` - The desired format of the stream. #[doc(hidden)] - pub(crate) fn set_format(&mut self, format: StreamFormat) { - assert!(self.format == StreamFormat::Unknown); - assert!(format != StreamFormat::Unknown); - self.format = format; + pub(crate) fn set_format(&mut self, format: impl Into) { + self.format = format.into(); } /// Add a batch to the current stream frame. diff --git a/src/net/mod.rs b/src/net/mod.rs index fe0a7e9..ca1bb3d 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -113,9 +113,12 @@ where } /// Enable live data streaming. + /// + /// # Args + /// * `format` - A unique u8 code indicating the format of the data. pub fn enable_streaming( &mut self, - format: data_stream::StreamFormat, + format: impl Into, ) -> FrameGenerator { let mut generator = self.generator.take().unwrap(); generator.set_format(format); From 1c66310b6d4a13587c0d864460bf5e1ef9ff253a Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 13:47:03 +0200 Subject: [PATCH 11/13] Fixing issues after testing --- scripts/stream_throughput.py | 10 ++++------ src/bin/dual-iir.rs | 2 +- src/bin/lockin.rs | 2 +- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 94bcf66..bbcb411 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -32,7 +32,7 @@ def parse_packet(buf): """ Attempt to parse packets from the received buffer. """ # Attempt to parse a block from the buffer. if len(buf) < struct.calcsize(HEADER_FORMAT): - return None + return [] # Parse out the packet header magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf) @@ -40,14 +40,14 @@ def parse_packet(buf): if magic != MAGIC_HEADER: logging.warning('Encountered bad magic header: %s', hex(magic)) - return None + return [] if format_id not in FORMAT: raise Exception(f'Unknown format specifier: {format_id}') frame_format = FORMAT[format_id](batch_size) - batch_count = len(buf) / struct.calcsize(frame_format) + batch_count = int(len(buf) / struct.calcsize(frame_format)) packets = [] for offset in range(batch_count): @@ -135,9 +135,7 @@ def main(): # Handle any received packets. total_bytes += len(data) - packet = parse_packet(data) - - if packet: + for packet in parse_packet(data): # Handle any dropped packets. 
drop_count += sequence_delta(last_index, packet.index) last_index = packet.index diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index b77091c..6930d3d 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -315,7 +315,7 @@ const APP: () = { .chain(dac_samples.iter()) .zip(buf.chunks_exact_mut(N)) { - assert_eq!(core::mem::size_of_val(data), N); + assert_eq!(core::mem::size_of_val(*data), N); let data = unsafe { core::slice::from_raw_parts( data.as_ptr() as *const u8, diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index c0bc924..be4f812 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -403,7 +403,7 @@ const APP: () = { .chain(dac_samples.iter()) .zip(buf.chunks_exact_mut(N)) { - assert_eq!(core::mem::size_of_val(data), N); + assert_eq!(core::mem::size_of_val(*data), N); let data = unsafe { core::slice::from_raw_parts( data.as_ptr() as *const u8, From 7a4f73d558e01e63a45a2b694a8c13c86ee3b42f Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 26 Jul 2021 13:52:57 +0200 Subject: [PATCH 12/13] Cleaning up lint --- src/net/data_stream.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 8560b29..f8eb921 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -139,6 +139,7 @@ pub fn setup_streaming( (generator, stream) } +#[derive(Debug)] struct StreamFrame { buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, @@ -249,15 +250,9 @@ impl FrameGenerator { current_frame.add_batch::<_, T>(f); if current_frame.is_full::() { - if self - .queue + self.queue .enqueue(self.current_frame.take().unwrap()) - .is_err() - { - // Given that the queue is the same size as the number of frames available, this - // should never occur. - panic!("Frame enqueue failure") - } + .unwrap(); } } } From de63be09e4e21f22c633295e831bff40410f049b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 27 Jul 2021 13:12:57 +0200 Subject: [PATCH 13/13] Updating after review --- scripts/stream_throughput.py | 14 +++---- src/bin/dual-iir.rs | 5 ++- src/bin/lockin.rs | 5 ++- src/net/data_stream.rs | 79 +++++++++++++++++------------------- src/net/mod.rs | 5 ++- 5 files changed, 54 insertions(+), 54 deletions(-) diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index bbcb411..ca3ccd4 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -22,7 +22,7 @@ HEADER_FORMAT = ' -//! * Format Code -//! * Batch Size -//! * Sequence Number -//! -//! The "Magic word" is a constant field for all packets. The value is alway 0x057B. -//! -//! The "Format Code" is a unique specifier that indicates the serialization format of each batch of -//! data in the frame. Refer to [StreamFormat] for further information. -//! -//! The "Batch size" is the value of [SAMPLE_BUFFER_SIZE]. -//! -//! The "Sequence Number" is an identifier that increments for ever execution of the DSP process. -//! This can be used to determine if a stream frame was lost. +//! * **Magic word 0x057B** : a constant to identify Stabilizer streaming data. +//! * **Format Code** : a unique ID that indicates the serialization format of each batch of data +//! in the frame. Refer to [StreamFormat] for further information. +//! * **Batch Size** : the number of samples in each batch of data. +//! * **Sequence Number** : an the sequence number of the first batch in the frame. +//! This can be used to determine if and how many stream batches are lost. //! //! # Example //! 
A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception //! of livestreamed data. use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; +use num_enum::IntoPrimitive; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; -use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; - use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; const MAGIC_WORD: u16 = 0x057B; +// The size of the header, calculated in bytes. +// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence +// number, which corresponds to 8 bytes total. const HEADER_SIZE: usize = 8; // The number of frames that can be buffered. @@ -52,6 +45,10 @@ const FRAME_COUNT: usize = 4; // The size of each livestream frame in bytes. const FRAME_SIZE: usize = 1024 + HEADER_SIZE; +// The size of the frame queue must be at least as large as the number of frame buffers. Every +// allocated frame buffer should fit in the queue. +const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; + // Static storage used for a heapless::Pool of frame buffers. static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = [0; FRAME_SIZE * FRAME_COUNT]; @@ -74,7 +71,7 @@ pub struct StreamTarget { /// Specifies the format of streamed data #[repr(u8)] -#[derive(Debug, Copy, Clone, PartialEq)] +#[derive(Debug, Copy, Clone, PartialEq, IntoPrimitive)] pub enum StreamFormat { /// Reserved, unused format specifier. Unknown = 0, @@ -89,12 +86,6 @@ pub enum StreamFormat { AdcDacData = 1, } -impl From for u8 { - fn from(format: StreamFormat) -> u8 { - format as u8 - } -} - impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -120,8 +111,10 @@ impl From for SocketAddr { pub fn setup_streaming( stack: NetworkReference, ) -> (FrameGenerator, DataStream) { + // The queue needs to be at least as large as the frame count to ensure that every allocated + // frame can potentially be enqueued for transmission. let queue = - cortex_m::singleton!(: Queue = Queue::new()) + cortex_m::singleton!(: Queue = Queue::new()) .unwrap(); let (producer, consumer) = queue.split(); @@ -167,17 +160,13 @@ impl StreamFrame { where F: FnMut(&mut [u8]), { - assert!(!self.is_full::(), "Batch cannot be added to full frame"); - - let result = f(&mut self.buffer[self.offset..self.offset + T]); + f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; - - result } pub fn is_full(&self) -> bool { - self.offset + T >= self.buffer.len() + self.offset + T > self.buffer.len() } pub fn finish(&mut self) -> &[u8] { @@ -187,37 +176,42 @@ impl StreamFrame { /// The data generator for a stream. pub struct FrameGenerator { - queue: Producer<'static, StreamFrame, FRAME_COUNT>, + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, pool: &'static Pool<[u8; FRAME_SIZE]>, current_frame: Option, sequence_number: u32, format: u8, + batch_size: u8, } impl FrameGenerator { fn new( - queue: Producer<'static, StreamFrame, FRAME_COUNT>, + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { queue, pool, + batch_size: 0, format: StreamFormat::Unknown.into(), current_frame: None, sequence_number: 0, } } - /// Specify the format of the stream. + /// Configure the format of the stream. 
/// /// # Note: - /// This function may only be called once upon initializing streaming + /// This function shall only be called once upon initializing streaming /// /// # Args /// * `format` - The desired format of the stream. + /// * `batch_size` - The number of samples in each data batch. See + /// [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] #[doc(hidden)] - pub(crate) fn set_format(&mut self, format: impl Into) { + pub(crate) fn configure(&mut self, format: impl Into, batch_size: u8) { self.format = format.into(); + self.batch_size = batch_size; } /// Add a batch to the current stream frame. @@ -237,7 +231,7 @@ impl FrameGenerator { self.current_frame.replace(StreamFrame::new( buffer, self.format as u8, - SAMPLE_BUFFER_SIZE as u8, + self.batch_size, sequence_number, )); } else { @@ -245,11 +239,14 @@ impl FrameGenerator { } } + // Note(unwrap): We ensure the frame is present above. let current_frame = self.current_frame.as_mut().unwrap(); current_frame.add_batch::<_, T>(f); if current_frame.is_full::() { + // Note(unwrap): The queue is designed to be at least as large as the frame buffer + // count, so this enqueue should always succeed. self.queue .enqueue(self.current_frame.take().unwrap()) .unwrap(); @@ -264,7 +261,7 @@ impl FrameGenerator { pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, - queue: Consumer<'static, StreamFrame, FRAME_COUNT>, + queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, frame_pool: &'static Pool<[u8; FRAME_SIZE]>, remote: SocketAddr, } @@ -278,7 +275,7 @@ impl DataStream { /// * `frame_pool` - The Pool to return stream frame objects into. fn new( stack: NetworkReference, - consumer: Consumer<'static, StreamFrame, FRAME_COUNT>, + consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, frame_pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { diff --git a/src/net/mod.rs b/src/net/mod.rs index ca1bb3d..da82999 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -116,12 +116,13 @@ where /// /// # Args /// * `format` - A unique u8 code indicating the format of the data. - pub fn enable_streaming( + pub fn configure_streaming( &mut self, format: impl Into, + batch_size: u8, ) -> FrameGenerator { let mut generator = self.generator.take().unwrap(); - generator.set_format(format); + generator.configure(format, batch_size); generator }
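For completeness, a host-side sketch (not part of this series) of splitting an `AdcDacData` frame payload — the bytes following the 8-byte header — back into per-batch ADC/DAC channels, assuming the little-endian `u16` layout described in the final documentation above:

```rust
/// Split a frame payload into batches of [ADC0, ADC1, DAC0, DAC1] sample vectors.
fn parse_adc_dac_payload(payload: &[u8], batch_size: usize) -> Vec<[Vec<u16>; 4]> {
    let channel_bytes = batch_size * 2;
    payload
        .chunks_exact(4 * channel_bytes)
        .map(|batch| {
            let mut channels: [Vec<u16>; 4] = Default::default();
            for (channel, bytes) in
                channels.iter_mut().zip(batch.chunks_exact(channel_bytes))
            {
                *channel = bytes
                    .chunks_exact(2)
                    .map(|b| u16::from_le_bytes([b[0], b[1]]))
                    .collect();
            }
            channels
        })
        .collect()
}

fn main() {
    // Two batches, 2 samples per channel: 2 batches * 4 channels * 2 samples * 2 bytes = 32 bytes.
    let payload = vec![0u8; 32];
    let batches = parse_adc_dac_payload(&payload, 2);
    assert_eq!(batches.len(), 2);
    assert_eq!(batches[0][0], vec![0u16, 0u16]);
}
```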