diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 696fc07..0c3ad3e 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -18,11 +18,11 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ + data_stream::BlockGenerator, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, NetworkState, NetworkUsers, - data_stream::BlockGenerator, }, }; @@ -215,7 +215,7 @@ const APP: () = { c.spawn.settings_update().unwrap() } NetworkState::Updated => {} - NetworkState::NoChange => {}, + NetworkState::NoChange => {} } } } @@ -241,7 +241,9 @@ const APP: () = { .settings .lock(|settings| (settings.afe, settings.telemetry_period)); - c.resources.network.telemetry + c.resources + .network + .telemetry .publish(&telemetry.finalize(gains[0], gains[1])); // Schedule the telemetry task in the future. @@ -256,7 +258,9 @@ const APP: () = { #[task(priority = 1, resources=[network], schedule=[ethernet_link])] fn ethernet_link(c: ethernet_link::Context) { c.resources.network.processor.handle_link(); - c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); } #[task(binds = ETH, priority = 1)] diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index c504105..9263e2c 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -319,7 +319,9 @@ const APP: () = { #[task(priority = 1, resources=[network], schedule=[ethernet_link])] fn ethernet_link(c: ethernet_link::Context) { c.resources.network.processor.handle_link(); - c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); } #[task(binds = ETH, priority = 1)] diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index dffb9c2..83f9b87 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -44,8 +44,10 @@ pub struct NetStorage { pub struct UdpSocketStorage { rx_storage: [u8; 1024], tx_storage: [u8; 2048], - tx_metadata: [smoltcp::storage::PacketMetadata; 10], - rx_metadata: [smoltcp::storage::PacketMetadata; 10], + tx_metadata: + [smoltcp::storage::PacketMetadata; 10], + rx_metadata: + [smoltcp::storage::PacketMetadata; 10], } impl UdpSocketStorage { @@ -53,8 +55,12 @@ impl UdpSocketStorage { Self { rx_storage: [0; 1024], tx_storage: [0; 2048], - tx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], - rx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], + tx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], + rx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], } } } @@ -252,7 +258,10 @@ pub fn setup( let gpiof = device.GPIOF.split(ccdr.peripheral.GPIOF); let mut gpiog = device.GPIOG.split(ccdr.peripheral.GPIOG); - let _uart_tx = gpiod.pd8.into_push_pull_output().set_speed(hal::gpio::Speed::VeryHigh); + let _uart_tx = gpiod + .pd8 + .into_push_pull_output() + .set_speed(hal::gpio::Speed::VeryHigh); let dma_streams = hal::dma::dma::StreamsTuple::new(device.DMA1, ccdr.peripheral.DMA1); diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 1309735..b323a26 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,12 +1,14 @@ use core::borrow::BorrowMut; -use heapless::{ - spsc::{Consumer, Producer, Queue}, -}; +use heapless::spsc::{Consumer, Producer, Queue}; use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; +// The number of samples contained in a single block. Note that each sample corresponds ot 8 byte +// s(2 bytes per ADC/DAC code, 4 codes total). +const BLOCK_SAMPLE_SIZE: usize = 50; + // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; @@ -26,8 +28,11 @@ pub fn setup_streaming( (generator, stream) } -pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static, - AdcDacData, BLOCK_BUFFER_SIZE>) -> &'a [u8] { +fn serialize_blocks<'a>( + buffer: &'a mut [u8], + max_buffer_size: usize, + queue: &mut Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, +) -> &'a [u8] { // While there is space in the buffer, serialize into it. let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8; @@ -63,23 +68,31 @@ pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &buffer[..block_size * enqueued_blocks] } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct AdcDacData { block_id: u32, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], } pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_id: u32, + current_block: AdcDacData, + num_samples: usize, } impl BlockGenerator { - pub fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { + pub fn new( + queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + ) -> Self { Self { queue, - current_id: 0, + current_block: AdcDacData { + block_id: 0, + adcs: [[0; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[0; BLOCK_SAMPLE_SIZE]; 2], + }, + num_samples: 0, } } @@ -88,17 +101,47 @@ impl BlockGenerator { adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { - let block = AdcDacData { - block_id: self.current_id, - adcs: [*adcs[0], *adcs[1]], - dacs: [*dacs[0], *dacs[1]], - }; + let mut processed_samples = 0; - self.current_id = self.current_id.wrapping_add(1); + while processed_samples < SAMPLE_BUFFER_SIZE { + let remaining_samples = SAMPLE_BUFFER_SIZE - processed_samples; + let free_space = BLOCK_SAMPLE_SIZE - self.num_samples; + let copy_sample_length = if remaining_samples < free_space { + remaining_samples + } else { + free_space + }; - // Note: We silently ignore dropped blocks here. The queue can fill up if the service - // routing isn't being called often enough. - self.queue.enqueue(block).ok(); + let start_src = self.num_samples; + let end_src = start_src + copy_sample_length; + + let start_dst = processed_samples; + let end_dst = start_dst + copy_sample_length; + + self.current_block.adcs[0][start_src..end_src] + .copy_from_slice(&adcs[0][start_dst..end_dst]); + self.current_block.adcs[1][start_src..end_src] + .copy_from_slice(&adcs[1][start_dst..end_dst]); + self.current_block.dacs[0][start_src..end_src] + .copy_from_slice(&dacs[0][start_dst..end_dst]); + self.current_block.dacs[1][start_src..end_src] + .copy_from_slice(&dacs[1][start_dst..end_dst]); + + self.num_samples += copy_sample_length; + + // If the data block is full, push it onto the queue. + if self.num_samples == BLOCK_SAMPLE_SIZE { + // Note: We silently ignore dropped blocks here. The queue can fill up if the + // service routing isn't being called often enough. + self.queue.enqueue(self.current_block).ok(); + + self.current_block.block_id = + self.current_block.block_id.wrapping_add(1); + self.num_samples = 0; + } + + processed_samples += copy_sample_length; + } } } @@ -113,8 +156,8 @@ pub struct DataStream { struct DataBlock { block_id: u32, block_size: usize, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], } impl DataBlock { @@ -127,7 +170,8 @@ impl DataBlock { for device in &[self.adcs, self.dacs] { for channel in device { for sample in channel.iter().step_by(subsample) { - buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes()); + buf[offset..offset + 2] + .copy_from_slice(&sample.to_be_bytes()); offset += 2; } } @@ -135,7 +179,6 @@ impl DataBlock { offset } - } impl DataStream { @@ -165,13 +208,10 @@ impl DataStream { self.close(); } - let mut socket = - self.stack - .socket() - .map_err(|err| match err { - ::Error::NoIpAddress => (), - _ => () - })?; + let mut socket = self.stack.socket().map_err(|err| match err { + ::Error::NoIpAddress => (), + _ => (), + })?; self.stack.connect(&mut socket, remote).unwrap(); @@ -199,7 +239,6 @@ impl DataStream { if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. if self.open(self.remote.unwrap()).is_err() { - // Clear the queue out. while self.queue.ready() { self.queue.dequeue(); @@ -210,14 +249,19 @@ impl DataStream { if self.queue.ready() { let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.with_udp_socket(handle, |socket| { - socket.payload_send_capacity() - })).unwrap(); + let capacity = self + .stack + .lock(|stack| { + stack.with_udp_socket(handle, |socket| { + socket.payload_send_capacity() + }) + }) + .unwrap(); - let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue); + let data = + serialize_blocks(&mut self.buffer, capacity, &mut self.queue); // Transmit the data block. - // TODO: Should we measure how many packets get dropped as telemetry? self.stack.send(&mut handle, &data).ok(); } } diff --git a/src/net/mod.rs b/src/net/mod.rs index e781194..70d7dfe 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -9,20 +9,20 @@ pub use heapless; pub use miniconf; pub use serde; +pub mod data_stream; pub mod messages; pub mod miniconf_client; pub mod network_processor; pub mod shared; pub mod telemetry; -pub mod data_stream; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; +use data_stream::{BlockGenerator, DataStream}; use messages::{MqttMessage, SettingsResponse}; use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; use shared::NetworkManager; use telemetry::TelemetryClient; -use data_stream::{DataStream, BlockGenerator}; use core::fmt::Write; use heapless::String;