2021-07-15 19:28:19 +08:00
|
|
|
//! Stabilizer data stream capabilities
|
|
|
|
//!
|
|
|
|
//! # Design
|
|
|
|
//! Data streaming utilizes UDP packets to send live data streams at high throughput.
|
|
|
|
//! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains
|
|
|
|
//! an identifier that can be used to detect dropped data.
|
|
|
|
//!
|
|
|
|
//! Refer to [DataPacket] for information about the serialization format of each UDP packet.
|
|
|
|
//!
|
|
|
|
//! # Example
|
|
|
|
//! A sample Python script is available in `scripts/stream_throughput.py` to demonstrate reception
|
|
|
|
//! of livestreamed data.
|
2021-06-09 19:26:41 +08:00
|
|
|
use heapless::spsc::{Consumer, Producer, Queue};
|
2021-06-09 21:25:59 +08:00
|
|
|
use miniconf::MiniconfAtomic;
|
|
|
|
use serde::Deserialize;
|
|
|
|
use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
|
2021-05-17 18:43:04 +08:00
|
|
|
|
2021-07-23 21:08:07 +08:00
|
|
|
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
use heapless::pool::{Box, Init, Pool, Uninit};
|
|
|
|
|
2021-05-17 18:43:04 +08:00
|
|
|
use super::NetworkReference;
|
|
|
|
|
2021-07-23 20:12:59 +08:00
|
|
|
/// Number of frames the backing storage is sized for. It is also used as the const size
/// parameter of the frame queue.
const FRAME_COUNT: usize = 6;

/// Size, in bytes, of a single stream frame buffer.
const FRAME_SIZE: usize = 1024;

