From 72637bebc08078d04f3efb9814267fb207329bc5 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 28 May 2021 18:57:23 +0200 Subject: [PATCH] Updating stream --- Cargo.lock | 2 +- src/net/data_stream.rs | 88 +++++++++++++++++++++++++----------------- src/net/mod.rs | 2 +- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 885cf5a..6a5f12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,7 +426,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?branch=feature/nal-update#98c1dffc4b0eeeaadf696320e1ce4234d5b52de0" +source = "git+https://github.com/quartiq/minimq.git?rev=dbdbec0#dbdbec0b77d2e134dc6c025018a82c14cbdfbe34" dependencies = [ "bit_field", "embedded-nal", diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 7cd7a30..ab07457 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,7 +1,6 @@ use core::borrow::BorrowMut; use heapless::{ spsc::{Consumer, Producer, Queue}, - Vec, }; use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; @@ -25,6 +24,43 @@ pub fn setup_streaming( (generator, stream) } +pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static, + AdcDacData, BlockBufferSize>) -> &'a [u8] { + // While there is space in the buffer, serialize into it. + + let block_size = (SAMPLE_BUFFER_SIZE * 2) * 2 * 2 + 8; + + // Truncate the buffer to the maximum buffer size. + let buffer: &mut [u8] = if buffer.len() > max_buffer_size { + &mut buffer[..max_buffer_size] + } else { + buffer + }; + + // Serialize blocks into the buffer until either the buffer or the queue are exhausted. + let mut enqueued_blocks: usize = 0; + for buf in buffer.chunks_exact_mut(block_size) { + // If there are no more blocks, return the serialized data. + let data = match queue.dequeue() { + Some(data) => data, + None => break, + }; + + let block = DataBlock { + adcs: data.adcs, + dacs: data.dacs, + block_id: data.block_id, + block_size: SAMPLE_BUFFER_SIZE, + }; + + enqueued_blocks += 1; + let length = block.to_slice(buf); + assert!(length == block_size); + } + + &buffer[..block_size * enqueued_blocks] +} + #[derive(Debug)] pub struct AdcDacData { block_id: u32, @@ -69,6 +105,7 @@ pub struct DataStream { socket: Option<::UdpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, remote: Option, + buffer: [u8; 1024], } struct DataBlock { @@ -79,19 +116,21 @@ struct DataBlock { } impl DataBlock { - pub fn serialize>(self) -> Vec { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap(); - vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap(); + pub fn to_slice(self, buf: &mut [u8]) -> usize { + buf[0..4].copy_from_slice(&self.block_id.to_be_bytes()); + buf[4..8].copy_from_slice(&self.block_size.to_be_bytes()); + + let mut offset: usize = 8; for device in &[self.adcs, self.dacs] { for channel in device { for sample in channel { - vec.extend_from_slice(&sample.to_be_bytes()).unwrap(); + buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes()); + offset += 2; } } } - vec + offset } } @@ -106,6 +145,7 @@ impl DataStream { socket: None, remote: None, queue: consumer, + buffer: [0; 1024], } } @@ -133,8 +173,6 @@ impl DataStream { // TODO: How should we handle a connection failure? self.stack.connect(&mut socket, remote).unwrap(); - log::info!("Stream connecting to {:?}", remote); - // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -154,7 +192,7 @@ impl DataStream { self.remote = Some(remote); } - pub fn process(&mut self) -> bool { + pub fn process(&mut self) { // If there's no socket available, try to connect to our remote. if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. @@ -164,39 +202,19 @@ impl DataStream { while self.queue.ready() { self.queue.dequeue(); } - return false; + return; } } - let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); + if self.queue.ready() { + let mut handle = self.socket.borrow_mut().unwrap(); + let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); - // TODO: Clean up magic numbers. - if capacity < 72 { - // We cannot send a full data block. Abort now. - while self.queue.ready() { - self.queue.dequeue(); - } - return false; - } - - if let Some(data) = self.queue.dequeue() { - - let block = DataBlock { - adcs: data.adcs, - dacs: data.dacs, - block_id: data.block_id, - block_size: SAMPLE_BUFFER_SIZE, - }; - - // Serialize the datablock. - let data: Vec = block.serialize(); + let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue); // Transmit the data block. // TODO: Should we measure how many packets get dropped as telemetry? self.stack.send(&mut handle, &data).ok(); } - - self.queue.ready() } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 5e4cb19..7d52bc4 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -135,7 +135,7 @@ where // Update the data stream. if self.generator.is_none() { - while self.stream.process() {} + self.stream.process(); } // Poll for incoming data.