2021-07-15 19:28:19 +08:00
|
|
|
//! Stabilizer data stream capabilities
|
|
|
|
//!
|
|
|
|
//! # Design
|
|
|
|
//! Data streaming utilizes UDP packets to send live data streams at high throughput.
|
|
|
|
//! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains
|
|
|
|
//! an identifier that can be used to detect dropped data.
|
|
|
|
//!
|
|
|
|
//! Refer to [DataPacket] for information about the serialization format of each UDP packet.
|
|
|
|
//!
|
|
|
|
//! # Example
|
|
|
|
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
|
|
|
|
//! of livestreamed data.
|
2021-06-09 19:26:41 +08:00
|
|
|
use heapless::spsc::{Consumer, Producer, Queue};
|
2021-06-09 21:25:59 +08:00
|
|
|
use miniconf::MiniconfAtomic;
|
|
|
|
use serde::Deserialize;
|
|
|
|
use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
|
2021-05-17 18:43:04 +08:00
|
|
|
|
|
|
|
use super::NetworkReference;
|
|
|
|
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
|
|
|
|
|
|
|
|
// The number of data blocks (batches) that may be buffered in the SPSC queue between the
// producer (DSP routine) and the consumer (UDP transmitter). When the queue is full, new
// batches are silently dropped.
const BLOCK_BUFFER_SIZE: usize = 30;
|
2021-05-17 18:43:04 +08:00
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
// A factor that data may be subsampled at: a rate of N transmits every Nth sample of a batch.
// Currently fixed at 1 (no subsampling).
const SUBSAMPLE_RATE: usize = 1;
|
2021-05-29 01:37:28 +08:00
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Represents the destination for the UDP stream to send data to.
|
2021-07-15 19:28:19 +08:00
|
|
|
///
|
|
|
|
/// # Miniconf
|
|
|
|
/// `{"ip": <addr>, "port": <port>}`
|
|
|
|
///
|
|
|
|
/// * `<addr>` is an array of 4 bytes. E.g. `[192, 168, 0, 1]`
|
|
|
|
/// * `<port>` is any unsigned 16-bit value.
|
|
|
|
///
|
|
|
|
/// ## Example
|
|
|
|
/// `{"ip": [192, 168, 0, 1], "port": 1111}`
|
2021-06-25 04:14:55 +08:00
|
|
|
#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)]
pub struct StreamTarget {
    // The IPv4 address of the stream destination as four octets, e.g. `[192, 168, 0, 1]`.
    // The default (all zeros) is an unspecified address, which disables streaming.
    pub ip: [u8; 4],

