diff --git a/Cargo.lock b/Cargo.lock index 4959e1f..40b6751 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -353,8 +353,7 @@ dependencies = [ [[package]] name = "heapless" version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34e26526e7168021f34243a3c8faac4dc4f938cde75a0f9b8e373cca5eb4e7ce" +source = "git+https://github.com/quartiq/heapless.git?branch=feature/assume-init#0139ab11d55c6924dafd5d99ac9eda92bd0df77b" dependencies = [ "atomic-polyfill", "hash32 0.2.1", diff --git a/Cargo.toml b/Cargo.toml index 3465c9f..6510583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,10 @@ rev = "33aa67d" git = "https://github.com/rust-embedded/cortex-m-rt.git" rev = "a2e3ad5" +[patch.crates-io.heapless] +git = "https://github.com/quartiq/heapless.git" +branch = "feature/assume-init" + [patch.crates-io.miniconf] git = "https://github.com/quartiq/miniconf.git" rev = "9c826f8" diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 2981b0c..ca3ccd4 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -4,14 +4,55 @@ Author: Ryan Summers Description: Provides a mechanism for measuring Stabilizer stream data throughput. """ +import argparse import socket import collections import struct import time import logging -# Representation of a single UDP packet transmitted by Stabilizer. -Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) +# Representation of a single data batch transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'data']) + +# The magic header half-word at the start of each packet. +MAGIC_HEADER = 0x057B + +# The struct format of the header. +HEADER_FORMAT = ', - generator: BlockGenerator, + generator: FrameGenerator, signal_generator: [SignalGenerator; 2], settings: Settings, @@ -193,7 +194,10 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.configure_streaming( + StreamFormat::AdcDacData, + SAMPLE_BUFFER_SIZE as u8, + ); // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); @@ -307,7 +311,23 @@ const APP: () = { } // Stream the data. - generator.send(&adc_samples, &dac_samples); + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(*data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index af7a747..3cabc7a 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -43,6 +43,7 @@ use stabilizer::{ adc::{Adc0Input, Adc1Input, AdcCode}, afe::Gain, dac::{Dac0Output, Dac1Output, DacCode}, + design_parameters::SAMPLE_BUFFER_SIZE, embedded_hal::digital::v2::InputPin, hal, input_stamper::InputStamper, @@ -51,7 +52,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::{BlockGenerator, StreamTarget}, + data_stream::{FrameGenerator, StreamFormat, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -208,7 +209,7 @@ const APP: () = { settings: Settings, telemetry: TelemetryBuffer, digital_inputs: (DigitalInput0, DigitalInput1), - generator: BlockGenerator, + generator: FrameGenerator, signal_generator: signal_generator::SignalGenerator, timestamper: InputStamper, @@ -230,7 +231,10 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(); + let generator = network.configure_streaming( + StreamFormat::AdcDacData, + SAMPLE_BUFFER_SIZE as u8, + ); let settings = Settings::default(); @@ -394,8 +398,24 @@ const APP: () = { } } - // Stream data - generator.send(&adc_samples, &dac_samples); + // Stream the data. + const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::(); + generator.add::<_, { N * 4 }>(|buf| { + for (data, buf) in adc_samples + .iter() + .chain(dac_samples.iter()) + .zip(buf.chunks_exact_mut(N)) + { + assert_eq!(core::mem::size_of_val(*data), N); + let data = unsafe { + core::slice::from_raw_parts( + data.as_ptr() as *const u8, + N, + ) + }; + buf.copy_from_slice(data) + } + }); // Update telemetry measurements. telemetry.adcs = diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index 2b270d5..61b9a2d 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -36,11 +36,6 @@ pub struct NetStorage { [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], pub routes_cache: [Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8], - - pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1], - pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1], - pub dhcp_tx_storage: [u8; 600], - pub dhcp_rx_storage: [u8; 600], } pub struct UdpSocketStorage { @@ -94,10 +89,6 @@ impl Default for NetStorage { sockets: [None, None, None, None, None, None], tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS], udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS], - dhcp_tx_storage: [0; 600], - dhcp_rx_storage: [0; 600], - dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], - dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], } } } diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index e5b69ce..dbedac0 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -2,27 +2,56 @@ //! //! # Design //! Data streamining utilizes UDP packets to send live data streams at high throughput. -//! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains -//! an identifier that can be used to detect dropped data. +//! Packets are always sent in a best-effort fashion, and data may be dropped. //! -//! Refer to [DataPacket] for information about the serialization format of each UDP packet. +//! Stabilizer organizes livestreamed data into batches within a "Frame" that will be sent as a UDP +//! packet. Each frame consits of a header followed by sequential batch serializations. The packet +//! header is constant for all streaming capabilities, but the serialization format after the header +//! is application-defined. +//! +//! ## Frame Header +//! The header consists of the following, all in little-endian. +//! +//! * **Magic word 0x057B** : a constant to identify Stabilizer streaming data. +//! * **Format Code** : a unique ID that indicates the serialization format of each batch of data +//! in the frame. Refer to [StreamFormat] for further information. +//! * **Batch Size** : the number of samples in each batch of data. +//! * **Sequence Number** : an the sequence number of the first batch in the frame. +//! This can be used to determine if and how many stream batches are lost. //! //! # Example //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception //! of livestreamed data. use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; +use num_enum::IntoPrimitive; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; +use heapless::pool::{Box, Init, Pool, Uninit}; + use super::NetworkReference; -use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; -// The number of data blocks that we will buffer in the queue. -const BLOCK_BUFFER_SIZE: usize = 30; +const MAGIC_WORD: u16 = 0x057B; -// A factor that data may be subsampled at. -const SUBSAMPLE_RATE: usize = 1; +// The size of the header, calculated in bytes. +// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence +// number, which corresponds to 8 bytes total. +const HEADER_SIZE: usize = 8; + +// The number of frames that can be buffered. +const FRAME_COUNT: usize = 4; + +// The size of each livestream frame in bytes. +const FRAME_SIZE: usize = 1024 + HEADER_SIZE; + +// The size of the frame queue must be at least as large as the number of frame buffers. Every +// allocated frame buffer should fit in the queue. +const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2; + +// Static storage used for a heapless::Pool of frame buffers. +static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] = + [0; FRAME_SIZE * FRAME_COUNT]; /// Represents the destination for the UDP stream to send data to. /// @@ -40,6 +69,23 @@ pub struct StreamTarget { pub port: u16, } +/// Specifies the format of streamed data +#[repr(u8)] +#[derive(Debug, Copy, Clone, PartialEq, IntoPrimitive)] +pub enum StreamFormat { + /// Reserved, unused format specifier. + Unknown = 0, + + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. + /// + /// # Example + /// With a batch size of 2, the serialization would take the following form: + /// ``` + /// + /// ``` + AdcDacData = 1, +} + impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -54,15 +100,6 @@ impl From for SocketAddr { } } -/// A basic "batch" of data. -// Note: In the future, the stream may be generic over this type. -#[derive(Debug, Copy, Clone)] -pub struct AdcDacData { - block_id: u16, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], -} - /// Configure streaming on a device. /// /// # Args @@ -73,216 +110,147 @@ pub struct AdcDacData { /// `stream` is the logically consumer (UDP transmitter) of the enqueued data. pub fn setup_streaming( stack: NetworkReference, -) -> (BlockGenerator, DataStream) { - let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); - +) -> (FrameGenerator, DataStream) { + // The queue needs to be at least as large as the frame count to ensure that every allocated + // frame can potentially be enqueued for transmission. + let queue = + cortex_m::singleton!(: Queue = Queue::new()) + .unwrap(); let (producer, consumer) = queue.split(); - let generator = BlockGenerator::new(producer); + let frame_pool = + cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]>= Pool::new()).unwrap(); - let stream = DataStream::new(stack, consumer); + // Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function. + let memory = unsafe { &mut FRAME_DATA }; + frame_pool.grow(memory); + + let generator = FrameGenerator::new(producer, frame_pool); + + let stream = DataStream::new(stack, consumer, frame_pool); (generator, stream) } -/// The data generator for a stream. -pub struct BlockGenerator { - queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_id: u16, +#[derive(Debug)] +struct StreamFrame { + buffer: Box<[u8; FRAME_SIZE], Init>, + offset: usize, } -impl BlockGenerator { - /// Construct a new generator. - /// # Args - /// * `queue` - The producer portion of the SPSC queue to enqueue data into. - /// - /// # Returns - /// The generator to use. - fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { +impl StreamFrame { + pub fn new( + buffer: Box<[u8; FRAME_SIZE], Uninit>, + format: u8, + buffer_size: u8, + sequence_number: u32, + ) -> Self { + let mut buffer = unsafe { buffer.assume_init() }; + buffer[0..2].copy_from_slice(&MAGIC_WORD.to_ne_bytes()); + buffer[2] = format; + buffer[3] = buffer_size; + buffer[4..8].copy_from_slice(&sequence_number.to_ne_bytes()); + Self { + buffer, + offset: HEADER_SIZE, + } + } + + pub fn add_batch(&mut self, mut f: F) + where + F: FnMut(&mut [u8]), + { + f(&mut self.buffer[self.offset..self.offset + T]); + + self.offset += T; + } + + pub fn is_full(&self) -> bool { + self.offset + T > self.buffer.len() + } + + pub fn finish(&mut self) -> &[u8] { + &self.buffer[..self.offset] + } +} + +/// The data generator for a stream. +pub struct FrameGenerator { + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, + pool: &'static Pool<[u8; FRAME_SIZE]>, + current_frame: Option, + sequence_number: u32, + format: u8, + batch_size: u8, +} + +impl FrameGenerator { + fn new( + queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>, + pool: &'static Pool<[u8; FRAME_SIZE]>, + ) -> Self { Self { queue, - current_id: 0, + pool, + batch_size: 0, + format: StreamFormat::Unknown.into(), + current_frame: None, + sequence_number: 0, } } - /// Schedule data to be sent by the generator. + /// Configure the format of the stream. /// - /// # Note - /// If no space is available, the data batch may be silently dropped. + /// # Note: + /// This function shall only be called once upon initializing streaming /// /// # Args - /// * `adcs` - The ADC data to transmit. - /// * `dacs` - The DAC data to transmit. - pub fn send( - &mut self, - adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], - ) { - let block = AdcDacData { - block_id: self.current_id, - adcs: [*adcs[0], *adcs[1]], - dacs: [*dacs[0], *dacs[1]], - }; - - self.current_id = self.current_id.wrapping_add(1); - self.queue.enqueue(block).ok(); - } -} - -/// # Stream Packet -/// Represents a single UDP packet sent by the stream. -/// -/// A "batch" of data is defined to be the data collected for a single invocation of the DSP -/// routine. A packet is composed of as many sequential batches as can fit. -/// -/// The packet is given a header indicating the starting batch sequence number and the number of -/// batches present. If the UDP transmitter encounters a non-sequential batch, it does not enqueue -/// it into the packet and instead transmits any staged data. The non-sequential batch is then -/// transmitted in a new UDP packet. This method allows a receiver to detect dropped batches (e.g. -/// due to processing overhead). -/// -/// ## Data Format -/// -/// Data sent via UDP is sent in "blocks". Each block is a single batch of ADC/DAC codes from an -/// individual DSP processing routine. Each block is assigned a unique 16-bit identifier. The identifier -/// increments by one for each block and rolls over. All blocks in a single packet are guaranteed to -/// contain sequential identifiers. -/// -/// All data is transmitted in network-endian (big-endian) format. -/// -/// ### Quick Reference -/// -/// In the reference below, any values enclosed in parentheses represents the number of bytes used for -/// that value. E.g. "Batch size (1)" indicates 1 byte is used to represent the batch size. -/// ``` -/// # UDP packets take the following form -///
,,[, ...] -/// -/// # The header takes the following form -///
= ,, -/// -/// # Each batch takes the following form -/// = ,,, -/// -/// # Where -/// = , ... -/// ``` -/// -/// ### Packet Format -/// Multiple blocks are sent in a single UDP packet simultaneously. Each UDP packet transmitted -/// contains a header followed by the serialized data blocks. -/// ``` -///
,,[, ...] -/// ``` -/// -/// ### Header -/// A header takes the following form: -/// * The starting block ID (2 bytes) -/// * The number of blocks present in the packet (1 byte) -/// * The size of each bach in samples (1 byte) -/// -/// ``` -/// ,, -/// ``` -/// -/// ### Data Blocks -/// Following the header, each block is sequentially serialized. Each block takes the following form: -/// ``` -/// ,,, -/// ``` -/// -/// Where `` is an array of N 16-bit ADC/DAC samples. The number of samples is provided in the -/// header. -/// -/// ADC and DAC codes are transmitted in raw machine-code format. Please refer to the datasheet for the -/// ADC and DAC if you need to convert these to voltages. -pub struct DataPacket<'a> { - buf: &'a mut [u8], - subsample_rate: usize, - start_id: Option, - num_blocks: u8, - write_index: usize, -} - -impl<'a> DataPacket<'a> { - /// Construct a new packet. - /// - /// # Args - /// * `buf` - The location to serialize the data packet into. - /// * `subsample_rate` - The factor at which to subsample data from batches. - pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { - Self { - buf, - start_id: None, - num_blocks: 0, - subsample_rate, - write_index: 4, - } + /// * `format` - The desired format of the stream. + /// * `batch_size` - The number of samples in each data batch. See + /// [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE] + #[doc(hidden)] + pub(crate) fn configure(&mut self, format: impl Into, batch_size: u8) { + self.format = format.into(); + self.batch_size = batch_size; } - /// Add a batch of data to the packet. - /// - /// # Note - /// Serialization occurs as the packet is added. + /// Add a batch to the current stream frame. /// /// # Args - /// * `batch` - The batch to add to the packet. - pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { - // Check that the block is sequential. - if let Some(id) = &self.start_id { - if batch.block_id != id.wrapping_add(self.num_blocks.into()) { - return Err(()); - } - } else { - // Otherwise, this is the first block. Record the strt ID. - self.start_id = Some(batch.block_id); - } + /// * `f` - A closure that will be provided the buffer to write batch data into. The buffer will + /// be the size of the `T` template argument. + pub fn add(&mut self, f: F) + where + F: FnMut(&mut [u8]), + { + let sequence_number = self.sequence_number; + self.sequence_number = self.sequence_number.wrapping_add(1); - // Check that there is space for the block. - let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; - if self.buf.len() - self.get_packet_size() < block_size_bytes { - return Err(()); - } - - // Copy the samples into the buffer. - for device in &[batch.adcs, batch.dacs] { - for channel in device { - for sample in channel.iter().step_by(self.subsample_rate) { - self.buf[self.write_index..self.write_index + 2] - .copy_from_slice(&sample.to_be_bytes()); - self.write_index += 2; - } + if self.current_frame.is_none() { + if let Some(buffer) = self.pool.alloc() { + self.current_frame.replace(StreamFrame::new( + buffer, + self.format as u8, + self.batch_size, + sequence_number, + )); + } else { + return; } } - self.num_blocks += 1; + // Note(unwrap): We ensure the frame is present above. + let current_frame = self.current_frame.as_mut().unwrap(); - Ok(()) - } + current_frame.add_batch::<_, T>(f); - fn get_packet_size(&self) -> usize { - let header_length = 4; - let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; - let block_size_bytes = block_sample_size * 2 * 4; - - block_size_bytes * self.num_blocks as usize + header_length - } - - /// Complete the packet and prepare it for transmission. - /// - /// # Returns - /// The size of the packet. The user should utilize the original buffer provided for packet - /// construction to access the packet. - pub fn finish(self) -> usize { - let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; - - // Write the header into the block. - self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); - self.buf[2] = self.num_blocks; - self.buf[3] = block_sample_size as u8; - - // Return the length of the packet to transmit. - self.get_packet_size() + if current_frame.is_full::() { + // Note(unwrap): The queue is designed to be at least as large as the frame buffer + // count, so this enqueue should always succeed. + self.queue + .enqueue(self.current_frame.take().unwrap()) + .unwrap(); + } } } @@ -293,9 +261,9 @@ impl<'a> DataPacket<'a> { pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, - queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, + frame_pool: &'static Pool<[u8; FRAME_SIZE]>, remote: SocketAddr, - buffer: [u8; 1024], } impl DataStream { @@ -304,16 +272,18 @@ impl DataStream { /// # Args /// * `stack` - A reference to the shared network stack. /// * `consumer` - The read side of the queue containing data to transmit. + /// * `frame_pool` - The Pool to return stream frame objects into. fn new( stack: NetworkReference, - consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>, + frame_pool: &'static Pool<[u8; FRAME_SIZE]>, ) -> Self { Self { stack, socket: None, remote: StreamTarget::default().into(), queue: consumer, - buffer: [0; 1024], + frame_pool, } } @@ -365,27 +335,16 @@ impl DataStream { // If there's no socket available, try to connect to our remote. if self.open().is_ok() { // If we just successfully opened the socket, flush old data from queue. - while self.queue.dequeue().is_some() {} + while let Some(frame) = self.queue.dequeue() { + self.frame_pool.free(frame.buffer); + } } } Some(handle) => { - if self.queue.ready() { - // Dequeue data from the queue into a larger block structure. - let mut packet = - DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); - while self - .queue - .peek() - .and_then(|batch| packet.add_batch(batch).ok()) - .is_some() - { - // Dequeue the batch that we just added to the packet. - self.queue.dequeue(); - } - - // Transmit the data packet. - let size = packet.finish(); - self.stack.send(handle, &self.buffer[..size]).ok(); + if let Some(mut frame) = self.queue.dequeue() { + // Transmit the frame and return it to the pool. + self.stack.send(handle, frame.finish()).ok(); + self.frame_pool.free(frame.buffer) } } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 115ba3b..da82999 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -17,7 +17,7 @@ pub mod shared; pub mod telemetry; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; -use data_stream::{BlockGenerator, DataStream}; +use data_stream::{DataStream, FrameGenerator}; use messages::{MqttMessage, SettingsResponse}; use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; @@ -49,7 +49,7 @@ pub struct NetworkUsers { pub miniconf: MiniconfClient, pub processor: NetworkProcessor, stream: DataStream, - generator: Option, + generator: Option, pub telemetry: TelemetryClient, } @@ -113,8 +113,17 @@ where } /// Enable live data streaming. - pub fn enable_streaming(&mut self) -> BlockGenerator { - self.generator.take().unwrap() + /// + /// # Args + /// * `format` - A unique u8 code indicating the format of the data. + pub fn configure_streaming( + &mut self, + format: impl Into, + batch_size: u8, + ) -> FrameGenerator { + let mut generator = self.generator.take().unwrap(); + generator.configure(format, batch_size); + generator } /// Direct the stream to the provided remote target.