diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py index 3624333..2e2b33c 100644 --- a/scripts/stream_throughput.py +++ b/scripts/stream_throughput.py @@ -10,13 +10,15 @@ import struct import time import logging -# Representation of a single UDP packet transmitted by Stabilizer. -Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) +# Representation of a single data batch transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'data']) -Format = collections.namedtuple('Format', ['batch_size', 'length']) +Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format']) +# All supported formats by this reception script. FORMAT = { - 0: Format(8, 964) + 0: Format(sample_size_bytes=8, + batch_format='<{batch_size}H{batch_size}H{batch_size}H{batch_size}H') } class Timer: @@ -94,32 +96,25 @@ class PacketParser: if len(self.buf) < 4: return None - start_id, format_id = struct.unpack_from('(0, |buf| unsafe { - let dst = buf.as_ptr() as usize as *mut u16; + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( + StreamFormat::AdcDacData, + |buf| unsafe { + let dst = buf.as_ptr() as usize as *mut u16; - let adc0 = &adc_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping(adc0, dst, SAMPLE_BUFFER_SIZE); + let adc0 = &adc_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc0, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let adc1 = &adc_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping(adc1, dst, SAMPLE_BUFFER_SIZE); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let adc1 = &adc_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc1, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac0 = &dac_samples[0][0] as *const u16; - core::ptr::copy_nonoverlapping(dac0, dst, SAMPLE_BUFFER_SIZE); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac0 = &dac_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac0, + dst, + SAMPLE_BUFFER_SIZE, + ); - let dst = dst.add(SAMPLE_BUFFER_SIZE); - let dac1 = &dac_samples[1][0] as *const u16; - core::ptr::copy_nonoverlapping(dac1, dst, SAMPLE_BUFFER_SIZE); - }); + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac1 = &dac_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac1, + dst, + SAMPLE_BUFFER_SIZE, + ); + }, + ); // Update telemetry measurements. telemetry.adcs = diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index c21b4f8..7e09506 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -52,7 +52,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::{FrameGenerator, StreamTarget}, + data_stream::{FrameGenerator, StreamFormat, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -396,18 +396,43 @@ const APP: () = { } // Stream the data. - generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| { - let mut offset = 0; - for device in [adc_samples.iter(), dac_samples.iter()] { - for channel in device { - for sample in channel.iter() { - buf[offset..offset + 2] - .copy_from_slice(&sample.to_ne_bytes()); - offset += 2; - } - } - } - }); + generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>( + StreamFormat::AdcDacData, + |buf| unsafe { + let dst = buf.as_ptr() as usize as *mut u16; + + let adc0 = &adc_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc0, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let adc1 = &adc_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + adc1, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac0 = &dac_samples[0][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac0, + dst, + SAMPLE_BUFFER_SIZE, + ); + + let dst = dst.add(SAMPLE_BUFFER_SIZE); + let dac1 = &dac_samples[1][0] as *const u16; + core::ptr::copy_nonoverlapping( + dac1, + dst, + SAMPLE_BUFFER_SIZE, + ); + }, + ); // Update telemetry measurements. telemetry.adcs = diff --git a/src/configuration.rs b/src/configuration.rs index 01e410a..228f40e 100644 --- a/src/configuration.rs +++ b/src/configuration.rs @@ -3,7 +3,7 @@ /// MQTT broker IPv4 address /// /// In the default configuration, the IP address is defined as 10.34.16.10. -pub const MQTT_BROKER: [u8; 4] = [10, 34, 16, 10]; +pub const MQTT_BROKER: [u8; 4] = [10, 35, 16, 10]; /// Sampling Frequency /// diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 5766721..08bf352 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -15,6 +15,8 @@ use miniconf::MiniconfAtomic; use serde::Deserialize; use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; +use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; + use heapless::pool::{Box, Init, Pool, Uninit}; use super::NetworkReference; @@ -41,6 +43,15 @@ pub struct StreamTarget { pub port: u16, } +/// Specifies the format of streamed data +#[repr(u16)] +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum StreamFormat { + /// Streamed data contains ADC0, ADC1, DAC0, and DAC1 sequentially in little-endian format. Each + /// batch is loaded into the stream frame sequentially until the frame is full. + AdcDacData = 0, +} + impl From for SocketAddr { fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( @@ -86,23 +97,27 @@ pub fn setup_streaming( } struct StreamFrame { - format: u16, + format: StreamFormat, sequence_number: u16, buffer: Box<[u8; FRAME_SIZE], Init>, offset: usize, + batch_count: u16, + batch_size: u8, } impl StreamFrame { pub fn new( buffer: Box<[u8; FRAME_SIZE], Uninit>, - format: u16, + format: StreamFormat, sequence_number: u16, ) -> Self { Self { format, - offset: 4, + offset: 7, sequence_number, buffer: unsafe { buffer.assume_init() }, + batch_size: SAMPLE_BUFFER_SIZE as u8, + batch_count: 0, } } @@ -115,6 +130,7 @@ impl StreamFrame { let result = f(&mut self.buffer[self.offset..self.offset + T]); self.offset += T; + self.batch_count = self.batch_count.checked_add(1).unwrap(); result } @@ -126,7 +142,9 @@ impl StreamFrame { pub fn finish(&mut self) -> &[u8] { let offset = self.offset; self.buffer[0..2].copy_from_slice(&self.sequence_number.to_ne_bytes()); - self.buffer[2..4].copy_from_slice(&self.format.to_ne_bytes()); + self.buffer[2..4].copy_from_slice(&(self.format as u16).to_ne_bytes()); + self.buffer[4..6].copy_from_slice(&self.batch_count.to_ne_bytes()); + self.buffer[6] = self.batch_size; &self.buffer[..offset] } } @@ -152,7 +170,7 @@ impl FrameGenerator { } } - pub fn add(&mut self, format: u16, f: F) + pub fn add(&mut self, format: StreamFormat, f: F) where F: FnMut(&mut [u8]), { @@ -171,6 +189,11 @@ impl FrameGenerator { } } + assert!( + format == self.current_frame.as_ref().unwrap().format, + "Unexpected stream format encountered" + ); + self.current_frame.as_mut().unwrap().add_batch::<_, T>(f); if self.current_frame.as_ref().unwrap().is_full::() {