    // The UDP port of the stream destination.
    pub port: u16,
}
|
|
|
|
|
2021-06-15 19:46:39 +08:00
|
|
|
impl From<StreamTarget> for SocketAddr {
|
|
|
|
fn from(target: StreamTarget) -> SocketAddr {
|
2021-06-09 21:25:59 +08:00
|
|
|
SocketAddr::new(
|
|
|
|
IpAddr::V4(Ipv4Addr::new(
|
2021-06-15 19:46:39 +08:00
|
|
|
target.ip[0],
|
|
|
|
target.ip[1],
|
|
|
|
target.ip[2],
|
|
|
|
target.ip[3],
|
2021-06-09 21:25:59 +08:00
|
|
|
)),
|
2021-06-15 19:46:39 +08:00
|
|
|
target.port,
|
2021-06-09 21:25:59 +08:00
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// A basic "batch" of data.
|
|
|
|
// Note: In the future, the stream may be generic over this type.
|
|
|
|
#[derive(Debug, Copy, Clone)]
pub struct AdcDacData {
    // Sequence number of this batch. Assigned by the generator, increments by one per batch,
    // and wraps around at u16::MAX so receivers can detect dropped batches.
    block_id: u16,
    // Raw codes for both ADC channels for one batch.
    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
    // Raw codes for both DAC channels for one batch.
    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
}
|
|
|
|
|
|
|
|
/// Configure streaming on a device.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `stack` - A reference to the shared network stack.
|
|
|
|
///
|
|
|
|
/// # Returns
|
|
|
|
/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
|
|
|
|
/// `stream` is the logical consumer (UDP transmitter) of the enqueued data.
|
2021-05-17 18:43:04 +08:00
|
|
|
pub fn setup_streaming(
    stack: NetworkReference,
) -> (BlockGenerator, DataStream) {
    // Note(unwrap): The queue is allocated in static storage via `singleton!`, so this
    // function may only be called once; a second call would panic here.
    let queue = cortex_m::singleton!(: Queue<AdcDacData, BLOCK_BUFFER_SIZE> = Queue::new()).unwrap();

    // Split the static queue into its single-producer / single-consumer halves.
    let (producer, consumer) = queue.split();

    // The generator (producer half) is used by the DSP routine to enqueue batches.
    let generator = BlockGenerator::new(producer);

    // The stream (consumer half) drains the queue and transmits the data over UDP.
    let stream = DataStream::new(stack, consumer);

    (generator, stream)
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// The data generator for a stream.
|
2021-05-17 18:43:04 +08:00
|
|
|
pub struct BlockGenerator {
    // The producer half of the SPSC queue used to hand batches to the UDP transmitter.
    queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,

    // The sequence number to stamp onto the next enqueued batch (wraps at u16::MAX).
    current_id: u16,
}
|
|
|
|
|
|
|
|
impl BlockGenerator {
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Construct a new generator.
|
|
|
|
/// # Args
|
|
|
|
/// * `queue` - The producer portion of the SPSC queue to enqueue data into.
|
|
|
|
///
|
|
|
|
/// # Returns
|
|
|
|
/// The generator to use.
|
|
|
|
fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self {
|
2021-05-17 18:43:04 +08:00
|
|
|
Self {
|
|
|
|
queue,
|
2021-06-11 22:36:19 +08:00
|
|
|
current_id: 0,
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Schedule data to be sent by the generator.
|
|
|
|
///
|
|
|
|
/// # Note
|
|
|
|
/// If no space is available, the data batch may be silently dropped.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `adcs` - The ADC data to transmit.
|
|
|
|
/// * `dacs` - The DAC data to transmit.
|
2021-05-17 18:43:04 +08:00
|
|
|
pub fn send(
|
|
|
|
&mut self,
|
2021-06-09 18:52:13 +08:00
|
|
|
adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
|
2021-05-17 18:43:04 +08:00
|
|
|
dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
|
|
|
|
) {
|
2021-06-11 22:36:19 +08:00
|
|
|
let block = AdcDacData {
|
|
|
|
block_id: self.current_id,
|
|
|
|
adcs: [*adcs[0], *adcs[1]],
|
|
|
|
dacs: [*dacs[0], *dacs[1]],
|
|
|
|
};
|
2021-05-17 18:43:04 +08:00
|
|
|
|
2021-06-11 22:36:19 +08:00
|
|
|
self.current_id = self.current_id.wrapping_add(1);
|
|
|
|
self.queue.enqueue(block).ok();
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-15 19:28:19 +08:00
|
|
|
/// # Stream Packet
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Represents a single UDP packet sent by the stream.
|
|
|
|
///
|
2021-07-15 19:28:19 +08:00
|
|
|
/// A "batch" of data is defined to be the data collected for a single invocation of the DSP
|
|
|
|
/// routine. A packet is composed of as many sequential batches as can fit.
|
|
|
|
///
|
|
|
|
/// The packet is given a header indicating the starting batch sequence number and the number of
|
|
|
|
/// batches present. If the UDP transmitter encounters a non-sequential batch, it does not enqueue
|
|
|
|
/// it into the packet and instead transmits any staged data. The non-sequential batch is then
|
|
|
|
/// transmitted in a new UDP packet. This method allows a receiver to detect dropped batches (e.g.
|
|
|
|
/// due to processing overhead).
|
|
|
|
///
|
|
|
|
/// ## Data Format
|
|
|
|
///
|
|
|
|
/// Data sent via UDP is sent in "blocks". Each block is a single batch of ADC/DAC codes from an
|
|
|
|
/// individual DSP processing routine. Each block is assigned a unique 16-bit identifier. The identifier
|
|
|
|
/// increments by one for each block and rolls over. All blocks in a single packet are guaranteed to
|
|
|
|
/// contain sequential identifiers.
|
|
|
|
///
|
|
|
|
/// All data is transmitted in network-endian (big-endian) format.
|
|
|
|
///
|
|
|
|
/// ### Quick Reference
|
|
|
|
///
|
|
|
|
/// In the reference below, any values enclosed in parentheses represents the number of bytes used for
|
|
|
|
/// that value. E.g. "Batch size (1)" indicates 1 byte is used to represent the batch size.
|
|
|
|
/// ```
|
|
|
|
/// # UDP packets take the following form
|
|
|
|
/// <Header>,<Batch 1>,[<Batch 2>, ...<Batch N>]
|
|
|
|
///
|
|
|
|
/// # The header takes the following form
|
|
|
|
/// <Header> = <Starting ID (2)>,<Number blocks [N] (1)>,<Batch size [BS] (1)>
|
|
|
|
///
|
|
|
|
/// # Each batch takes the following form
|
|
|
|
/// <Batch N> = <ADC0>,<ADC1>,<DAC0>,<DAC1>
|
|
|
|
///
|
|
|
|
/// # Where
|
|
|
|
/// <ADCx/DACx> = <Sample 1 (2)>, ...<Sample BS (2)>
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// ### Packet Format
|
|
|
|
/// Multiple blocks are sent in a single UDP packet simultaneously. Each UDP packet transmitted
|
|
|
|
/// contains a header followed by the serialized data blocks.
|
|
|
|
/// ```
|
|
|
|
/// <Header>,<Batch 1>,[<Batch 2>, ...<Batch N>]
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// ### Header
|
|
|
|
/// A header takes the following form:
|
|
|
|
/// * The starting block ID (2 bytes)
|
|
|
|
/// * The number of blocks present in the packet (1 byte)
|
|
|
|
/// * The size of each batch in samples (1 byte)
|
|
|
|
///
|
|
|
|
/// ```
|
|
|
|
/// <Starting ID (2)>,<N blocks (1)>,<Batch size (1)>
|
|
|
|
/// ```
|
|
|
|
///
|
|
|
|
/// ### Data Blocks
|
|
|
|
/// Following the header, each block is sequentially serialized. Each block takes the following form:
|
|
|
|
/// ```
|
|
|
|
/// <ADC0 samples>,<ADC1 samples>,<DAC0 samples>,<DAC1 samples>
|
|
|
|
/// ```
|
2021-06-15 19:18:16 +08:00
|
|
|
///
|
2021-07-15 19:28:19 +08:00
|
|
|
/// Where `<XXX samples>` is an array of N 16-bit ADC/DAC samples. The number of samples is provided in the
|
|
|
|
/// header.
|
2021-06-15 19:18:16 +08:00
|
|
|
///
|
2021-07-15 19:28:19 +08:00
|
|
|
/// ADC and DAC codes are transmitted in raw machine-code format. Please refer to the datasheet for the
|
|
|
|
/// ADC and DAC if you need to convert these to voltages.
|
|
|
|
pub struct DataPacket<'a> {
    // The buffer that the packet is serialized into.
    buf: &'a mut [u8],
    // The subsampling factor applied to each batch (1 = every sample transmitted).
    subsample_rate: usize,
    // The sequence number of the first batch in the packet; None until a batch is added.
    start_id: Option<u16>,
    // The number of batches serialized into the packet so far.
    num_blocks: u8,
    // The index in `buf` at which the next sample will be written.
    write_index: usize,
}
|
|
|
|
|
2021-06-11 22:36:19 +08:00
|
|
|
impl<'a> DataPacket<'a> {
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Construct a new packet.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `buf` - The location to serialize the data packet into.
|
|
|
|
/// * `subsample_rate` - The factor at which to subsample data from batches.
|
2021-06-11 22:36:19 +08:00
|
|
|
pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self {
|
|
|
|
Self {
|
|
|
|
buf,
|
|
|
|
start_id: None,
|
|
|
|
num_blocks: 0,
|
|
|
|
subsample_rate,
|
|
|
|
write_index: 4,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Add a batch of data to the packet.
|
|
|
|
///
|
|
|
|
/// # Note
|
|
|
|
/// Serialization occurs as the packet is added.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `batch` - The batch to add to the packet.
|
2021-06-11 22:36:19 +08:00
|
|
|
pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> {
|
|
|
|
// Check that the block is sequential.
|
|
|
|
if let Some(id) = &self.start_id {
|
|
|
|
if batch.block_id != id.wrapping_add(self.num_blocks.into()) {
|
|
|
|
return Err(());
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
// Otherwise, this is the first block. Record the strt ID.
|
|
|
|
self.start_id = Some(batch.block_id);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check that there is space for the block.
|
|
|
|
let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2;
|
|
|
|
if self.buf.len() - self.get_packet_size() < block_size_bytes {
|
|
|
|
return Err(());
|
|
|
|
}
|
2021-05-29 00:57:23 +08:00
|
|
|
|
2021-06-11 22:36:19 +08:00
|
|
|
// Copy the samples into the buffer.
|
|
|
|
for device in &[batch.adcs, batch.dacs] {
|
2021-05-18 00:33:43 +08:00
|
|
|
for channel in device {
|
2021-06-11 22:36:19 +08:00
|
|
|
for sample in channel.iter().step_by(self.subsample_rate) {
|
|
|
|
self.buf[self.write_index..self.write_index + 2]
|
2021-06-09 19:26:41 +08:00
|
|
|
.copy_from_slice(&sample.to_be_bytes());
|
2021-06-11 22:36:19 +08:00
|
|
|
self.write_index += 2;
|
2021-05-18 00:33:43 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-11 23:52:11 +08:00
|
|
|
self.num_blocks += 1;
|
|
|
|
|
2021-06-11 22:36:19 +08:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_packet_size(&self) -> usize {
|
|
|
|
let header_length = 4;
|
|
|
|
let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
|
|
|
|
let block_size_bytes = block_sample_size * 2 * 4;
|
|
|
|
|
|
|
|
block_size_bytes * self.num_blocks as usize + header_length
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Complete the packet and prepare it for transmission.
|
|
|
|
///
|
|
|
|
/// # Returns
|
|
|
|
/// The size of the packet. The user should utilize the original buffer provided for packet
|
|
|
|
/// construction to access the packet.
|
2021-06-11 22:36:19 +08:00
|
|
|
pub fn finish(self) -> usize {
|
|
|
|
let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
|
|
|
|
|
|
|
|
// Write the header into the block.
|
|
|
|
self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes());
|
|
|
|
self.buf[2] = self.num_blocks;
|
|
|
|
self.buf[3] = block_sample_size as u8;
|
|
|
|
|
|
|
|
// Return the length of the packet to transmit.
|
|
|
|
self.get_packet_size()
|
2021-05-18 00:33:43 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// The "consumer" portion of the data stream.
|
|
|
|
///
|
|
|
|
/// # Note
|
|
|
|
/// This is responsible for consuming data and sending it over UDP.
|
|
|
|
pub struct DataStream {
    // Shared reference to the network stack used for all socket operations.
    stack: NetworkReference,
    // The UDP socket used for transmission; None while the stream is closed.
    socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
    // The consumer half of the SPSC queue that batches are drained from.
    queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
    // The remote endpoint to stream to. An unspecified IP keeps the stream disabled.
    remote: SocketAddr,
    // Scratch buffer that each UDP packet is serialized into before transmission.
    buffer: [u8; 1024],
}
|
|
|
|
|
2021-05-17 18:43:04 +08:00
|
|
|
impl DataStream {
    /// Construct a new data streamer.
    ///
    /// # Args
    /// * `stack` - A reference to the shared network stack.
    /// * `consumer` - The read side of the queue containing data to transmit.
    fn new(
        stack: NetworkReference,
        consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
    ) -> Self {
        Self {
            stack,
            socket: None,
            // Default target is unspecified (0.0.0.0), so the stream starts disabled.
            remote: StreamTarget::default().into(),
            queue: consumer,
            buffer: [0; 1024],
        }
    }

    // Close the active socket, if any. Harmless when the stream is already closed.
    fn close(&mut self) {
        if let Some(socket) = self.socket.take() {
            log::info!("Closing stream");
            // Note(unwrap): We guarantee that the socket is available above.
            self.stack.close(socket).unwrap();
        }
    }

    // Open a new socket connected to the configured remote.
    fn open(&mut self) -> Result<(), ()> {
        // If there is already a socket, or if the remote address is unspecified,
        // do not open a new socket.
        if self.socket.is_some() || self.remote.ip().is_unspecified() {
            return Err(());
        }

        log::info!("Opening stream");

        let mut socket = self.stack.socket().or(Err(()))?;

        // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
        // bound.
        self.stack.connect(&mut socket, self.remote).unwrap();

        self.socket.replace(socket);

        Ok(())
    }

    /// Configure the remote endpoint of the stream.
    ///
    /// # Args
    /// * `remote` - The destination to send stream data to.
    pub fn set_remote(&mut self, remote: SocketAddr) {
        // Close the socket so it is reopened (towards the new target) if the remote has changed.
        if remote != self.remote {
            self.close();
        }
        self.remote = remote;
    }

    /// Process any data for transmission.
    ///
    /// Drains queued batches and transmits at most one UDP packet per call.
    pub fn process(&mut self) {
        match self.socket.as_mut() {
            None => {
                // If there's no socket available, try to connect to our remote.
                if self.open().is_ok() {
                    // If we just successfully opened the socket, flush old data from the queue
                    // so the stream starts with fresh batches.
                    while self.queue.dequeue().is_some() {}
                }
            }
            Some(handle) => {
                if self.queue.ready() {
                    // Dequeue data from the queue into a larger block structure.
                    let mut packet =
                        DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE);
                    // Stage as many sequential batches as fit into the packet. A non-sequential
                    // batch (or a full packet) ends the loop; that batch stays queued for the
                    // next packet.
                    while self
                        .queue
                        .peek()
                        .and_then(|batch| packet.add_batch(batch).ok())
                        .is_some()
                    {
                        // Dequeue the batch that we just added to the packet.
                        self.queue.dequeue();
                    }

                    // Transmit the data packet. Transmission is best-effort; errors are ignored.
                    let size = packet.finish();
                    self.stack.send(handle, &self.buffer[..size]).ok();
                }
            }
        }
    }
}
|