///! Stabilizer data stream capabilities ///! ///! # Design ///! Stabilizer data streamining utilizes UDP packets to send live data streams at high throughput. ///! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains ///! an identifier that can be used to detect any dropped data. ///! ///! The current implementation utilizes an single-producer, single-consumer queue to send data ///! between a high priority task and the UDP transmitter. ///! ///! A "batch" of data is defined to be a single item in the SPSC queue sent to the UDP transmitter ///! thread. The transmitter thread then serializes as many sequential "batches" into a single UDP ///! packet as possible. The UDP packet is also given a header indicating the starting batch ///! sequence number and the number of batches present. If the UDP transmitter encounters a ///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged ///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a ///! receiver to detect dropped batches (e.g. due to processing overhead). use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; // A factor that data may be subsampled at. const SUBSAMPLE_RATE: usize = 1; /// Represents the destination for the UDP stream to send data to. #[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize, Default)] pub struct StreamTarget { pub ip: [u8; 4], pub port: u16, } impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( IpAddr::V4(Ipv4Addr::new( target.ip[0], target.ip[1], target.ip[2], target.ip[3], )), target.port, ) } } /// A basic "batch" of data. // Note: In the future, the stream may be generic over this type. #[derive(Debug, Copy, Clone)] pub struct AdcDacData { block_id: u16, adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], } /// Configure streaming on a device. /// /// # Args /// * `stack` - A reference to the shared network stack. /// /// # Returns /// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The /// `stream` is the logically consumer (UDP transmitter) of the enqueued data. pub fn setup_streaming( stack: NetworkReference, ) -> (BlockGenerator, DataStream) { let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); let (producer, consumer) = queue.split(); let generator = BlockGenerator::new(producer); let stream = DataStream::new(stack, consumer); (generator, stream) } /// The data generator for a stream. pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, current_id: u16, } impl BlockGenerator { /// Construct a new generator. /// # Args /// * `queue` - The producer portion of the SPSC queue to enqueue data into. /// /// # Returns /// The generator to use. fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { Self { queue, current_id: 0, } } /// Schedule data to be sent by the generator. /// /// # Note /// If no space is available, the data batch may be silently dropped. /// /// # Args /// * `adcs` - The ADC data to transmit. /// * `dacs` - The DAC data to transmit. pub fn send( &mut self, adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { let block = AdcDacData { block_id: self.current_id, adcs: [*adcs[0], *adcs[1]], dacs: [*dacs[0], *dacs[1]], }; self.current_id = self.current_id.wrapping_add(1); self.queue.enqueue(block).ok(); } } /// Represents a single UDP packet sent by the stream. /// /// # Packet Format /// All data is sent in network-endian format. The format is as follows /// /// Header: /// [0..2]: Start block ID (u16) /// [2..3]: Num Blocks present (u8) /// [3..4]: Batch Size (u8) /// /// Following the header, batches are added sequentially. Each batch takes the form of: /// [*0..*2]: ADC0 /// [*2..*4]: ADC1 /// [*4..*6]: DAC0 /// [*6..*8]: DAC1 struct DataPacket<'a> { buf: &'a mut [u8], subsample_rate: usize, start_id: Option, num_blocks: u8, write_index: usize, } impl<'a> DataPacket<'a> { /// Construct a new packet. /// /// # Args /// * `buf` - The location to serialize the data packet into. /// * `subsample_rate` - The factor at which to subsample data from batches. pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { Self { buf, start_id: None, num_blocks: 0, subsample_rate, write_index: 4, } } /// Add a batch of data to the packet. /// /// # Note /// Serialization occurs as the packet is added. /// /// # Args /// * `batch` - The batch to add to the packet. pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { // Check that the block is sequential. if let Some(id) = &self.start_id { if batch.block_id != id.wrapping_add(self.num_blocks.into()) { return Err(()); } } else { // Otherwise, this is the first block. Record the strt ID. self.start_id = Some(batch.block_id); } // Check that there is space for the block. let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; if self.buf.len() - self.get_packet_size() < block_size_bytes { return Err(()); } // Copy the samples into the buffer. for device in &[batch.adcs, batch.dacs] { for channel in device { for sample in channel.iter().step_by(self.subsample_rate) { self.buf[self.write_index..self.write_index + 2] .copy_from_slice(&sample.to_be_bytes()); self.write_index += 2; } } } self.num_blocks += 1; Ok(()) } fn get_packet_size(&self) -> usize { let header_length = 4; let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; let block_size_bytes = block_sample_size * 2 * 4; block_size_bytes * self.num_blocks as usize + header_length } /// Complete the packet and prepare it for transmission. /// /// # Returns /// The size of the packet. The user should utilize the original buffer provided for packet /// construction to access the packet. pub fn finish(self) -> usize { let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; // Write the header into the block. self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); self.buf[2] = self.num_blocks; self.buf[3] = block_sample_size as u8; // Return the length of the packet to transmit. self.get_packet_size() } } /// The "consumer" portion of the data stream. /// /// # Note /// This is responsible for consuming data and sending it over UDP. pub struct DataStream { stack: NetworkReference, socket: Option<::UdpSocket>, queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, remote: SocketAddr, buffer: [u8; 1024], } impl DataStream { /// Construct a new data streamer. /// /// # Args /// * `stack` - A reference to the shared network stack. /// * `consumer` - The read side of the queue containing data to transmit. fn new( stack: NetworkReference, consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, ) -> Self { Self { stack, socket: None, remote: StreamTarget::default().into(), queue: consumer, buffer: [0; 1024], } } fn close(&mut self) { if let Some(socket) = self.socket.take() { log::info!("Closing stream"); // Note(unwrap): We guarantee that the socket is available above. self.stack.close(socket).unwrap(); } } // Open new socket. fn open(&mut self) -> Result<(), ()> { // If there is already a socket of if remote address is unspecified, // do not open a new socket. if self.socket.is_some() || self.remote.ip().is_unspecified() { return Err(()); } log::info!("Opening stream"); let mut socket = self.stack.socket().or(Err(()))?; // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be // bound. self.stack.connect(&mut socket, self.remote).unwrap(); self.socket.replace(socket); Ok(()) } /// Configure the remote endpoint of the stream. /// /// # Args /// * `remote` - The destination to send stream data to. pub fn set_remote(&mut self, remote: SocketAddr) { // Close socket to be reopened if the remote has changed. if remote != self.remote { self.close(); } self.remote = remote; } /// Process any data for transmission. pub fn process(&mut self) { match self.socket.as_mut() { None => { // If there's no socket available, try to connect to our remote. if self.open().is_ok() { // If we just successfully opened the socket, flush old data from queue. while self.queue.dequeue().is_some() {} } } Some(handle) => { if self.queue.ready() { // Dequeue data from the queue into a larger block structure. let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); while self .queue .peek() .and_then(|batch| packet.add_batch(batch).ok()) .is_some() { // Dequeue the batch that we just added to the packet. self.queue.dequeue(); } // Transmit the data packet. let size = packet.finish(); self.stack.send(handle, &self.buffer[..size]).ok(); } } } } }