Adding internal buffering to data stream blocks

Ryan Summers 2021-06-09 13:26:41 +02:00
parent b292cf45ab
commit b5fdb31a02
5 changed files with 108 additions and 49 deletions

View File

@@ -18,11 +18,11 @@ use stabilizer::{
         DigitalInput0, DigitalInput1, AFE0, AFE1,
     },
     net::{
+        data_stream::BlockGenerator,
         miniconf::Miniconf,
         serde::Deserialize,
         telemetry::{Telemetry, TelemetryBuffer},
         NetworkState, NetworkUsers,
-        data_stream::BlockGenerator,
     },
 };
@@ -215,7 +215,7 @@ const APP: () = {
                     c.spawn.settings_update().unwrap()
                 }
                 NetworkState::Updated => {}
-                NetworkState::NoChange => {},
+                NetworkState::NoChange => {}
             }
         }
     }
@@ -241,7 +241,9 @@ const APP: () = {
             .settings
             .lock(|settings| (settings.afe, settings.telemetry_period));
-        c.resources.network.telemetry
+        c.resources
+            .network
+            .telemetry
             .publish(&telemetry.finalize(gains[0], gains[1]));
 
         // Schedule the telemetry task in the future.
@@ -256,7 +258,9 @@ const APP: () = {
     #[task(priority = 1, resources=[network], schedule=[ethernet_link])]
     fn ethernet_link(c: ethernet_link::Context) {
         c.resources.network.processor.handle_link();
-        c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap();
+        c.schedule
+            .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1))
+            .unwrap();
     }
 
     #[task(binds = ETH, priority = 1)]

View File

@@ -319,7 +319,9 @@ const APP: () = {
     #[task(priority = 1, resources=[network], schedule=[ethernet_link])]
     fn ethernet_link(c: ethernet_link::Context) {
         c.resources.network.processor.handle_link();
-        c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap();
+        c.schedule
+            .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1))
+            .unwrap();
     }
 
     #[task(binds = ETH, priority = 1)]

View File

@@ -44,8 +44,10 @@ pub struct NetStorage {
 pub struct UdpSocketStorage {
     rx_storage: [u8; 1024],
     tx_storage: [u8; 2048],
-    tx_metadata: [smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
-    rx_metadata: [smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
+    tx_metadata:
+        [smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
+    rx_metadata:
+        [smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
 }
 
 impl UdpSocketStorage {
@@ -53,8 +55,12 @@ impl UdpSocketStorage {
         Self {
             rx_storage: [0; 1024],
             tx_storage: [0; 2048],
-            tx_metadata: [smoltcp::storage::PacketMetadata::<smoltcp::wire::IpEndpoint>::EMPTY; 10],
-            rx_metadata: [smoltcp::storage::PacketMetadata::<smoltcp::wire::IpEndpoint>::EMPTY; 10],
+            tx_metadata: [smoltcp::storage::PacketMetadata::<
+                smoltcp::wire::IpEndpoint,
+            >::EMPTY; 10],
+            rx_metadata: [smoltcp::storage::PacketMetadata::<
+                smoltcp::wire::IpEndpoint,
+            >::EMPTY; 10],
         }
     }
 }
@@ -252,7 +258,10 @@ pub fn setup(
     let gpiof = device.GPIOF.split(ccdr.peripheral.GPIOF);
     let mut gpiog = device.GPIOG.split(ccdr.peripheral.GPIOG);
 
-    let _uart_tx = gpiod.pd8.into_push_pull_output().set_speed(hal::gpio::Speed::VeryHigh);
+    let _uart_tx = gpiod
+        .pd8
+        .into_push_pull_output()
+        .set_speed(hal::gpio::Speed::VeryHigh);
 
     let dma_streams =
         hal::dma::dma::StreamsTuple::new(device.DMA1, ccdr.peripheral.DMA1);
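Editor's note: each PacketMetadata entry describes one buffered datagram (its endpoint and payload length), so the arrays above let each UDP socket queue up to 10 datagrams per direction on top of the raw payload storage. A minimal sketch of how such storage typically becomes a socket, assuming the smoltcp 0.7-era API in use here (make_udp_socket is a hypothetical helper with same-module field access):

    use smoltcp::socket::{UdpSocket, UdpSocketBuffer};

    // Turn the statically allocated storage into a UDP socket. The metadata
    // slice and payload slice together back one direction of the socket.
    fn make_udp_socket(
        storage: &'static mut UdpSocketStorage,
    ) -> UdpSocket<'static> {
        let rx_buffer = UdpSocketBuffer::new(
            &mut storage.rx_metadata[..],
            &mut storage.rx_storage[..],
        );
        let tx_buffer = UdpSocketBuffer::new(
            &mut storage.tx_metadata[..],
            &mut storage.tx_storage[..],
        );
        UdpSocket::new(rx_buffer, tx_buffer)
    }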

View File

@@ -1,12 +1,14 @@
 use core::borrow::BorrowMut;
-use heapless::{
-    spsc::{Consumer, Producer, Queue},
-};
+use heapless::spsc::{Consumer, Producer, Queue};
 use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack};
 
 use super::NetworkReference;
 use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
 
+// The number of samples contained in a single block. Note that each sample corresponds to
+// 8 bytes (2 bytes per ADC/DAC code, 4 codes total).
+const BLOCK_SAMPLE_SIZE: usize = 50;
+
 // The number of data blocks that we will buffer in the queue.
 const BLOCK_BUFFER_SIZE: usize = 30;
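Editor's note: with these constants, one full block carries 50 samples x 8 bytes = 400 bytes of sample data (plus its u32 block id), and the 30-deep queue can hold roughly 12 KiB of buffered blocks. A minimal check of that arithmetic, under the layout described in the comment above:

    #[cfg(test)]
    #[test]
    fn block_payload_size() {
        // 2 bytes per ADC/DAC code, 4 codes (2 ADC + 2 DAC channels) per sample.
        let bytes_per_sample = 2 * 4;
        assert_eq!(BLOCK_SAMPLE_SIZE * bytes_per_sample, 400);
    }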
@@ -26,8 +28,11 @@ pub fn setup_streaming(
     (generator, stream)
 }
 
-pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static,
-    AdcDacData, BLOCK_BUFFER_SIZE>) -> &'a [u8] {
+fn serialize_blocks<'a>(
+    buffer: &'a mut [u8],
+    max_buffer_size: usize,
+    queue: &mut Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
+) -> &'a [u8] {
     // While there is space in the buffer, serialize into it.
     let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8;
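Editor's note: the block_size expression appears to count the serialized bytes of one block: the subsampled sample count times 2 bytes per code, times 2 channels, times 2 converters (ADCs and DACs), plus 8 bytes of per-block header. For illustration only; SAMPLE_BUFFER_SIZE and SUBSAMPLE_RATE are defined elsewhere in the crate, and the values below are assumptions:

    // Hypothetical values, not taken from this commit.
    let sample_buffer_size = 8;
    let subsample_rate = 1;
    // (subsampled samples * 2 bytes) * 2 channels * 2 converters + 8 header bytes
    let block_size = (sample_buffer_size / subsample_rate * 2) * 2 * 2 + 8;
    assert_eq!(block_size, 72);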
@@ -63,23 +68,31 @@ pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue:
     &buffer[..block_size * enqueued_blocks]
 }
 
-#[derive(Debug)]
+#[derive(Debug, Copy, Clone)]
 pub struct AdcDacData {
     block_id: u32,
-    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
+    adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2],
+    dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2],
 }
 
 pub struct BlockGenerator {
     queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
-    current_id: u32,
+    current_block: AdcDacData,
+    num_samples: usize,
 }
 
 impl BlockGenerator {
-    pub fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self {
+    pub fn new(
+        queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
+    ) -> Self {
         Self {
             queue,
-            current_id: 0,
+            current_block: AdcDacData {
+                block_id: 0,
+                adcs: [[0; BLOCK_SAMPLE_SIZE]; 2],
+                dacs: [[0; BLOCK_SAMPLE_SIZE]; 2],
+            },
+            num_samples: 0,
         }
     }
@@ -88,17 +101,47 @@ impl BlockGenerator {
         adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
         dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
     ) {
-        let block = AdcDacData {
-            block_id: self.current_id,
-            adcs: [*adcs[0], *adcs[1]],
-            dacs: [*dacs[0], *dacs[1]],
-        };
+        let mut processed_samples = 0;
+
+        while processed_samples < SAMPLE_BUFFER_SIZE {
+            let remaining_samples = SAMPLE_BUFFER_SIZE - processed_samples;
+            let free_space = BLOCK_SAMPLE_SIZE - self.num_samples;
+            let copy_sample_length = if remaining_samples < free_space {
+                remaining_samples
+            } else {
+                free_space
+            };
 
-        self.current_id = self.current_id.wrapping_add(1);
+            let start_src = self.num_samples;
+            let end_src = start_src + copy_sample_length;
 
-        // Note: We silently ignore dropped blocks here. The queue can fill up if the service
-        // routing isn't being called often enough.
-        self.queue.enqueue(block).ok();
+            let start_dst = processed_samples;
+            let end_dst = start_dst + copy_sample_length;
+
+            self.current_block.adcs[0][start_src..end_src]
+                .copy_from_slice(&adcs[0][start_dst..end_dst]);
+            self.current_block.adcs[1][start_src..end_src]
+                .copy_from_slice(&adcs[1][start_dst..end_dst]);
+            self.current_block.dacs[0][start_src..end_src]
+                .copy_from_slice(&dacs[0][start_dst..end_dst]);
+            self.current_block.dacs[1][start_src..end_src]
+                .copy_from_slice(&dacs[1][start_dst..end_dst]);
+
+            self.num_samples += copy_sample_length;
+
+            // If the data block is full, push it onto the queue.
+            if self.num_samples == BLOCK_SAMPLE_SIZE {
+                // Note: We silently ignore dropped blocks here. The queue can fill up if the
+                // service routing isn't being called often enough.
+                self.queue.enqueue(self.current_block).ok();
+                self.current_block.block_id =
+                    self.current_block.block_id.wrapping_add(1);
+                self.num_samples = 0;
+            }
+
+            processed_samples += copy_sample_length;
+        }
     }
 }
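Editor's note: the loop above decouples the DSP batch size from the stream block size. Each incoming SAMPLE_BUFFER_SIZE batch is copied into the partially filled block, and a block is enqueued whenever it reaches BLOCK_SAMPLE_SIZE samples, even when that boundary falls mid-batch. A self-contained sketch of the same logic for one channel, with hypothetical sizes (the if/else above written as min):

    const BATCH: usize = 8; // stands in for SAMPLE_BUFFER_SIZE
    const BLOCK: usize = 50; // stands in for BLOCK_SAMPLE_SIZE

    struct Rebuffer {
        block: [u16; BLOCK],
        filled: usize,
        complete: usize, // completed blocks; stands in for the SPSC queue
    }

    impl Rebuffer {
        fn record(&mut self, batch: &[u16; BATCH]) {
            let mut processed = 0;
            while processed < BATCH {
                // Copy as much of the batch as fits into the current block.
                let n = (BATCH - processed).min(BLOCK - self.filled);
                self.block[self.filled..self.filled + n]
                    .copy_from_slice(&batch[processed..processed + n]);
                self.filled += n;
                if self.filled == BLOCK {
                    // A full block would be enqueued here.
                    self.complete += 1;
                    self.filled = 0;
                }
                processed += n;
            }
        }
    }

    fn main() {
        let mut r = Rebuffer { block: [0; BLOCK], filled: 0, complete: 0 };
        for _ in 0..25 {
            r.record(&[0u16; BATCH]); // 25 batches x 8 samples = 200 samples
        }
        assert_eq!(r.complete, 4); // 200 / 50 = four full 50-sample blocks
        assert_eq!(r.filled, 0);
    }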
@@ -113,8 +156,8 @@ pub struct DataStream {
 struct DataBlock {
     block_id: u32,
     block_size: usize,
-    adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
-    dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
+    adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2],
+    dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2],
 }
 
 impl DataBlock {
@@ -127,7 +170,8 @@ impl DataBlock {
         for device in &[self.adcs, self.dacs] {
             for channel in device {
                 for sample in channel.iter().step_by(subsample) {
-                    buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes());
+                    buf[offset..offset + 2]
+                        .copy_from_slice(&sample.to_be_bytes());
                     offset += 2;
                 }
             }
@@ -135,7 +179,6 @@
         offset
     }
 }
-
 
 impl DataStream {
@@ -165,12 +208,9 @@ impl DataStream {
             self.close();
         }
 
-        let mut socket =
-            self.stack
-                .socket()
-                .map_err(|err| match err {
+        let mut socket = self.stack.socket().map_err(|err| match err {
             <NetworkReference as UdpClientStack>::Error::NoIpAddress => (),
-            _ => ()
+            _ => (),
         })?;
 
         self.stack.connect(&mut socket, remote).unwrap();
@@ -199,7 +239,6 @@ impl DataStream {
         if self.socket.is_none() && self.remote.is_some() {
             // If we still can't open the remote, continue.
             if self.open(self.remote.unwrap()).is_err() {
-
                 // Clear the queue out.
                 while self.queue.ready() {
                     self.queue.dequeue();
@@ -210,14 +249,19 @@ impl DataStream {
         if self.queue.ready() {
             let mut handle = self.socket.borrow_mut().unwrap();
 
-            let capacity = self.stack.lock(|stack| stack.with_udp_socket(handle, |socket| {
+            let capacity = self
+                .stack
+                .lock(|stack| {
+                    stack.with_udp_socket(handle, |socket| {
                         socket.payload_send_capacity()
-            })).unwrap();
+                    })
+                })
+                .unwrap();
 
-            let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue);
+            let data =
+                serialize_blocks(&mut self.buffer, capacity, &mut self.queue);
 
             // Transmit the data block.
+            // TODO: Should we measure how many packets get dropped as telemetry?
             self.stack.send(&mut handle, &data).ok();
         }
     }
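Editor's note: BlockGenerator and DataStream sit on opposite ends of a heapless single-producer single-consumer queue: record() fills and enqueues blocks in the data-processing context, while the streaming service dequeues and serializes them on the network side. A minimal sketch of the split that setup_streaming performs, assuming the heapless 0.7-style API (the static-queue initialization shown is an assumption, not this commit's code):

    use heapless::spsc::{Consumer, Producer, Queue};

    // One statically allocated queue, split once at init into the two ends.
    static mut QUEUE: Queue<AdcDacData, BLOCK_BUFFER_SIZE> = Queue::new();

    fn split_queue() -> (
        Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
        Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
    ) {
        // Safety: call exactly once during init, before the streaming tasks run.
        unsafe { QUEUE.split() }
    }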

View File

@@ -9,20 +9,20 @@ pub use heapless;
 pub use miniconf;
 pub use serde;
 
+pub mod data_stream;
 pub mod messages;
 pub mod miniconf_client;
 pub mod network_processor;
 pub mod shared;
 pub mod telemetry;
-pub mod data_stream;
 
 use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
+use data_stream::{BlockGenerator, DataStream};
 use messages::{MqttMessage, SettingsResponse};
 use miniconf_client::MiniconfClient;
 use network_processor::NetworkProcessor;
 use shared::NetworkManager;
 use telemetry::TelemetryClient;
-use data_stream::{DataStream, BlockGenerator};
 
 use core::fmt::Write;
 use heapless::String;