Updating stream

This commit is contained in:
Ryan Summers 2021-05-28 18:57:23 +02:00
parent 7012cbdc29
commit 72637bebc0
3 changed files with 55 additions and 37 deletions

2
Cargo.lock generated
View File

@ -426,7 +426,7 @@ dependencies = [
[[package]] [[package]]
name = "minimq" name = "minimq"
version = "0.2.0" version = "0.2.0"
source = "git+https://github.com/quartiq/minimq.git?branch=feature/nal-update#98c1dffc4b0eeeaadf696320e1ce4234d5b52de0" source = "git+https://github.com/quartiq/minimq.git?rev=dbdbec0#dbdbec0b77d2e134dc6c025018a82c14cbdfbe34"
dependencies = [ dependencies = [
"bit_field", "bit_field",
"embedded-nal", "embedded-nal",

View File

@ -1,7 +1,6 @@
use core::borrow::BorrowMut; use core::borrow::BorrowMut;
use heapless::{ use heapless::{
spsc::{Consumer, Producer, Queue}, spsc::{Consumer, Producer, Queue},
Vec,
}; };
use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack};
@ -25,6 +24,43 @@ pub fn setup_streaming(
(generator, stream) (generator, stream)
} }
pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static,
AdcDacData, BlockBufferSize>) -> &'a [u8] {
// While there is space in the buffer, serialize into it.
let block_size = (SAMPLE_BUFFER_SIZE * 2) * 2 * 2 + 8;
// Truncate the buffer to the maximum buffer size.
let buffer: &mut [u8] = if buffer.len() > max_buffer_size {
&mut buffer[..max_buffer_size]
} else {
buffer
};
// Serialize blocks into the buffer until either the buffer or the queue are exhausted.
let mut enqueued_blocks: usize = 0;
for buf in buffer.chunks_exact_mut(block_size) {
// If there are no more blocks, return the serialized data.
let data = match queue.dequeue() {
Some(data) => data,
None => break,
};
let block = DataBlock {
adcs: data.adcs,
dacs: data.dacs,
block_id: data.block_id,
block_size: SAMPLE_BUFFER_SIZE,
};
enqueued_blocks += 1;
let length = block.to_slice(buf);
assert!(length == block_size);
}
&buffer[..block_size * enqueued_blocks]
}
#[derive(Debug)] #[derive(Debug)]
pub struct AdcDacData { pub struct AdcDacData {
block_id: u32, block_id: u32,
@ -69,6 +105,7 @@ pub struct DataStream {
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>, socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
queue: Consumer<'static, AdcDacData, BlockBufferSize>, queue: Consumer<'static, AdcDacData, BlockBufferSize>,
remote: Option<SocketAddr>, remote: Option<SocketAddr>,
buffer: [u8; 1024],
} }
struct DataBlock { struct DataBlock {
@ -79,19 +116,21 @@ struct DataBlock {
} }
impl DataBlock { impl DataBlock {
pub fn serialize<T: heapless::ArrayLength<u8>>(self) -> Vec<u8, T> { pub fn to_slice(self, buf: &mut [u8]) -> usize {
let mut vec: Vec<u8, T> = Vec::new(); buf[0..4].copy_from_slice(&self.block_id.to_be_bytes());
vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap(); buf[4..8].copy_from_slice(&self.block_size.to_be_bytes());
vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap();
let mut offset: usize = 8;
for device in &[self.adcs, self.dacs] { for device in &[self.adcs, self.dacs] {
for channel in device { for channel in device {
for sample in channel { for sample in channel {
vec.extend_from_slice(&sample.to_be_bytes()).unwrap(); buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes());
offset += 2;
} }
} }
} }
vec offset
} }
} }
@ -106,6 +145,7 @@ impl DataStream {
socket: None, socket: None,
remote: None, remote: None,
queue: consumer, queue: consumer,
buffer: [0; 1024],
} }
} }
@ -133,8 +173,6 @@ impl DataStream {
// TODO: How should we handle a connection failure? // TODO: How should we handle a connection failure?
self.stack.connect(&mut socket, remote).unwrap(); self.stack.connect(&mut socket, remote).unwrap();
log::info!("Stream connecting to {:?}", remote);
// Note(unwrap): The socket will be empty before we replace it. // Note(unwrap): The socket will be empty before we replace it.
self.socket.replace(socket); self.socket.replace(socket);
@ -154,7 +192,7 @@ impl DataStream {
self.remote = Some(remote); self.remote = Some(remote);
} }
pub fn process(&mut self) -> bool { pub fn process(&mut self) {
// If there's no socket available, try to connect to our remote. // If there's no socket available, try to connect to our remote.
if self.socket.is_none() && self.remote.is_some() { if self.socket.is_none() && self.remote.is_some() {
// If we still can't open the remote, continue. // If we still can't open the remote, continue.
@ -164,39 +202,19 @@ impl DataStream {
while self.queue.ready() { while self.queue.ready() {
self.queue.dequeue(); self.queue.dequeue();
} }
return false; return;
} }
} }
if self.queue.ready() {
let mut handle = self.socket.borrow_mut().unwrap(); let mut handle = self.socket.borrow_mut().unwrap();
let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap();
// TODO: Clean up magic numbers. let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue);
if capacity < 72 {
// We cannot send a full data block. Abort now.
while self.queue.ready() {
self.queue.dequeue();
}
return false;
}
if let Some(data) = self.queue.dequeue() {
let block = DataBlock {
adcs: data.adcs,
dacs: data.dacs,
block_id: data.block_id,
block_size: SAMPLE_BUFFER_SIZE,
};
// Serialize the datablock.
let data: Vec<u8, heapless::consts::U256> = block.serialize();
// Transmit the data block. // Transmit the data block.
// TODO: Should we measure how many packets get dropped as telemetry? // TODO: Should we measure how many packets get dropped as telemetry?
self.stack.send(&mut handle, &data).ok(); self.stack.send(&mut handle, &data).ok();
} }
self.queue.ready()
} }
} }

View File

@ -135,7 +135,7 @@ where
// Update the data stream. // Update the data stream.
if self.generator.is_none() { if self.generator.is_none() {
while self.stream.process() {} self.stream.process();
} }
// Poll for incoming data. // Poll for incoming data.