/// Zero-filled backing memory that is handed to the frame pool in `setup_streaming`.
static mut FRAME_DATA: [u8; FRAME_COUNT * FRAME_SIZE] = [0; FRAME_COUNT * FRAME_SIZE];
|
2021-05-29 01:37:28 +08:00
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Represents the destination for the UDP stream to send data to.
|
2021-07-15 19:28:19 +08:00
|
|
|
///
|
|
|
|
/// # Miniconf
|
|
|
|
/// `{"ip": <addr>, "port": <port>}`
|
|
|
|
///
|
|
|
|
/// * `<addr>` is an array of 4 bytes. E.g. `[192, 168, 0, 1]`
|
|
|
|
/// * `<port>` is any unsigned 16-bit value.
|
|
|
|
///
|
|
|
|
/// ## Example
|
|
|
|
/// `{"ip": [192, 168,0, 1], "port": 1111}`
|
2021-06-25 04:14:55 +08:00
|
|
|
#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)]
|
2021-06-09 21:25:59 +08:00
|
|
|
pub struct StreamTarget {
|
|
|
|
pub ip: [u8; 4],
|
|
|
|
pub port: u16,
|
|
|
|
}
|
|
|
|
|
2021-07-23 21:08:07 +08:00
|
|
|
/// Identifies how the payload of a stream frame is laid out.
///
/// The discriminant is a `u16` so that it can be written directly into the frame header.
#[repr(u16)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum StreamFormat {
    /// Each batch holds ADC0, ADC1, DAC0, and DAC1 data, in that order, in little-endian format.
    /// Batches are appended to the stream frame one after another until the frame is full.
    AdcDacData = 0,
}
|
|
|
|
|
2021-06-15 19:46:39 +08:00
|
|
|
impl From<StreamTarget> for SocketAddr {
|
|
|
|
fn from(target: StreamTarget) -> SocketAddr {
|
2021-06-09 21:25:59 +08:00
|
|
|
SocketAddr::new(
|
|
|
|
IpAddr::V4(Ipv4Addr::new(
|
2021-06-15 19:46:39 +08:00
|
|
|
target.ip[0],
|
|
|
|
target.ip[1],
|
|
|
|
target.ip[2],
|
|
|
|
target.ip[3],
|
2021-06-09 21:25:59 +08:00
|
|
|
)),
|
2021-06-15 19:46:39 +08:00
|
|
|
target.port,
|
2021-06-09 21:25:59 +08:00
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Configure streaming on a device.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `stack` - A reference to the shared network stack.
|
|
|
|
///
|
|
|
|
/// # Returns
|
|
|
|
/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
|
|
|
|
/// `stream` is the logically consumer (UDP transmitter) of the enqueued data.
|
2021-05-17 18:43:04 +08:00
|
|
|
pub fn setup_streaming(
|
|
|
|
stack: NetworkReference,
|
2021-07-22 20:45:58 +08:00
|
|
|
) -> (FrameGenerator, DataStream) {
|
|
|
|
let queue =
|
|
|
|
cortex_m::singleton!(: Queue<StreamFrame, FRAME_COUNT> = Queue::new())
|
|
|
|
.unwrap();
|
2021-05-17 18:43:04 +08:00
|
|
|
let (producer, consumer) = queue.split();
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
let frame_pool =
|
2021-07-23 20:12:59 +08:00
|
|
|
cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]>= Pool::new()).unwrap();
|
2021-05-17 18:43:04 +08:00
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
// Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function.
|
|
|
|
let memory = unsafe { &mut FRAME_DATA };
|
|
|
|
frame_pool.grow(memory);
|
|
|
|
|
|
|
|
let generator = FrameGenerator::new(producer, frame_pool);
|
|
|
|
|
|
|
|
let stream = DataStream::new(stack, consumer, frame_pool);
|
2021-05-17 18:43:04 +08:00
|
|
|
|
|
|
|
(generator, stream)
|
|
|
|
}
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
struct StreamFrame {
|
2021-07-23 21:08:07 +08:00
|
|
|
format: StreamFormat,
|
2021-07-22 20:45:58 +08:00
|
|
|
sequence_number: u16,
|
2021-07-23 20:12:59 +08:00
|
|
|
buffer: Box<[u8; FRAME_SIZE], Init>,
|
2021-07-22 20:45:58 +08:00
|
|
|
offset: usize,
|
2021-07-23 21:08:07 +08:00
|
|
|
batch_count: u16,
|
|
|
|
batch_size: u8,
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
impl StreamFrame {
|
|
|
|
pub fn new(
|
2021-07-23 20:12:59 +08:00
|
|
|
buffer: Box<[u8; FRAME_SIZE], Uninit>,
|
2021-07-23 21:08:07 +08:00
|
|
|
format: StreamFormat,
|
2021-07-22 20:45:58 +08:00
|
|
|
sequence_number: u16,
|
|
|
|
) -> Self {
|
2021-05-17 18:43:04 +08:00
|
|
|
Self {
|
2021-07-22 20:45:58 +08:00
|
|
|
format,
|
2021-07-23 21:08:07 +08:00
|
|
|
offset: 7,
|
2021-07-22 20:45:58 +08:00
|
|
|
sequence_number,
|
|
|
|
buffer: unsafe { buffer.assume_init() },
|
2021-07-23 21:08:07 +08:00
|
|
|
batch_size: SAMPLE_BUFFER_SIZE as u8,
|
|
|
|
batch_count: 0,
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
pub fn add_batch<F, const T: usize>(&mut self, mut f: F)
|
|
|
|
where
|
|
|
|
F: FnMut(&mut [u8]),
|
|
|
|
{
|
|
|
|
assert!(!self.is_full::<T>(), "Batch cannot be added to full frame");
|
|
|
|
|
|
|
|
let result = f(&mut self.buffer[self.offset..self.offset + T]);
|
|
|
|
|
|
|
|
self.offset += T;
|
2021-07-23 21:08:07 +08:00
|
|
|
self.batch_count = self.batch_count.checked_add(1).unwrap();
|
2021-07-22 20:45:58 +08:00
|
|
|
|
|
|
|
result
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn is_full<const T: usize>(&self) -> bool {
|
|
|
|
self.offset + T >= self.buffer.len()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn finish(&mut self) -> &[u8] {
|
|
|
|
let offset = self.offset;
|
|
|
|
self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes());
|
2021-07-23 21:08:07 +08:00
|
|
|
self.buffer[2..4].copy_from_slice(&(self.format as u16).to_ne_bytes());
|
|
|
|
self.buffer[4..6].copy_from_slice(&self.batch_count.to_ne_bytes());
|
|
|
|
self.buffer[6] = self.batch_size;
|
2021-07-22 20:45:58 +08:00
|
|
|
&self.buffer[..offset]
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
/// The data generator for a stream.
|
|
|
|
pub struct FrameGenerator {
|
|
|
|
queue: Producer<'static, StreamFrame, FRAME_COUNT>,
|
2021-07-23 20:12:59 +08:00
|
|
|
pool: &'static Pool<[u8; FRAME_SIZE]>,
|
2021-07-22 20:45:58 +08:00
|
|
|
current_frame: Option<StreamFrame>,
|
|
|
|
sequence_number: u16,
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
impl FrameGenerator {
|
|
|
|
fn new(
|
|
|
|
queue: Producer<'static, StreamFrame, FRAME_COUNT>,
|
2021-07-23 20:12:59 +08:00
|
|
|
pool: &'static Pool<[u8; FRAME_SIZE]>,
|
2021-07-22 20:45:58 +08:00
|
|
|
) -> Self {
|
2021-06-11 22:36:19 +08:00
|
|
|
Self {
|
2021-07-22 20:45:58 +08:00
|
|
|
queue,
|
|
|
|
pool,
|
|
|
|
current_frame: None,
|
|
|
|
sequence_number: 0,
|
2021-06-11 22:36:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-23 21:08:07 +08:00
|
|
|
pub fn add<F, const T: usize>(&mut self, format: StreamFormat, f: F)
|
2021-07-22 20:45:58 +08:00
|
|
|
where
|
|
|
|
F: FnMut(&mut [u8]),
|
|
|
|
{
|
|
|
|
let sequence_number = self.sequence_number;
|
|
|
|
self.sequence_number = self.sequence_number.wrapping_add(1);
|
|
|
|
|
|
|
|
if self.current_frame.is_none() {
|
|
|
|
if let Some(buffer) = self.pool.alloc() {
|
|
|
|
self.current_frame.replace(StreamFrame::new(
|
|
|
|
buffer,
|
|
|
|
format,
|
|
|
|
sequence_number,
|
|
|
|
));
|
|
|
|
} else {
|
|
|
|
return;
|
2021-06-11 22:36:19 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-07-23 21:08:07 +08:00
|
|
|
assert!(
|
|
|
|
format == self.current_frame.as_ref().unwrap().format,
|
|
|
|
"Unexpected stream format encountered"
|
|
|
|
);
|
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
self.current_frame.as_mut().unwrap().add_batch::<_, T>(f);
|
2021-05-29 00:57:23 +08:00
|
|
|
|
2021-07-22 20:45:58 +08:00
|
|
|
if self.current_frame.as_ref().unwrap().is_full::<T>() {
|
2021-07-23 20:12:59 +08:00
|
|
|
if self
|
|
|
|
.queue
|
|
|
|
.enqueue(self.current_frame.take().unwrap())
|
|
|
|
.is_err()
|
|
|
|
{
|
|
|
|
// Given that the queue is the same size as the number of frames available, this
|
|
|
|
// should never occur.
|
|
|
|
panic!("Frame enqueue failure")
|
|
|
|
}
|
2021-05-18 00:33:43 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// The "consumer" portion of the data stream.
|
|
|
|
///
|
|
|
|
/// # Note
|
|
|
|
/// This is responsible for consuming data and sending it over UDP.
|
|
|
|
pub struct DataStream {
|
|
|
|
stack: NetworkReference,
|
|
|
|
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
|
2021-07-22 20:45:58 +08:00
|
|
|
queue: Consumer<'static, StreamFrame, FRAME_COUNT>,
|
2021-07-23 20:12:59 +08:00
|
|
|
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
|
2021-06-15 19:18:16 +08:00
|
|
|
remote: SocketAddr,
|
|
|
|
}
|
|
|
|
|
2021-05-17 18:43:04 +08:00
|
|
|
impl DataStream {
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Construct a new data streamer.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `stack` - A reference to the shared network stack.
|
|
|
|
/// * `consumer` - The read side of the queue containing data to transmit.
|
2021-07-22 20:45:58 +08:00
|
|
|
/// * `frame_pool` - The Pool to return stream frame objects into.
|
2021-06-15 19:18:16 +08:00
|
|
|
fn new(
|
2021-05-17 18:43:04 +08:00
|
|
|
stack: NetworkReference,
|
2021-07-22 20:45:58 +08:00
|
|
|
consumer: Consumer<'static, StreamFrame, FRAME_COUNT>,
|
2021-07-23 20:12:59 +08:00
|
|
|
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
|
2021-05-17 18:43:04 +08:00
|
|
|
) -> Self {
|
|
|
|
Self {
|
|
|
|
stack,
|
|
|
|
socket: None,
|
2021-06-15 19:18:16 +08:00
|
|
|
remote: StreamTarget::default().into(),
|
2021-05-17 18:43:04 +08:00
|
|
|
queue: consumer,
|
2021-07-22 20:45:58 +08:00
|
|
|
frame_pool,
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-18 00:33:43 +08:00
|
|
|
fn close(&mut self) {
|
2021-06-25 00:16:27 +08:00
|
|
|
if let Some(socket) = self.socket.take() {
|
|
|
|
log::info!("Closing stream");
|
|
|
|
// Note(unwrap): We guarantee that the socket is available above.
|
|
|
|
self.stack.close(socket).unwrap();
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
2021-06-25 00:16:27 +08:00
|
|
|
}
|
2021-05-17 18:43:04 +08:00
|
|
|
|
2021-06-25 00:16:27 +08:00
|
|
|
// Open new socket.
|
|
|
|
fn open(&mut self) -> Result<(), ()> {
|
|
|
|
// If there is already a socket of if remote address is unspecified,
|
|
|
|
// do not open a new socket.
|
|
|
|
if self.socket.is_some() || self.remote.ip().is_unspecified() {
|
2021-06-09 21:25:59 +08:00
|
|
|
return Err(());
|
|
|
|
}
|
|
|
|
|
2021-06-24 22:36:17 +08:00
|
|
|
log::info!("Opening stream");
|
2021-06-25 00:16:27 +08:00
|
|
|
|
|
|
|
let mut socket = self.stack.socket().or(Err(()))?;
|
|
|
|
|
2021-06-15 20:19:28 +08:00
|
|
|
// Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
|
|
|
|
// bound.
|
2021-06-25 00:16:27 +08:00
|
|
|
self.stack.connect(&mut socket, self.remote).unwrap();
|
2021-05-17 18:43:04 +08:00
|
|
|
|
|
|
|
self.socket.replace(socket);
|
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Configure the remote endpoint of the stream.
|
|
|
|
///
|
|
|
|
/// # Args
|
|
|
|
/// * `remote` - The destination to send stream data to.
|
2021-05-17 18:43:04 +08:00
|
|
|
pub fn set_remote(&mut self, remote: SocketAddr) {
|
2021-06-25 00:16:27 +08:00
|
|
|
// Close socket to be reopened if the remote has changed.
|
|
|
|
if remote != self.remote {
|
|
|
|
self.close();
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
2021-06-15 19:18:16 +08:00
|
|
|
self.remote = remote;
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
|
2021-06-15 19:18:16 +08:00
|
|
|
/// Process any data for transmission.
|
2021-05-29 00:57:23 +08:00
|
|
|
pub fn process(&mut self) {
|
2021-06-25 00:16:27 +08:00
|
|
|
match self.socket.as_mut() {
|
|
|
|
None => {
|
|
|
|
// If there's no socket available, try to connect to our remote.
|
|
|
|
if self.open().is_ok() {
|
|
|
|
// If we just successfully opened the socket, flush old data from queue.
|
2021-07-22 20:45:58 +08:00
|
|
|
while let Some(frame) = self.queue.dequeue() {
|
|
|
|
self.frame_pool.free(frame.buffer);
|
|
|
|
}
|
2021-05-26 23:56:44 +08:00
|
|
|
}
|
|
|
|
}
|
2021-06-25 00:16:27 +08:00
|
|
|
Some(handle) => {
|
2021-07-22 20:45:58 +08:00
|
|
|
if let Some(mut frame) = self.queue.dequeue() {
|
|
|
|
// Transmit the frame and return it to the pool.
|
|
|
|
self.stack.send(handle, frame.finish()).ok();
|
|
|
|
self.frame_pool.free(frame.buffer)
|
2021-06-11 22:36:19 +08:00
|
|
|
}
|
|
|
|
}
|
2021-05-17 18:43:04 +08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|