pounder_test/src/net/data_stream.rs

354 lines
11 KiB
Rust
Raw Normal View History

2021-07-15 19:28:19 +08:00
//! Stabilizer data stream capabilities
//!
//! # Design
//! Data streaming utilizes UDP packets to send live data streams at high throughput.
2021-07-23 21:30:38 +08:00
//! Packets are always sent in a best-effort fashion, and data may be dropped.
2021-07-15 19:28:19 +08:00
//!
2021-07-23 21:30:38 +08:00
//! Stabilizer organizes livestreamed data into batches within a "Frame" that will be sent as a UDP
//! packet. Each frame consists of a header followed by sequential batch serializations. The packet
//! header is constant for all streaming capabilities, but the serialization format after the header
//! is application-defined.
//!
2021-07-27 19:12:57 +08:00
//! ## Frame Header
//! The header consists of the following, all in little-endian.
2021-07-23 21:30:38 +08:00
//!
2021-07-27 19:12:57 +08:00
//! * **Magic word 0x057B** `<u16>`: a constant to identify Stabilizer streaming data.
//! * **Format Code** `<u8>`: a unique ID that indicates the serialization format of each batch of
//!   data in the frame. Refer to [StreamFormat] for further information.
//! * **Batch Size** `<u8>`: the number of samples in each batch of data.
//! * **Sequence Number** `<u32>`: the sequence number of the first batch in the frame.
//!   This can be used to determine if and how many stream batches are lost.
2021-07-15 19:28:19 +08:00
//!
//! # Example
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
//! of livestreamed data.
use heapless::spsc::{Consumer, Producer, Queue};
use miniconf::MiniconfAtomic;
2021-07-27 19:12:57 +08:00
use num_enum::IntoPrimitive;
use serde::Deserialize;
use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
2021-07-22 20:45:58 +08:00
use heapless::pool::{Box, Init, Pool, Uninit};
use super::NetworkReference;
2021-07-26 18:24:36 +08:00
// The magic word written at the very start of every frame header. It lets a receiver identify
// Stabilizer streaming data.
const MAGIC_WORD: u16 = 0x057B;
2021-07-27 19:12:57 +08:00
// The size of the header, calculated in bytes.
// The header has a 16-bit magic word, an 8-bit format, 8-bit batch-size, and 32-bit sequence
// number, which corresponds to 8 bytes total.
2021-07-26 18:24:36 +08:00
const HEADER_SIZE: usize = 8;
2021-07-23 21:30:38 +08:00
// The number of frame buffers reserved in static storage for the frame pool.
const FRAME_COUNT: usize = 4;
// The size of each livestream frame in bytes: 1024 bytes of batch data plus the frame header.
2021-07-26 18:26:10 +08:00
const FRAME_SIZE: usize = 1024 + HEADER_SIZE;
2021-07-27 19:12:57 +08:00
// The size of the frame queue must be at least as large as the number of frame buffers. Every
// allocated frame buffer should fit in the queue. It is sized at twice the frame count.
const FRAME_QUEUE_SIZE: usize = FRAME_COUNT * 2;
2021-07-23 21:30:38 +08:00
// Static, zero-initialized storage used for a heapless::Pool of frame buffers. It is handed to
// the pool exactly once, in `setup_streaming`.
2021-07-23 20:12:59 +08:00
static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] =
[0; FRAME_SIZE * FRAME_COUNT];
2021-05-29 01:37:28 +08:00
2021-06-15 19:18:16 +08:00
/// Represents the destination for the UDP stream to send data to.
2021-07-15 19:28:19 +08:00
///
/// The derived default is an all-zero address and port. [DataStream] does not open a socket while
/// the remote IP address is unspecified.
///
/// # Miniconf
/// `{"ip": <addr>, "port": <port>}`
///
/// * `<addr>` is an array of 4 bytes. E.g. `[192, 168, 0, 1]`
/// * `<port>` is any unsigned 16-bit value.
///
/// ## Example
/// `{"ip": [192, 168, 0, 1], "port": 1111}`
2021-06-25 04:14:55 +08:00
#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)]
pub struct StreamTarget {
/// The four octets of the destination IPv4 address, most significant octet first.
pub ip: [u8; 4],
/// The destination UDP port.
pub port: u16,
}
/// Specifies the format of streamed data.
///
/// The `u8` value of a variant is the format code that is written into the frame header.
2021-07-26 18:24:36 +08:00
#[repr(u8)]
2021-07-27 19:12:57 +08:00
#[derive(Debug, Copy, Clone, PartialEq, IntoPrimitive)]
pub enum StreamFormat {
2021-07-26 18:24:36 +08:00
/// Reserved, unused format specifier.
Unknown = 0,
2021-07-23 21:30:38 +08:00
/// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format.
///
/// # Example
/// With a batch size of 2, the serialization would take the following form:
/// ```text
/// <ADC0[0]> <ADC0[1]> <ADC1[0]> <ADC1[1]> <DAC0[0]> <DAC0[1]> <DAC1[0]> <DAC1[1]>
/// ```
2021-07-26 18:24:36 +08:00
AdcDacData = 1,
}
2021-06-15 19:46:39 +08:00
impl From<StreamTarget> for SocketAddr {
fn from(target: StreamTarget) -> SocketAddr {
SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(
2021-06-15 19:46:39 +08:00
target.ip[0],
target.ip[1],
target.ip[2],
target.ip[3],
)),
2021-06-15 19:46:39 +08:00
target.port,
)
}
}
2021-06-15 19:18:16 +08:00
/// Configure streaming on a device.
///
/// # Args
/// * `stack` - A reference to the shared network stack.
///
/// # Returns
/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
/// `stream` is the logically consumer (UDP transmitter) of the enqueued data.
pub fn setup_streaming(
stack: NetworkReference,
2021-07-22 20:45:58 +08:00
) -> (FrameGenerator, DataStream) {
2021-07-27 19:12:57 +08:00
// The queue needs to be at least as large as the frame count to ensure that every allocated
// frame can potentially be enqueued for transmission.
2021-07-22 20:45:58 +08:00
let queue =
2021-07-27 19:12:57 +08:00
cortex_m::singleton!(: Queue<StreamFrame, FRAME_QUEUE_SIZE> = Queue::new())
2021-07-22 20:45:58 +08:00
.unwrap();
let (producer, consumer) = queue.split();
2021-07-22 20:45:58 +08:00
let frame_pool =
2021-07-23 20:12:59 +08:00
cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]>= Pool::new()).unwrap();
2021-07-22 20:45:58 +08:00
// Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function.
let memory = unsafe { &mut FRAME_DATA };
frame_pool.grow(memory);
let generator = FrameGenerator::new(producer, frame_pool);
let stream = DataStream::new(stack, consumer, frame_pool);
(generator, stream)
}
2021-07-26 19:52:57 +08:00
#[derive(Debug)]
2021-07-22 20:45:58 +08:00
struct StreamFrame {
2021-07-23 20:12:59 +08:00
buffer: Box<[u8; FRAME_SIZE], Init>,
2021-07-22 20:45:58 +08:00
offset: usize,
}
2021-07-22 20:45:58 +08:00
impl StreamFrame {
pub fn new(
2021-07-23 20:12:59 +08:00
buffer: Box<[u8; FRAME_SIZE], Uninit>,
2021-07-26 18:24:36 +08:00
format: u8,
buffer_size: u8,
sequence_number: u32,
2021-07-22 20:45:58 +08:00
) -> Self {
2021-07-26 18:24:36 +08:00
let mut buffer = unsafe { buffer.assume_init() };
buffer[0..2].copy_from_slice(&MAGIC_WORD.to_ne_bytes());
buffer[2] = format;
buffer[3] = buffer_size;
buffer[4..8].copy_from_slice(&sequence_number.to_ne_bytes());
Self {
2021-07-26 18:24:36 +08:00
buffer,
offset: HEADER_SIZE,
}
}
2021-07-22 20:45:58 +08:00
pub fn add_batch<F, const T: usize>(&mut self, mut f: F)
where
F: FnMut(&mut [u8]),
{
2021-07-27 19:12:57 +08:00
f(&mut self.buffer[self.offset..self.offset + T]);
2021-07-22 20:45:58 +08:00
self.offset += T;
}
pub fn is_full<const T: usize>(&self) -> bool {
2021-07-27 19:12:57 +08:00
self.offset + T > self.buffer.len()
2021-07-22 20:45:58 +08:00
}
pub fn finish(&mut self) -> &[u8] {
2021-07-26 18:24:36 +08:00
&self.buffer[..self.offset]
}
}
2021-07-22 20:45:58 +08:00
/// The data generator for a stream.
pub struct FrameGenerator {
2021-07-27 19:12:57 +08:00
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
2021-07-23 20:12:59 +08:00
pool: &'static Pool<[u8; FRAME_SIZE]>,
2021-07-22 20:45:58 +08:00
current_frame: Option<StreamFrame>,
2021-07-26 18:24:36 +08:00
sequence_number: u32,
2021-07-26 19:07:07 +08:00
format: u8,
2021-07-27 19:12:57 +08:00
batch_size: u8,
}
2021-07-22 20:45:58 +08:00
impl FrameGenerator {
fn new(
2021-07-27 19:12:57 +08:00
queue: Producer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
2021-07-23 20:12:59 +08:00
pool: &'static Pool<[u8; FRAME_SIZE]>,
2021-07-22 20:45:58 +08:00
) -> Self {
2021-06-11 22:36:19 +08:00
Self {
2021-07-22 20:45:58 +08:00
queue,
pool,
2021-07-27 19:12:57 +08:00
batch_size: 0,
2021-07-26 19:07:07 +08:00
format: StreamFormat::Unknown.into(),
2021-07-22 20:45:58 +08:00
current_frame: None,
sequence_number: 0,
2021-06-11 22:36:19 +08:00
}
}
2021-07-27 19:12:57 +08:00
/// Configure the format of the stream.
2021-07-26 18:24:36 +08:00
///
/// # Note:
2021-07-27 19:12:57 +08:00
/// This function shall only be called once upon initializing streaming
2021-07-26 18:24:36 +08:00
///
/// # Args
/// * `format` - The desired format of the stream.
2021-07-27 19:12:57 +08:00
/// * `batch_size` - The number of samples in each data batch. See
/// [crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE]
2021-07-26 18:24:36 +08:00
#[doc(hidden)]
2021-07-27 19:12:57 +08:00
pub(crate) fn configure(&mut self, format: impl Into<u8>, batch_size: u8) {
2021-07-26 19:07:07 +08:00
self.format = format.into();
2021-07-27 19:12:57 +08:00
self.batch_size = batch_size;
2021-07-26 18:24:36 +08:00
}
2021-07-23 21:30:38 +08:00
/// Add a batch to the current stream frame.
///
/// # Args
/// * `f` - A closure that will be provided the buffer to write batch data into. The buffer will
/// be the size of the `T` template argument.
2021-07-26 18:24:36 +08:00
pub fn add<F, const T: usize>(&mut self, f: F)
2021-07-22 20:45:58 +08:00
where
F: FnMut(&mut [u8]),
{
let sequence_number = self.sequence_number;
self.sequence_number = self.sequence_number.wrapping_add(1);
if self.current_frame.is_none() {
if let Some(buffer) = self.pool.alloc() {
self.current_frame.replace(StreamFrame::new(
buffer,
2021-07-26 18:24:36 +08:00
self.format as u8,
2021-07-27 19:12:57 +08:00
self.batch_size,
2021-07-22 20:45:58 +08:00
sequence_number,
));
} else {
return;
2021-06-11 22:36:19 +08:00
}
}
2021-07-27 19:12:57 +08:00
// Note(unwrap): We ensure the frame is present above.
2021-07-26 18:24:36 +08:00
let current_frame = self.current_frame.as_mut().unwrap();
2021-07-26 18:24:36 +08:00
current_frame.add_batch::<_, T>(f);
2021-05-29 00:57:23 +08:00
2021-07-26 18:24:36 +08:00
if current_frame.is_full::<T>() {
2021-07-27 19:12:57 +08:00
// Note(unwrap): The queue is designed to be at least as large as the frame buffer
// count, so this enqueue should always succeed.
2021-07-26 19:52:57 +08:00
self.queue
2021-07-23 20:12:59 +08:00
.enqueue(self.current_frame.take().unwrap())
2021-07-26 19:52:57 +08:00
.unwrap();
2021-05-18 00:33:43 +08:00
}
}
}
2021-06-15 19:18:16 +08:00
/// The "consumer" portion of the data stream.
///
/// # Note
/// This is responsible for consuming data and sending it over UDP.
pub struct DataStream {
stack: NetworkReference,
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
2021-07-27 19:12:57 +08:00
queue: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
2021-07-23 20:12:59 +08:00
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
2021-06-15 19:18:16 +08:00
remote: SocketAddr,
}
impl DataStream {
2021-06-15 19:18:16 +08:00
/// Construct a new data streamer.
///
/// # Args
/// * `stack` - A reference to the shared network stack.
/// * `consumer` - The read side of the queue containing data to transmit.
2021-07-22 20:45:58 +08:00
/// * `frame_pool` - The Pool to return stream frame objects into.
2021-06-15 19:18:16 +08:00
fn new(
stack: NetworkReference,
2021-07-27 19:12:57 +08:00
consumer: Consumer<'static, StreamFrame, FRAME_QUEUE_SIZE>,
2021-07-23 20:12:59 +08:00
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
) -> Self {
Self {
stack,
socket: None,
2021-06-15 19:18:16 +08:00
remote: StreamTarget::default().into(),
queue: consumer,
2021-07-22 20:45:58 +08:00
frame_pool,
}
}
2021-05-18 00:33:43 +08:00
fn close(&mut self) {
if let Some(socket) = self.socket.take() {
log::info!("Closing stream");
// Note(unwrap): We guarantee that the socket is available above.
self.stack.close(socket).unwrap();
}
}
// Open new socket.
fn open(&mut self) -> Result<(), ()> {
// If there is already a socket of if remote address is unspecified,
// do not open a new socket.
if self.socket.is_some() || self.remote.ip().is_unspecified() {
return Err(());
}
2021-06-24 22:36:17 +08:00
log::info!("Opening stream");
let mut socket = self.stack.socket().or(Err(()))?;
2021-06-15 20:19:28 +08:00
// Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
// bound.
self.stack.connect(&mut socket, self.remote).unwrap();
self.socket.replace(socket);
Ok(())
}
2021-06-15 19:18:16 +08:00
/// Configure the remote endpoint of the stream.
///
/// # Args
/// * `remote` - The destination to send stream data to.
pub fn set_remote(&mut self, remote: SocketAddr) {
// Close socket to be reopened if the remote has changed.
if remote != self.remote {
self.close();
}
2021-06-15 19:18:16 +08:00
self.remote = remote;
2021-08-18 17:41:01 +08:00
log::info!("set stream remote endpoint to {}", self.remote);
}
2021-06-15 19:18:16 +08:00
/// Process any data for transmission.
2021-05-29 00:57:23 +08:00
pub fn process(&mut self) {
match self.socket.as_mut() {
None => {
// If there's no socket available, try to connect to our remote.
if self.open().is_ok() {
// If we just successfully opened the socket, flush old data from queue.
2021-07-22 20:45:58 +08:00
while let Some(frame) = self.queue.dequeue() {
self.frame_pool.free(frame.buffer);
}
}
}
Some(handle) => {
2021-07-22 20:45:58 +08:00
if let Some(mut frame) = self.queue.dequeue() {
// Transmit the frame and return it to the pool.
self.stack.send(handle, frame.finish()).ok();
self.frame_pool.free(frame.buffer)
2021-06-11 22:36:19 +08:00
}
}
}
}
}