414: Feature/generic stream r=jordens a=ryan-summers

This PR fixes #385 by improving the semantics by which streams are generated. Specifically, the serialization format is now application-defined, and data is only copied once.

TODO:
- [x] Add documentation for streaming architecture
- [x] Add format specifiers
- [x] Clean up reception script
- [x] Update lockin serialization
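
For reference, each UDP frame now begins with an 8-byte little-endian header: the `0x057B` magic word (u16), a format code (u8), the batch size in samples (u8), and the sequence number of the frame's first batch (u32). A minimal host-side parser could look like the following (an illustrative sketch, not code contained in this PR):

```rust
/// Parse the 8-byte stream frame header (sketch). Returns the format code,
/// batch size, and starting sequence number, or None if the frame is invalid.
fn parse_header(frame: &[u8]) -> Option<(u8, u8, u32)> {
    if frame.len() < 8 {
        return None;
    }
    // Frames are identified by the 0x057B magic word, little-endian.
    if u16::from_le_bytes([frame[0], frame[1]]) != 0x057B {
        return None;
    }
    let format = frame[2];
    let batch_size = frame[3];
    let sequence = u32::from_le_bytes(frame[4..8].try_into().unwrap());
    Some((format, batch_size, sequence))
}
```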

Co-authored-by: Ryan Summers <ryan.summers@vertigo-designs.com>
bors[bot] 2021-07-27 11:52:48 +00:00 committed by GitHub
commit 62d1a16b4a
8 changed files with 310 additions and 341 deletions

Cargo.lock (generated)

@@ -353,8 +353,7 @@ dependencies = [
 [[package]]
 name = "heapless"
 version = "0.7.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34e26526e7168021f34243a3c8faac4dc4f938cde75a0f9b8e373cca5eb4e7ce"
+source = "git+https://github.com/quartiq/heapless.git?branch=feature/assume-init#0139ab11d55c6924dafd5d99ac9eda92bd0df77b"
 dependencies = [
  "atomic-polyfill",
  "hash32 0.2.1",

Cargo.toml

@@ -69,6 +69,10 @@ rev = "33aa67d"
 git = "https://github.com/rust-embedded/cortex-m-rt.git"
 rev = "a2e3ad5"
 
+[patch.crates-io.heapless]
+git = "https://github.com/quartiq/heapless.git"
+branch = "feature/assume-init"
+
 [patch.crates-io.miniconf]
 git = "https://github.com/quartiq/miniconf.git"
 rev = "9c826f8"

scripts/stream_throughput.py

@@ -4,14 +4,55 @@ Author: Ryan Summers
 Description: Provides a mechanism for measuring Stabilizer stream data throughput.
 """
+import argparse
 import socket
 import collections
 import struct
 import time
 import logging
 
-# Representation of a single UDP packet transmitted by Stabilizer.
-Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac'])
+# Representation of a single data batch transmitted by Stabilizer.
+Packet = collections.namedtuple('Packet', ['index', 'data'])
+
+# The magic header half-word at the start of each packet.
+MAGIC_HEADER = 0x057B
+
+# The struct format of the header.
+HEADER_FORMAT = '<HBBI'
+
+# All formats supported by this reception script.
+#
+# The items in this dict are functions that will be provided the sample batch size and will return
+# the struct deserialization code to unpack a single batch.
+FORMAT = {
+    1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
+}
+
+
+def parse_packet(buf):
+    """ Attempt to parse packets from the received buffer. """
+    if len(buf) < struct.calcsize(HEADER_FORMAT):
+        return
+
+    # Parse out the packet header.
+    magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf)
+    buf = buf[struct.calcsize(HEADER_FORMAT):]
+
+    if magic != MAGIC_HEADER:
+        logging.warning('Encountered bad magic header: %s', hex(magic))
+        return
+
+    frame_format = FORMAT[format_id](batch_size)
+    batch_count = int(len(buf) / struct.calcsize(frame_format))
+
+    for offset in range(batch_count):
+        data = struct.unpack_from(frame_format, buf)
+        buf = buf[struct.calcsize(frame_format):]
+        yield Packet(sequence_number + offset, data)
+
+
 class Timer:
     """ A basic timer for measuring elapsed time periods. """
@@ -52,99 +93,35 @@ class Timer:
         return now - self.start_time
 
 
-class PacketParser:
-    """ Utility class used for parsing received UDP data. """
-
-    def __init__(self):
-        """ Initialize the parser. """
-        self.buf = b''
-        self.total_bytes = 0
-
-    def ingress(self, data):
-        """ Ingress received UDP data. """
-        self.total_bytes += len(data)
-        self.buf += data
-
-    def parse_all_packets(self):
-        """ Parse all received packets from the receive buffer.
-
-        Returns:
-            A list of received Packets.
-        """
-        packets = []
-        while True:
-            new_packets = self._parse()
-            if new_packets:
-                packets += new_packets
-            else:
-                return packets
-
-    def _parse(self):
-        """ Attempt to parse packets from the received buffer. """
-        # Attempt to parse a block from the buffer.
-        if len(self.buf) < 4:
-            return None
-
-        start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf)
-        packet_size = 4 + data_size * num_blocks * 8
-
-        if len(self.buf) < packet_size:
-            return None
-
-        self.buf = self.buf[4:]
-
-        packets = []
-        for offset in range(num_blocks):
-            adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf)
-            adc = [
-                adcs_dacs[0:data_size],
-                adcs_dacs[data_size:2*data_size],
-            ]
-            dac = [
-                adcs_dacs[2*data_size:3*data_size],
-                adcs_dacs[3*data_size:],
-            ]
-            self.buf = self.buf[8*data_size:]
-            packets.append(Packet(start_id + offset, adc, dac))
-
-        return packets
-
-
-def check_index(previous_index, next_index):
-    """ Check if two indices are sequential. """
-    if previous_index == -1:
-        return True
-
-    # Handle index roll-over. Indices are only stored in 16-bit numbers.
-    if next_index < previous_index:
-        next_index += 65536
-
-    expected_index = previous_index + 1
-
-    return next_index == expected_index
+def sequence_delta(previous_sequence, next_sequence):
+    """ Check the number of items between two sequence numbers. """
+    if previous_sequence is None:
+        return 0
+
+    delta = next_sequence - (previous_sequence + 1)
+    return delta & 0xFFFFFFFF
 
 
 def main():
     """ Main program. """
+    parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
+    parser.add_argument('--port', type=int, default=1111,
+                        help='The port that Stabilizer is streaming to')
+    args = parser.parse_args()
+
     connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-    connection.bind(("", 1111))
+    connection.bind(("", args.port))
 
     logging.basicConfig(level=logging.INFO,
                         format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
 
-    last_index = -1
+    last_index = None
    drop_count = 0
     good_blocks = 0
+    total_bytes = 0
 
     timer = Timer()
-    parser = PacketParser()
 
     while True:
         # Receive any data over UDP and parse it.
@@ -152,34 +129,24 @@ def main():
         if data and not timer.is_started():
             timer.start()
 
-        parser.ingress(data)
-
         # Handle any received packets.
-        for packet in parser.parse_all_packets():
+        total_bytes += len(data)
+        for packet in parse_packet(data):
             # Handle any dropped packets.
-            if not check_index(last_index, packet.index):
-                print(hex(last_index), hex(packet.index))
-                if packet.index < (last_index + 1):
-                    dropped = packet.index + 65536 - (last_index + 1)
-                else:
-                    dropped = packet.index - (last_index + 1)
-
-                drop_count += dropped
+            drop_count += sequence_delta(last_index, packet.index)
 
             last_index = packet.index
             good_blocks += 1
 
         # Report the throughput periodically.
         if timer.is_triggered():
-            drate = parser.total_bytes * 8 / 1e6 / timer.elapsed()
+            drate = total_bytes * 8 / 1e6 / timer.elapsed()
 
             print(f'''
 Data Rate:       {drate:.3f} Mbps
 Received Blocks: {good_blocks}
 Dropped blocks:  {drop_count}
-Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
+Metadata: {total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
 ----
 ''')
 
             timer.arm()

src/bin/dual-iir.rs

@@ -43,6 +43,7 @@ use stabilizer::{
         adc::{Adc0Input, Adc1Input, AdcCode},
         afe::Gain,
         dac::{Dac0Output, Dac1Output, DacCode},
+        design_parameters::SAMPLE_BUFFER_SIZE,
         embedded_hal::digital::v2::InputPin,
         hal,
         signal_generator::{self, SignalGenerator},
@@ -50,7 +51,7 @@ use stabilizer::{
         DigitalInput0, DigitalInput1, AFE0, AFE1,
     },
     net::{
-        data_stream::{BlockGenerator, StreamTarget},
+        data_stream::{FrameGenerator, StreamFormat, StreamTarget},
         miniconf::Miniconf,
         serde::Deserialize,
         telemetry::{Telemetry, TelemetryBuffer},
@@ -169,7 +170,7 @@ const APP: () = {
         adcs: (Adc0Input, Adc1Input),
         dacs: (Dac0Output, Dac1Output),
         network: NetworkUsers<Settings, Telemetry>,
-        generator: BlockGenerator,
+        generator: FrameGenerator,
         signal_generator: [SignalGenerator; 2],
 
         settings: Settings,
@@ -193,7 +194,10 @@ const APP: () = {
             stabilizer.net.mac_address,
         );
 
-        let generator = network.enable_streaming();
+        let generator = network.configure_streaming(
+            StreamFormat::AdcDacData,
+            SAMPLE_BUFFER_SIZE as u8,
+        );
 
         // Spawn a settings update for default settings.
         c.spawn.settings_update().unwrap();
@@ -307,7 +311,23 @@ const APP: () = {
             }
 
             // Stream the data.
-            generator.send(&adc_samples, &dac_samples);
+            const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::<u16>();
+            generator.add::<_, { N * 4 }>(|buf| {
+                for (data, buf) in adc_samples
+                    .iter()
+                    .chain(dac_samples.iter())
+                    .zip(buf.chunks_exact_mut(N))
+                {
+                    assert_eq!(core::mem::size_of_val(*data), N);
+                    let data = unsafe {
+                        core::slice::from_raw_parts(
+                            data.as_ptr() as *const u8,
+                            N,
+                        )
+                    };
+                    buf.copy_from_slice(data)
+                }
+            });
 
             // Update telemetry measurements.
             telemetry.adcs =
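
Note: the closure above writes the two ADC and two DAC sample buffers back-to-back into the pooled frame, `N` bytes per channel, so samples are serialized exactly once, directly at their final location. Each per-channel copy is equivalent to this standalone sketch (illustrative only; `copy_channel` is a hypothetical helper, not part of this diff):

```rust
/// Hypothetical helper equivalent to one iteration of the closure above:
/// serialize one channel of u16 samples into its slot of the frame buffer
/// in machine (little-endian) byte order.
fn copy_channel(samples: &[u16], out: &mut [u8]) {
    for (sample, bytes) in samples.iter().zip(out.chunks_exact_mut(2)) {
        bytes.copy_from_slice(&sample.to_ne_bytes());
    }
}
```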

src/bin/lockin.rs

@@ -43,6 +43,7 @@ use stabilizer::{
         adc::{Adc0Input, Adc1Input, AdcCode},
         afe::Gain,
         dac::{Dac0Output, Dac1Output, DacCode},
+        design_parameters::SAMPLE_BUFFER_SIZE,
         embedded_hal::digital::v2::InputPin,
         hal,
         input_stamper::InputStamper,
@@ -51,7 +52,7 @@ use stabilizer::{
         DigitalInput0, DigitalInput1, AFE0, AFE1,
     },
     net::{
-        data_stream::{BlockGenerator, StreamTarget},
+        data_stream::{FrameGenerator, StreamFormat, StreamTarget},
         miniconf::Miniconf,
         serde::Deserialize,
         telemetry::{Telemetry, TelemetryBuffer},
@@ -208,7 +209,7 @@ const APP: () = {
         settings: Settings,
         telemetry: TelemetryBuffer,
         digital_inputs: (DigitalInput0, DigitalInput1),
-        generator: BlockGenerator,
+        generator: FrameGenerator,
         signal_generator: signal_generator::SignalGenerator,
         timestamper: InputStamper,
@@ -230,7 +231,10 @@ const APP: () = {
             stabilizer.net.mac_address,
         );
 
-        let generator = network.enable_streaming();
+        let generator = network.configure_streaming(
+            StreamFormat::AdcDacData,
+            SAMPLE_BUFFER_SIZE as u8,
+        );
 
         let settings = Settings::default();
 
@@ -394,8 +398,24 @@ const APP: () = {
                 }
             }
 
-            // Stream data
-            generator.send(&adc_samples, &dac_samples);
+            // Stream the data.
+            const N: usize = SAMPLE_BUFFER_SIZE * core::mem::size_of::<u16>();
+            generator.add::<_, { N * 4 }>(|buf| {
+                for (data, buf) in adc_samples
+                    .iter()
+                    .chain(dac_samples.iter())
+                    .zip(buf.chunks_exact_mut(N))
+                {
+                    assert_eq!(core::mem::size_of_val(*data), N);
+                    let data = unsafe {
+                        core::slice::from_raw_parts(
+                            data.as_ptr() as *const u8,
+                            N,
+                        )
+                    };
+                    buf.copy_from_slice(data)
+                }
+            });
 
             // Update telemetry measurements.
             telemetry.adcs =


@@ -36,11 +36,6 @@ pub struct NetStorage {
         [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
     pub routes_cache:
         [Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8],
-
-    pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
-    pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
-    pub dhcp_tx_storage: [u8; 600],
-    pub dhcp_rx_storage: [u8; 600],
 }
 
 pub struct UdpSocketStorage {
@@ -94,10 +89,6 @@ impl Default for NetStorage {
             sockets: [None, None, None, None, None, None],
             tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS],
             udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS],
-            dhcp_tx_storage: [0; 600],
-            dhcp_rx_storage: [0; 600],
-            dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
-            dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
         }
     }
 }

src/net/data_stream.rs

@@ -2,27 +2,56 @@
 //!
 //! # Design
 //! Data streaming utilizes UDP packets to send live data streams at high throughput.
-//! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains
-//! an identifier that can be used to detect dropped data.
+//! Packets are always sent in a best-effort fashion, and data may be dropped.
 //!
-//! Refer to [DataPacket] for information about the serialization format of each UDP packet.
+//! Stabilizer organizes livestreamed data into batches within a "Frame" that will be sent as a UDP
+//! packet. Each frame consists of a header followed by sequential batch serializations. The packet
+//! header is constant for all streaming capabilities, but the serialization format after the header
+//! is application-defined.
+//!
+//! ## Frame Header
+//! The header consists of the following, all in little-endian.
+//!
+//! * **Magic word 0x057B** <u16>: a constant to identify Stabilizer streaming data.
+//! * **Format Code** <u8>: a unique ID that indicates the serialization format of each batch of
+//!   data in the frame. Refer to [StreamFormat] for further information.
+//! * **Batch Size** <u8>: the number of samples in each batch of data.
+//! * **Sequence Number** <u32>: the sequence number of the first batch in the frame.
+//!   This can be used to determine if and how many stream batches are lost.
 //!
 //! # Example
 //! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
 //! of livestreamed data.
 
 use heapless::spsc::{Consumer, Producer, Queue};
 use miniconf::MiniconfAtomic;
+use num_enum::IntoPrimitive;
 use serde::Deserialize;
 use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
 
+use heapless::pool::{Box, Init, Pool, Uninit};
+
 use super::NetworkReference;
-use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
 
-// The number of data blocks that we will buffer in the queue.
-const BLOCK_BUFFER_SIZE: usize = 30;
+const MAGIC_WORD: u16 = 0x057B;
 
-// A factor that data may be subsampled at.
-const SUBSAMPLE_RATE: usize = 1;
+// The size of the header in bytes.
+// The header has a 16-bit magic word, an 8-bit format, an 8-bit batch size, and a 32-bit sequence
+// number, which corresponds to 8 bytes total.
+const HEADER_SIZE: usize = 8;
+
+// The number of frames that can be buffered.
+const FRAME_COUNT: usize = 4;
+
+// The size of each livestream frame in bytes.
+const FRAME_SIZE: usize = 1024 + HEADER_SIZE;
+
+// The size of the frame queue must be at least as large as the number of frame buffers. Every
+// allocated frame buffer should fit in the queue.
+const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;
+
+// Static storage used for a heapless::Pool of frame buffers.
+static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] =
+    [0; FRAME_SIZE * FRAME_COUNT];
 
 /// Represents the destination for the UDP stream to send data to.
 ///
@@ -40,6 +40,23 @@ pub struct StreamTarget {
     pub port: u16,
 }
 
+/// Specifies the format of streamed data.
+#[repr(u8)]
+#[derive(Debug, Copy, Clone, PartialEq, IntoPrimitive)]
+pub enum StreamFormat {
+    /// Reserved, unused format specifier.
+    Unknown = 0,
+
+    /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format.
+    ///
+    /// # Example
+    /// With a batch size of 2, the serialization would take the following form:
+    /// ```text
+    /// <ADC0[0]> <ADC0[1]> <ADC1[0]> <ADC1[1]> <DAC0[0]> <DAC0[1]> <DAC1[0]> <DAC1[1]>
+    /// ```
+    AdcDacData = 1,
+}
+
 impl From<StreamTarget> for SocketAddr {
     fn from(target: StreamTarget) -> SocketAddr {
         SocketAddr::new(
@@ -54,15 +100,6 @@ impl From<StreamTarget> for SocketAddr {
     }
 }
 
-/// A basic "batch" of data.
-// Note: In the future, the stream may be generic over this type.
-#[derive(Debug, Copy, Clone)]
-pub struct AdcDacData {
-    block_id: u16,
-    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-}
-
 /// Configure streaming on a device.
 ///
 /// # Args
@@ -73,216 +110,147 @@ pub struct AdcDacData {
 /// `stream` is the logical consumer (UDP transmitter) of the enqueued data.
 pub fn setup_streaming(
     stack: NetworkReference,
-) -> (BlockGenerator, DataStream) {
-    let queue = cortex_m::singleton!(: Queue<AdcDacData, BLOCK_BUFFER_SIZE> = Queue::new()).unwrap();
-
+) -> (FrameGenerator, DataStream) {
+    // The queue needs to be at least as large as the frame count to ensure that every allocated
+    // frame can potentially be enqueued for transmission.
+    let queue =
+        cortex_m::singleton!(: Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new())
+            .unwrap();
     let (producer, consumer) = queue.split();
 
-    let generator = BlockGenerator::new(producer);
-
-    let stream = DataStream::new(stack, consumer);
+    let frame_pool =
+        cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]> = Pool::new()).unwrap();
+
+    // Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function.
+    let memory = unsafe { &mut FRAME_DATA };
+    frame_pool.grow(memory);
+
+    let generator = FrameGenerator::new(producer, frame_pool);
+    let stream = DataStream::new(stack, consumer, frame_pool);
 
     (generator, stream)
 }
 
-/// The data generator for a stream.
-pub struct BlockGenerator {
-    queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
-    current_id: u16,
-}
-
-impl BlockGenerator {
-    /// Construct a new generator.
-    /// # Args
-    /// * `queue` - The producer portion of the SPSC queue to enqueue data into.
-    ///
-    /// # Returns
-    /// The generator to use.
-    fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self {
-        Self {
-            queue,
-            current_id: 0,
-        }
-    }
-
-    /// Schedule data to be sent by the generator.
-    ///
-    /// # Note
-    /// If no space is available, the data batch may be silently dropped.
-    ///
-    /// # Args
-    /// * `adcs` - The ADC data to transmit.
-    /// * `dacs` - The DAC data to transmit.
-    pub fn send(
-        &mut self,
-        adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
-        dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
-    ) {
-        let block = AdcDacData {
-            block_id: self.current_id,
-            adcs: [*adcs[0], *adcs[1]],
-            dacs: [*dacs[0], *dacs[1]],
-        };
-
-        self.current_id = self.current_id.wrapping_add(1);
-        self.queue.enqueue(block).ok();
-    }
-}
-
-/// # Stream Packet
-/// Represents a single UDP packet sent by the stream.
-///
-/// A "batch" of data is defined to be the data collected for a single invocation of the DSP
-/// routine. A packet is composed of as many sequential batches as can fit.
-///
-/// The packet is given a header indicating the starting batch sequence number and the number of
-/// batches present. If the UDP transmitter encounters a non-sequential batch, it does not enqueue
-/// it into the packet and instead transmits any staged data. The non-sequential batch is then
-/// transmitted in a new UDP packet. This method allows a receiver to detect dropped batches (e.g.
-/// due to processing overhead).
-///
-/// ## Data Format
-///
-/// Data sent via UDP is sent in "blocks". Each block is a single batch of ADC/DAC codes from an
-/// individual DSP processing routine. Each block is assigned a unique 16-bit identifier. The
-/// identifier increments by one for each block and rolls over. All blocks in a single packet are
-/// guaranteed to contain sequential identifiers.
-///
-/// All data is transmitted in network-endian (big-endian) format.
-///
-/// ### Quick Reference
-///
-/// In the reference below, any value enclosed in parentheses represents the number of bytes used
-/// for that value. E.g. "Batch size (1)" indicates 1 byte is used to represent the batch size.
-/// ```text
-/// # UDP packets take the following form
-/// <Header>,<Batch 1>,[<Batch 2>, ...<Batch N>]
-///
-/// # The header takes the following form
-/// <Header> = <Starting ID (2)>,<Number blocks [N] (1)>,<Batch size [BS] (1)>
-///
-/// # Each batch takes the following form
-/// <Batch N> = <ADC0>,<ADC1>,<DAC0>,<DAC1>
-///
-/// # Where
-/// <ADCx/DACx> = <Sample 1 (2)>, ...<Sample BS (2)>
-/// ```
-///
-/// ### Packet Format
-/// Multiple blocks are sent in a single UDP packet simultaneously. Each UDP packet transmitted
-/// contains a header followed by the serialized data blocks.
-/// ```text
-/// <Header>,<Batch 1>,[<Batch 2>, ...<Batch N>]
-/// ```
-///
-/// ### Header
-/// A header takes the following form:
-/// * The starting block ID (2 bytes)
-/// * The number of blocks present in the packet (1 byte)
-/// * The size of each batch in samples (1 byte)
-///
-/// ```text
-/// <Starting ID (2)>,<N blocks (1)>,<Batch size (1)>
-/// ```
-///
-/// ### Data Blocks
-/// Following the header, each block is sequentially serialized. Each block takes the following
-/// form:
-/// ```text
-/// <ADC0 samples>,<ADC1 samples>,<DAC0 samples>,<DAC1 samples>
-/// ```
-///
-/// Where `<XXX samples>` is an array of N 16-bit ADC/DAC samples. The number of samples is
-/// provided in the header.
-///
-/// ADC and DAC codes are transmitted in raw machine-code format. Please refer to the datasheet
-/// for the ADC and DAC if you need to convert these to voltages.
-pub struct DataPacket<'a> {
-    buf: &'a mut [u8],
-    subsample_rate: usize,
-    start_id: Option<u16>,
-    num_blocks: u8,
-    write_index: usize,
-}
-
-impl<'a> DataPacket<'a> {
-    /// Construct a new packet.
-    ///
-    /// # Args
-    /// * `buf` - The location to serialize the data packet into.
-    /// * `subsample_rate` - The factor at which to subsample data from batches.
-    pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self {
-        Self {
-            buf,
-            start_id: None,
-            num_blocks: 0,
-            subsample_rate,
-            write_index: 4,
-        }
-    }
-
-    /// Add a batch of data to the packet.
-    ///
-    /// # Note
-    /// Serialization occurs as the packet is added.
-    ///
-    /// # Args
-    /// * `batch` - The batch to add to the packet.
-    pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> {
-        // Check that the block is sequential.
-        if let Some(id) = &self.start_id {
-            if batch.block_id != id.wrapping_add(self.num_blocks.into()) {
-                return Err(());
-            }
-        } else {
-            // Otherwise, this is the first block. Record the start ID.
-            self.start_id = Some(batch.block_id);
-        }
-
-        // Check that there is space for the block.
-        let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2;
-        if self.buf.len() - self.get_packet_size() < block_size_bytes {
-            return Err(());
-        }
-
-        // Copy the samples into the buffer.
-        for device in &[batch.adcs, batch.dacs] {
-            for channel in device {
-                for sample in channel.iter().step_by(self.subsample_rate) {
-                    self.buf[self.write_index..self.write_index + 2]
-                        .copy_from_slice(&sample.to_be_bytes());
-                    self.write_index += 2;
-                }
-            }
-        }
-
-        self.num_blocks += 1;
-
-        Ok(())
-    }
-
-    fn get_packet_size(&self) -> usize {
-        let header_length = 4;
-        let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
-        let block_size_bytes = block_sample_size * 2 * 4;
-
-        block_size_bytes * self.num_blocks as usize + header_length
-    }
-
-    /// Complete the packet and prepare it for transmission.
-    ///
-    /// # Returns
-    /// The size of the packet. The user should utilize the original buffer provided for packet
-    /// construction to access the packet.
-    pub fn finish(self) -> usize {
-        let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
-
-        // Write the header into the block.
-        self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes());
-        self.buf[2] = self.num_blocks;
-        self.buf[3] = block_sample_size as u8;
-
-        // Return the length of the packet to transmit.
-        self.get_packet_size()
-    }
-}
+#[derive(Debug)]
+struct StreamFrame {
+    buffer: Box<[u8; FRAME_SIZE], Init>,
+    offset: usize,
+}
+
+impl StreamFrame {
+    pub fn new(
+        buffer: Box<[u8; FRAME_SIZE], Uninit>,
+        format: u8,
+        buffer_size: u8,
+        sequence_number: u32,
+    ) -> Self {
+        let mut buffer = unsafe { buffer.assume_init() };
+
+        buffer[0..2].copy_from_slice(&MAGIC_WORD.to_ne_bytes());
+        buffer[2] = format;
+        buffer[3] = buffer_size;
+        buffer[4..8].copy_from_slice(&sequence_number.to_ne_bytes());
+
+        Self {
+            buffer,
+            offset: HEADER_SIZE,
+        }
+    }
+
+    pub fn add_batch<F, const T: usize>(&mut self, mut f: F)
+    where
+        F: FnMut(&mut [u8]),
+    {
+        f(&mut self.buffer[self.offset..self.offset + T]);
+        self.offset += T;
+    }
+
+    pub fn is_full<const T: usize>(&self) -> bool {
+        self.offset + T > self.buffer.len()
+    }
+
+    pub fn finish(&mut self) -> &[u8] {
+        &self.buffer[..self.offset]
+    }
+}
+
+/// The data generator for a stream.
+pub struct FrameGenerator {
+    queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
+    pool: &'static Pool<[u8; FRAME_SIZE]>,
+    current_frame: Option<StreamFrame>,
+    sequence_number: u32,
+    format: u8,
+    batch_size: u8,
+}
+
+impl FrameGenerator {
+    fn new(
+        queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
+        pool: &'static Pool<[u8; FRAME_SIZE]>,
+    ) -> Self {
+        Self {
+            queue,
+            pool,
+            batch_size: 0,
+            format: StreamFormat::Unknown.into(),
+            current_frame: None,
+            sequence_number: 0,
+        }
+    }
+
+    /// Configure the format of the stream.
+    ///
+    /// # Note
+    /// This function shall only be called once upon initializing streaming.
+    ///
+    /// # Args
+    /// * `format` - The desired format of the stream.
+    /// * `batch_size` - The number of samples in each data batch. See
+    ///   [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE].
+    #[doc(hidden)]
+    pub(crate) fn configure(&mut self, format: impl Into<u8>, batch_size: u8) {
+        self.format = format.into();
+        self.batch_size = batch_size;
+    }
+
+    /// Add a batch to the current stream frame.
+    ///
+    /// # Args
+    /// * `f` - A closure that will be provided the buffer to write batch data into. The buffer
+    ///   will be the size of the `T` template argument.
+    pub fn add<F, const T: usize>(&mut self, f: F)
+    where
+        F: FnMut(&mut [u8]),
+    {
+        let sequence_number = self.sequence_number;
+        self.sequence_number = self.sequence_number.wrapping_add(1);
+
+        if self.current_frame.is_none() {
+            if let Some(buffer) = self.pool.alloc() {
+                self.current_frame.replace(StreamFrame::new(
+                    buffer,
+                    self.format,
+                    self.batch_size,
+                    sequence_number,
+                ));
+            } else {
+                return;
+            }
+        }
+
+        // Note(unwrap): We ensure the frame is present above.
+        let current_frame = self.current_frame.as_mut().unwrap();
+
+        current_frame.add_batch::<_, T>(f);
+
+        if current_frame.is_full::<T>() {
+            // Note(unwrap): The queue is designed to be at least as large as the frame buffer
+            // count, so this enqueue should always succeed.
+            self.queue
+                .enqueue(self.current_frame.take().unwrap())
+                .unwrap();
+        }
+    }
+}
@@ -293,9 +261,9 @@ impl<'a> DataPacket<'a> {
 pub struct DataStream {
     stack: NetworkReference,
     socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
-    queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
+    queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
+    frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
     remote: SocketAddr,
-    buffer: [u8; 1024],
 }
 
 impl DataStream {
@@ -304,16 +272,18 @@ impl DataStream {
     /// # Args
     /// * `stack` - A reference to the shared network stack.
     /// * `consumer` - The read side of the queue containing data to transmit.
+    /// * `frame_pool` - The Pool to return stream frame objects into.
     fn new(
         stack: NetworkReference,
-        consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
+        consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
+        frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
     ) -> Self {
         Self {
             stack,
             socket: None,
             remote: StreamTarget::default().into(),
             queue: consumer,
-            buffer: [0; 1024],
+            frame_pool,
         }
     }
@@ -365,27 +335,16 @@ impl DataStream {
                 // If there's no socket available, try to connect to our remote.
                 if self.open().is_ok() {
                     // If we just successfully opened the socket, flush old data from queue.
-                    while self.queue.dequeue().is_some() {}
+                    while let Some(frame) = self.queue.dequeue() {
+                        self.frame_pool.free(frame.buffer);
+                    }
                 }
             }
             Some(handle) => {
-                if self.queue.ready() {
-                    // Dequeue data from the queue into a larger block structure.
-                    let mut packet =
-                        DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE);
-                    while self
-                        .queue
-                        .peek()
-                        .and_then(|batch| packet.add_batch(batch).ok())
-                        .is_some()
-                    {
-                        // Dequeue the batch that we just added to the packet.
-                        self.queue.dequeue();
-                    }
-
-                    // Transmit the data packet.
-                    let size = packet.finish();
-                    self.stack.send(handle, &self.buffer[..size]).ok();
+                if let Some(mut frame) = self.queue.dequeue() {
+                    // Transmit the frame and return it to the pool.
+                    self.stack.send(handle, frame.finish()).ok();
+                    self.frame_pool.free(frame.buffer)
                 }
             }
         }
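
Note: a receiver can use the header's sequence number to count dropped batches across frames. A host-side sketch of the wraparound arithmetic, mirroring `sequence_delta()` in the updated Python script (illustrative, not part of this diff):

```rust
/// Count the batches missing between two received batch sequence numbers,
/// accounting for u32 wraparound (sketch mirroring the Python receiver).
fn sequence_delta(previous: Option<u32>, next: u32) -> u32 {
    match previous {
        // Nothing received yet, so nothing can have been dropped.
        None => 0,
        Some(prev) => next.wrapping_sub(prev.wrapping_add(1)),
    }
}
```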

src/net/mod.rs

@@ -17,7 +17,7 @@ pub mod shared;
 pub mod telemetry;
 
 use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
-use data_stream::{BlockGenerator, DataStream};
+use data_stream::{DataStream, FrameGenerator};
 use messages::{MqttMessage, SettingsResponse};
 use miniconf_client::MiniconfClient;
 use network_processor::NetworkProcessor;
@@ -49,7 +49,7 @@ pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
     pub miniconf: MiniconfClient<S>,
     pub processor: NetworkProcessor,
     stream: DataStream,
-    generator: Option<BlockGenerator>,
+    generator: Option<FrameGenerator>,
     pub telemetry: TelemetryClient<T>,
 }
@@ -113,8 +113,17 @@ where
     }
 
     /// Enable live data streaming.
-    pub fn enable_streaming(&mut self) -> BlockGenerator {
-        self.generator.take().unwrap()
+    ///
+    /// # Args
+    /// * `format` - A unique u8 code indicating the format of the data.
+    pub fn configure_streaming(
+        &mut self,
+        format: impl Into<u8>,
+        batch_size: u8,
+    ) -> FrameGenerator {
+        let mut generator = self.generator.take().unwrap();
+        generator.configure(format, batch_size);
+        generator
     }
 
     /// Direct the stream to the provided remote target.