From b40ca17feaf80ed1c8776df96861110ebe7caabf Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 11 Jun 2021 16:36:19 +0200 Subject: [PATCH] Updating stream methodology --- src/net/data_stream.rs | 220 ++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 123 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 72f0967..71f087e 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -7,10 +7,6 @@ use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; -// The number of samples contained in a single block. Note that each sample corresponds ot 8 byte -// s(2 bytes per ADC/DAC code, 4 codes total). -const BLOCK_SAMPLE_SIZE: usize = 50; - // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; @@ -56,57 +52,16 @@ pub fn setup_streaming( (generator, stream) } -fn serialize_blocks<'a>( - buffer: &'a mut [u8], - max_buffer_size: usize, - queue: &mut Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, -) -> &'a [u8] { - // While there is space in the buffer, serialize into it. - - let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8; - - // Truncate the buffer to the maximum buffer size. - let buffer: &mut [u8] = if buffer.len() > max_buffer_size { - &mut buffer[..max_buffer_size] - } else { - buffer - }; - - // Serialize blocks into the buffer until either the buffer or the queue are exhausted. - let mut enqueued_blocks: usize = 0; - for buf in buffer.chunks_exact_mut(block_size) { - // If there are no more blocks, return the serialized data. - let data = match queue.dequeue() { - Some(data) => data, - None => break, - }; - - let block = DataBlock { - adcs: data.adcs, - dacs: data.dacs, - block_id: data.block_id, - block_size: SAMPLE_BUFFER_SIZE, - }; - - enqueued_blocks += 1; - let length = block.to_slice(buf, SUBSAMPLE_RATE); - assert!(length == block_size); - } - - &buffer[..block_size * enqueued_blocks] -} - #[derive(Debug, Copy, Clone)] pub struct AdcDacData { - block_id: u32, - adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + block_id: u16, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], } pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_block: AdcDacData, - num_samples: usize, + current_id: u16, } impl BlockGenerator { @@ -115,12 +70,7 @@ impl BlockGenerator { ) -> Self { Self { queue, - current_block: AdcDacData { - block_id: 0, - adcs: [[0; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[0; BLOCK_SAMPLE_SIZE]; 2], - }, - num_samples: 0, + current_id: 0, } } @@ -129,47 +79,14 @@ impl BlockGenerator { adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { - let mut processed_samples = 0; + let block = AdcDacData { + block_id: self.current_id, + adcs: [*adcs[0], *adcs[1]], + dacs: [*dacs[0], *dacs[1]], + }; - while processed_samples < SAMPLE_BUFFER_SIZE { - let remaining_samples = SAMPLE_BUFFER_SIZE - processed_samples; - let free_space = BLOCK_SAMPLE_SIZE - self.num_samples; - let copy_sample_length = if remaining_samples < free_space { - remaining_samples - } else { - free_space - }; - - let start_src = self.num_samples; - let end_src = start_src + copy_sample_length; - - let start_dst = processed_samples; - let end_dst = start_dst + copy_sample_length; - - self.current_block.adcs[0][start_src..end_src] - .copy_from_slice(&adcs[0][start_dst..end_dst]); - self.current_block.adcs[1][start_src..end_src] - .copy_from_slice(&adcs[1][start_dst..end_dst]); - self.current_block.dacs[0][start_src..end_src] - .copy_from_slice(&dacs[0][start_dst..end_dst]); - self.current_block.dacs[1][start_src..end_src] - .copy_from_slice(&dacs[1][start_dst..end_dst]); - - self.num_samples += copy_sample_length; - - // If the data block is full, push it onto the queue. - if self.num_samples == BLOCK_SAMPLE_SIZE { - // Note: We silently ignore dropped blocks here. The queue can fill up if the - // service routing isn't being called often enough. - self.queue.enqueue(self.current_block).ok(); - - self.current_block.block_id = - self.current_block.block_id.wrapping_add(1); - self.num_samples = 0; - } - - processed_samples += copy_sample_length; - } + self.current_id = self.current_id.wrapping_add(1); + self.queue.enqueue(block).ok(); } } @@ -181,31 +98,86 @@ pub struct DataStream { buffer: [u8; 1024], } -struct DataBlock { - block_id: u32, - block_size: usize, - adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], +// Datapacket format: +// +// Header: +// [0..2]: Start block ID (u16) +// [2..3]: Num Blocks present (u8) +// [3..4]: Batch Size (u8) +// +// Following the header, batches are added sequentially. Each batch takes the form of: +// [*0..*2]: ADC0 +// [*2..*4]: ADC1 +// [*4..*6]: DAC0 +// [*6..*8]: DAC1 +struct DataPacket<'a> { + buf: &'a mut [u8], + subsample_rate: usize, + start_id: Option, + num_blocks: u8, + write_index: usize, } -impl DataBlock { - pub fn to_slice(self, buf: &mut [u8], subsample: usize) -> usize { - let block_size = self.block_size / subsample; - buf[0..4].copy_from_slice(&self.block_id.to_be_bytes()); - buf[4..8].copy_from_slice(&block_size.to_be_bytes()); +impl<'a> DataPacket<'a> { + pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { + Self { + buf, + start_id: None, + num_blocks: 0, + subsample_rate, + write_index: 4, + } + } - let mut offset: usize = 8; - for device in &[self.adcs, self.dacs] { + pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { + // Check that the block is sequential. + if let Some(id) = &self.start_id { + if batch.block_id != id.wrapping_add(self.num_blocks.into()) { + return Err(()); + } + } else { + // Otherwise, this is the first block. Record the strt ID. + self.start_id = Some(batch.block_id); + } + + // Check that there is space for the block. + let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; + if self.buf.len() - self.get_packet_size() < block_size_bytes { + return Err(()); + } + + // Copy the samples into the buffer. + for device in &[batch.adcs, batch.dacs] { for channel in device { - for sample in channel.iter().step_by(subsample) { - buf[offset..offset + 2] + for sample in channel.iter().step_by(self.subsample_rate) { + self.buf[self.write_index..self.write_index + 2] .copy_from_slice(&sample.to_be_bytes()); - offset += 2; + self.write_index += 2; } } } - offset + Ok(()) + } + + fn get_packet_size(&self) -> usize { + let header_length = 4; + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + let block_size_bytes = block_sample_size * 2 * 4; + + block_size_bytes * self.num_blocks as usize + header_length + } + + pub fn finish(self) -> usize { + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + + // Write the header into the block. + self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); + self.buf[2] = self.num_blocks; + self.buf[3] = block_sample_size as u8; + + // Return the length of the packet to transmit. + self.get_packet_size() } } @@ -285,21 +257,23 @@ impl DataStream { } if self.queue.ready() { - let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self - .stack - .lock(|stack| { - stack.with_udp_socket(handle, |socket| { - socket.payload_send_capacity() - }) - }) - .unwrap(); + // Dequeue data from the queue into a larger block structure. + let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); + while self.queue.ready() { + // Note(unwrap): We check above that the queue is ready before calling this. + if packet.add_batch(self.queue.peek().unwrap()).is_err() { + // If we cannot add another batch, break out of the loop and send the packet. + break; + } - let data = - serialize_blocks(&mut self.buffer, capacity, &mut self.queue); + // Remove the batch that we just added. + self.queue.dequeue(); + } // Transmit the data block. - self.stack.send(&mut handle, &data).ok(); + let mut handle = self.socket.borrow_mut().unwrap(); + let size = packet.finish(); + self.stack.send(&mut handle, &self.buffer[..size]).ok(); } } }