From 2815d6d9e9723fc1f1ca22f8a16f0a5e81e84795 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 13:18:16 +0200 Subject: [PATCH] Updating docs --- Cargo.lock | 2 +- scripts/stream_throughput.py | 189 +++++++++++++++++++++++++++++++++++ src/bin/dual-iir.rs | 3 +- src/net/data_stream.rs | 152 ++++++++++++++++++++-------- src/net/mod.rs | 7 +- 5 files changed, 309 insertions(+), 44 deletions(-) create mode 100644 scripts/stream_throughput.py diff --git a/Cargo.lock b/Cargo.lock index 7ff3b35..e10c1e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,7 +811,7 @@ dependencies = [ [[package]] name = "stm32h7xx-hal" version = "0.9.0" -source = "git+https://github.com/quartiq/stm32h7xx-hal.git?branch=feature/smoltcp-update#191b1d50a8a4d956492649630efaf563f59e35bf" +source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=33aa67d#33aa67d74790cb9f680a4f281b72df0664bcf03c" dependencies = [ "bare-metal 1.0.0", "cast", diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py new file mode 100644 index 0000000..2981b0c --- /dev/null +++ b/scripts/stream_throughput.py @@ -0,0 +1,189 @@ +#!/usr/bin/python3 +""" +Author: Ryan Summers + +Description: Provides a mechanism for measuring Stabilizer stream data throughput. +""" +import socket +import collections +import struct +import time +import logging + +# Representation of a single UDP packet transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) + +class Timer: + """ A basic timer for measuring elapsed time periods. """ + + def __init__(self, period=1.0): + """ Create the timer with the provided period. """ + self.start_time = time.time() + self.trigger_time = self.start_time + period + self.period = period + self.started = False + + + def is_triggered(self): + """ Check if the timer period has elapsed. """ + now = time.time() + return now >= self.trigger_time + + + def start(self): + """ Start the timer. 
""" + self.start_time = time.time() + self.started = True + + + def is_started(self): + """ Check if the timer has started. """ + return self.started + + + def arm(self): + """ Arm the timer trigger. """ + self.trigger_time = time.time() + self.period + + + def elapsed(self): + """ Get the elapsed time since the timer was started. """ + now = time.time() + return now - self.start_time + + +class PacketParser: + """ Utilize class used for parsing received UDP data. """ + + def __init__(self): + """ Initialize the parser. """ + self.buf = b'' + self.total_bytes = 0 + + + def ingress(self, data): + """ Ingress received UDP data. """ + self.total_bytes += len(data) + self.buf += data + + + def parse_all_packets(self): + """ Parse all received packets from the receive buffer. + + Returns: + A list of received Packets. + """ + packets = [] + while True: + new_packets = self._parse() + if new_packets: + packets += new_packets + else: + return packets + + + def _parse(self): + """ Attempt to parse packets from the received buffer. """ + # Attempt to parse a block from the buffer. + if len(self.buf) < 4: + return None + + start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf) + + packet_size = 4 + data_size * num_blocks * 8 + + if len(self.buf) < packet_size: + return None + + self.buf = self.buf[4:] + + packets = [] + for offset in range(num_blocks): + adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf) + adc = [ + adcs_dacs[0:data_size], + adcs_dacs[data_size:2*data_size], + ] + + dac = [ + adcs_dacs[2*data_size: 3*data_size], + adcs_dacs[3*data_size:], + ] + + self.buf = self.buf[8*data_size:] + packets.append(Packet(start_id + offset, adc, dac)) + + return packets + + +def check_index(previous_index, next_index): + """ Check if two indices are sequential. """ + if previous_index == -1: + return True + + # Handle index roll-over. Indices are only stored in 16-bit numbers. 
+ if next_index < previous_index: + next_index += 65536 + + expected_index = previous_index + 1 + + return next_index == expected_index + + +def main(): + """ Main program. """ + connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + connection.bind(("", 1111)) + + logging.basicConfig(level=logging.INFO, + format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s') + + last_index = -1 + + drop_count = 0 + good_blocks = 0 + + timer = Timer() + parser = PacketParser() + + while True: + # Receive any data over UDP and parse it. + data = connection.recv(4096) + if data and not timer.is_started(): + timer.start() + + parser.ingress(data) + + # Handle any received packets. + for packet in parser.parse_all_packets(): + + # Handle any dropped packets. + if not check_index(last_index, packet.index): + print(hex(last_index), hex(packet.index)) + if packet.index < (last_index + 1): + dropped = packet.index + 65536 - (last_index + 1) + else: + dropped = packet.index - (last_index + 1) + + drop_count += dropped + + last_index = packet.index + good_blocks += 1 + + # Report the throughput periodically. + if timer.is_triggered(): + drate = parser.total_bytes * 8 / 1e6 / timer.elapsed() + + print(f''' +Data Rate: {drate:.3f} Mbps +Received Blocks: {good_blocks} +Dropped blocks: {drop_count} + +Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s +---- +''') + timer.arm() + + +if __name__ == '__main__': + main() diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index e6c47d3..efec9c1 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -95,8 +95,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = - network.enable_streaming(StreamTarget::default().into()); + let generator = network.enable_streaming(); // Spawn a settings update for default settings. 
 c.spawn.settings_update().unwrap();
diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs
index 545abcb..45a349c 100644
--- a/src/net/data_stream.rs
+++ b/src/net/data_stream.rs
@@ -1,3 +1,20 @@
+///! Stabilizer data stream capabilities
+///!
+///! # Design
+///! Stabilizer data streaming utilizes UDP packets to send live data streams at high throughput.
+///! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains
+///! an identifier that can be used to detect any dropped data.
+///!
+///! The current implementation utilizes a single-producer, single-consumer queue to send data
+///! between a high priority task and the UDP transmitter.
+///!
+///! A "batch" of data is defined to be a single item in the SPSC queue sent to the UDP transmitter
+///! thread. The transmitter thread then serializes as many sequential "batches" into a single UDP
+///! packet as possible. The UDP packet is also given a header indicating the starting batch
+///! sequence number and the number of batches present. If the UDP transmitter encounters a
+///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged
+///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a
+///! receiver to detect dropped batches (e.g. due to processing overhead).
 use core::borrow::BorrowMut;
 use heapless::spsc::{Consumer, Producer, Queue};
 use miniconf::MiniconfAtomic;
@@ -10,8 +27,10 @@
 use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
 // The number of data blocks that we will buffer in the queue.
 const BLOCK_BUFFER_SIZE: usize = 30;
+// A factor that data may be subsampled at.
 const SUBSAMPLE_RATE: usize = 1;
+/// Represents the destination for the UDP stream to send data to.
 #[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)]
 pub struct StreamTarget {
     pub ip: [u8; 4],
@@ -38,6 +57,23 @@ impl Into for StreamTarget {
     }
 }
+/// A basic "batch" of data.
+// Note: In the future, the stream may be generic over this type.
+#[derive(Debug, Copy, Clone)]
+pub struct AdcDacData {
+    block_id: u16,
+    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
+    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
+}
+
+/// Configure streaming on a device.
+///
+/// # Args
+/// * `stack` - A reference to the shared network stack.
+///
+/// # Returns
+/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
+/// `stream` is the logical consumer (UDP transmitter) of the enqueued data.
 pub fn setup_streaming(
     stack: NetworkReference,
 ) -> (BlockGenerator, DataStream) {
@@ -52,28 +88,34 @@ pub fn setup_streaming(
     (generator, stream)
 }
-#[derive(Debug, Copy, Clone)]
-pub struct AdcDacData {
-    block_id: u16,
-    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-}
-
+/// The data generator for a stream.
 pub struct BlockGenerator {
     queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
     current_id: u16,
 }
 impl BlockGenerator {
-    pub fn new(
-        queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
-    ) -> Self {
+    /// Construct a new generator.
+    /// # Args
+    /// * `queue` - The producer portion of the SPSC queue to enqueue data into.
+    ///
+    /// # Returns
+    /// The generator to use.
+    fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self {
         Self {
             queue,
             current_id: 0,
         }
     }
+    /// Schedule data to be sent by the generator.
+    ///
+    /// # Note
+    /// If no space is available, the data batch may be silently dropped.
+    ///
+    /// # Args
+    /// * `adcs` - The ADC data to transmit.
+    /// * `dacs` - The DAC data to transmit.
pub fn send( &mut self, adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], @@ -90,26 +132,21 @@ impl BlockGenerator { } } -pub struct DataStream { - stack: NetworkReference, - socket: Option<::UdpSocket>, - queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - remote: Option, - buffer: [u8; 1024], -} - -// Datapacket format: -// -// Header: -// [0..2]: Start block ID (u16) -// [2..3]: Num Blocks present (u8) -// [3..4]: Batch Size (u8) -// -// Following the header, batches are added sequentially. Each batch takes the form of: -// [*0..*2]: ADC0 -// [*2..*4]: ADC1 -// [*4..*6]: DAC0 -// [*6..*8]: DAC1 +/// Represents a single UDP packet sent by the stream. +/// +/// # Packet Format +/// All data is sent in network-endian format. The format is as follows +/// +/// Header: +/// [0..2]: Start block ID (u16) +/// [2..3]: Num Blocks present (u8) +/// [3..4]: Batch Size (u8) +/// +/// Following the header, batches are added sequentially. Each batch takes the form of: +/// [*0..*2]: ADC0 +/// [*2..*4]: ADC1 +/// [*4..*6]: DAC0 +/// [*6..*8]: DAC1 struct DataPacket<'a> { buf: &'a mut [u8], subsample_rate: usize, @@ -119,6 +156,11 @@ struct DataPacket<'a> { } impl<'a> DataPacket<'a> { + /// Construct a new packet. + /// + /// # Args + /// * `buf` - The location to serialize the data packet into. + /// * `subsample_rate` - The factor at which to subsample data from batches. pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { Self { buf, @@ -129,6 +171,13 @@ impl<'a> DataPacket<'a> { } } + /// Add a batch of data to the packet. + /// + /// # Note + /// Serialization occurs as the packet is added. + /// + /// # Args + /// * `batch` - The batch to add to the packet. pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { // Check that the block is sequential. if let Some(id) = &self.start_id { @@ -170,6 +219,11 @@ impl<'a> DataPacket<'a> { block_size_bytes * self.num_blocks as usize + header_length } + /// Complete the packet and prepare it for transmission. 
+ /// + /// # Returns + /// The size of the packet. The user should utilize the original buffer provided for packet + /// construction to access the packet. pub fn finish(self) -> usize { let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; @@ -183,15 +237,32 @@ impl<'a> DataPacket<'a> { } } +/// The "consumer" portion of the data stream. +/// +/// # Note +/// This is responsible for consuming data and sending it over UDP. +pub struct DataStream { + stack: NetworkReference, + socket: Option<::UdpSocket>, + queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + remote: SocketAddr, + buffer: [u8; 1024], +} + impl DataStream { - pub fn new( + /// Construct a new data streamer. + /// + /// # Args + /// * `stack` - A reference to the shared network stack. + /// * `consumer` - The read side of the queue containing data to transmit. + fn new( stack: NetworkReference, consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, ) -> Self { Self { stack, socket: None, - remote: None, + remote: StreamTarget::default().into(), queue: consumer, buffer: [0; 1024], } @@ -230,24 +301,27 @@ impl DataStream { Ok(()) } + /// Configure the remote endpoint of the stream. + /// + /// # Args + /// * `remote` - The destination to send stream data to. pub fn set_remote(&mut self, remote: SocketAddr) { // If the remote is identical to what we already have, do nothing. - if let Some(current_remote) = self.remote { - if current_remote == remote { - return; - } + if remote == self.remote { + return; } // Open the new remote connection. self.open(remote).ok(); - self.remote = Some(remote); + self.remote = remote; } + /// Process any data for transmission. pub fn process(&mut self) { // If there's no socket available, try to connect to our remote. - if self.socket.is_none() && self.remote.is_some() { + if self.socket.is_none() { // If we still can't open the remote, continue. 
- if self.open(self.remote.unwrap()).is_err() { + if self.open(self.remote).is_err() { // Clear the queue out. while self.queue.ready() { self.queue.dequeue(); diff --git a/src/net/mod.rs b/src/net/mod.rs index 70d7dfe..115ba3b 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -113,11 +113,14 @@ where } /// Enable live data streaming. - pub fn enable_streaming(&mut self, remote: SocketAddr) -> BlockGenerator { - self.stream.set_remote(remote); + pub fn enable_streaming(&mut self) -> BlockGenerator { self.generator.take().unwrap() } + /// Direct the stream to the provided remote target. + /// + /// # Args + /// * `remote` - The destination for the streamed data. pub fn direct_stream(&mut self, remote: SocketAddr) { if self.generator.is_none() { self.stream.set_remote(remote);