diff --git a/Cargo.lock b/Cargo.lock index dc95b9b..9cffcb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9a69a963b70ddacfcd382524f72a4576f359af9334b3bf48a79566590bb8bfa" dependencies = [ "bitrate", - "cortex-m 0.7.2", + "cortex-m 0.6.7", "embedded-hal", ] @@ -389,9 +389,9 @@ dependencies = [ [[package]] name = "managed" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75de51135344a4f8ed3cfe2720dc27736f7711989703a0b43aadf3753c55577" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" [[package]] name = "matrixmultiply" @@ -455,9 +455,9 @@ checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae" [[package]] name = "ndarray" -version = "0.15.2" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02b2e4807aaa21dc6dcc3417e5902dc199c3648043bf27b7af4b202332fe4760" +checksum = "cc1372704f14bb132a49a6701c2238970a359ee0829fed481b522a63bf25456a" dependencies = [ "matrixmultiply", "num-complex", @@ -743,9 +743,8 @@ dependencies = [ [[package]] name = "smoltcp" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b5647cc4676e9358e6b15b6536b34e5b413e5ae946a06b3f85e713132bcdfa" +version = "0.8.0" +source = "git+https://github.com/smoltcp-rs/smoltcp?branch=master#027f255f904b9b7c4226cfd8b2d31f272ffa5105" dependencies = [ "bitflags", "byteorder", @@ -755,7 +754,7 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" -source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=5e56576#5e56576fbbc594f0a0e1df9222a20eb35c8e7511" +source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=5baf55f#5baf55fafbfe2c08d9fe56c836171e9d2fb468e8" dependencies = [ "embedded-nal", "heapless 0.7.1", @@ -804,7 +803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b672c837e0ee8158ecc7fce0f9a948dd0693a9c588338e728d14b73307a0b7d" dependencies = [ "bare-metal 0.2.5", - "cortex-m 0.7.2", + "cortex-m 0.6.7", "cortex-m-rt", "vcell", ] @@ -812,7 +811,7 @@ dependencies = [ [[package]] name = "stm32h7xx-hal" version = "0.9.0" -source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=acd47be#acd47beb4b84b4dc46da3a8b68688bc8c5984604" +source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=33aa67d#33aa67d74790cb9f680a4f281b72df0664bcf03c" dependencies = [ "bare-metal 1.0.0", "cast", diff --git a/Cargo.toml b/Cargo.toml index 9539e6d..44f96c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,7 +60,7 @@ rev = "70b0eb5" features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] # version = "0.9.0" git = "https://github.com/quartiq/stm32h7xx-hal.git" -rev = "acd47be" +rev = "33aa67d" # link.x section start/end [patch.crates-io.cortex-m-rt] @@ -73,7 +73,7 @@ rev = "2750533" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" -rev = "5e56576" +rev = "5baf55f" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py new file mode 100644 index 0000000..2981b0c --- /dev/null +++ b/scripts/stream_throughput.py @@ -0,0 +1,189 @@ +#!/usr/bin/python3 +""" +Author: Ryan Summers + +Description: Provides a mechanism for measuring Stabilizer stream data throughput. +""" +import socket +import collections +import struct +import time +import logging + +# Representation of a single UDP packet transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) + +class Timer: + """ A basic timer for measuring elapsed time periods. """ + + def __init__(self, period=1.0): + """ Create the timer with the provided period. """ + self.start_time = time.time() + self.trigger_time = self.start_time + period + self.period = period + self.started = False + + + def is_triggered(self): + """ Check if the timer period has elapsed. """ + now = time.time() + return now >= self.trigger_time + + + def start(self): + """ Start the timer. """ + self.start_time = time.time() + self.started = True + + + def is_started(self): + """ Check if the timer has started. """ + return self.started + + + def arm(self): + """ Arm the timer trigger. """ + self.trigger_time = time.time() + self.period + + + def elapsed(self): + """ Get the elapsed time since the timer was started. """ + now = time.time() + return now - self.start_time + + +class PacketParser: + """ Utilize class used for parsing received UDP data. """ + + def __init__(self): + """ Initialize the parser. """ + self.buf = b'' + self.total_bytes = 0 + + + def ingress(self, data): + """ Ingress received UDP data. """ + self.total_bytes += len(data) + self.buf += data + + + def parse_all_packets(self): + """ Parse all received packets from the receive buffer. + + Returns: + A list of received Packets. + """ + packets = [] + while True: + new_packets = self._parse() + if new_packets: + packets += new_packets + else: + return packets + + + def _parse(self): + """ Attempt to parse packets from the received buffer. """ + # Attempt to parse a block from the buffer. + if len(self.buf) < 4: + return None + + start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf) + + packet_size = 4 + data_size * num_blocks * 8 + + if len(self.buf) < packet_size: + return None + + self.buf = self.buf[4:] + + packets = [] + for offset in range(num_blocks): + adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf) + adc = [ + adcs_dacs[0:data_size], + adcs_dacs[data_size:2*data_size], + ] + + dac = [ + adcs_dacs[2*data_size: 3*data_size], + adcs_dacs[3*data_size:], + ] + + self.buf = self.buf[8*data_size:] + packets.append(Packet(start_id + offset, adc, dac)) + + return packets + + +def check_index(previous_index, next_index): + """ Check if two indices are sequential. """ + if previous_index == -1: + return True + + # Handle index roll-over. Indices are only stored in 16-bit numbers. + if next_index < previous_index: + next_index += 65536 + + expected_index = previous_index + 1 + + return next_index == expected_index + + +def main(): + """ Main program. """ + connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + connection.bind(("", 1111)) + + logging.basicConfig(level=logging.INFO, + format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s') + + last_index = -1 + + drop_count = 0 + good_blocks = 0 + + timer = Timer() + parser = PacketParser() + + while True: + # Receive any data over UDP and parse it. + data = connection.recv(4096) + if data and not timer.is_started(): + timer.start() + + parser.ingress(data) + + # Handle any received packets. + for packet in parser.parse_all_packets(): + + # Handle any dropped packets. + if not check_index(last_index, packet.index): + print(hex(last_index), hex(packet.index)) + if packet.index < (last_index + 1): + dropped = packet.index + 65536 - (last_index + 1) + else: + dropped = packet.index - (last_index + 1) + + drop_count += dropped + + last_index = packet.index + good_blocks += 1 + + # Report the throughput periodically. + if timer.is_triggered(): + drate = parser.total_bytes * 8 / 1e6 / timer.elapsed() + + print(f''' +Data Rate: {drate:.3f} Mbps +Received Blocks: {good_blocks} +Dropped blocks: {drop_count} + +Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s +---- +''') + timer.arm() + + +if __name__ == '__main__': + main() diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index d511ce1..efec9c1 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -18,6 +18,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ + data_stream::{BlockGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -37,6 +38,7 @@ pub struct Settings { allow_hold: bool, force_hold: bool, telemetry_period: u16, + stream_target: StreamTarget, } impl Default for Settings { @@ -56,6 +58,8 @@ impl Default for Settings { force_hold: false, // The default telemetry period in seconds. telemetry_period: 10, + + stream_target: StreamTarget::default(), } } } @@ -68,6 +72,7 @@ const APP: () = { adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), network: NetworkUsers, + generator: BlockGenerator, settings: Settings, telemetry: TelemetryBuffer, @@ -76,13 +81,13 @@ const APP: () = { iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], } - #[init(spawn=[telemetry, settings_update])] + #[init(spawn=[telemetry, settings_update, ethernet_link])] fn init(c: init::Context) -> init::LateResources { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup::setup(c.core, c.device); - let network = NetworkUsers::new( + let mut network = NetworkUsers::new( stabilizer.net.stack, stabilizer.net.phy, stabilizer.cycle_counter, @@ -90,10 +95,15 @@ const APP: () = { stabilizer.net.mac_address, ); + let generator = network.enable_streaming(); + // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); + // Spawn the ethernet link period check task. + c.spawn.ethernet_link().unwrap(); + // Enable ADC/DAC events stabilizer.adcs.0.start(); stabilizer.adcs.1.start(); @@ -107,6 +117,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, + generator, network, digital_inputs: stabilizer.digital_inputs, telemetry: TelemetryBuffer::default(), @@ -130,7 +141,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. - #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry], priority=2)] + #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] #[inline(never)] #[link_section = ".itcm.process"] fn process(mut c: process::Context) { @@ -141,6 +152,7 @@ const APP: () = { ref settings, ref mut iir_state, ref mut telemetry, + ref mut generator, } = c.resources; let digital_inputs = [ @@ -180,6 +192,9 @@ const APP: () = { .last(); } + // Stream the data. + generator.send(&adc_samples, &dac_samples); + // Update telemetry measurements. telemetry.adcs = [AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])]; @@ -214,6 +229,9 @@ const APP: () = { // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); + + let target = settings.stream_target.into(); + c.resources.network.direct_stream(target); } #[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])] @@ -240,6 +258,14 @@ const APP: () = { .unwrap(); } + #[task(priority = 1, resources=[network], schedule=[ethernet_link])] + fn ethernet_link(c: ethernet_link::Context) { + c.resources.network.processor.handle_link(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); + } + #[task(binds = ETH, priority = 1)] fn eth(_: eth::Context) { unsafe { hal::ethernet::interrupt_handler() } diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 46ffdc0..bc1d39a 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -20,6 +20,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ + data_stream::{BlockGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -64,6 +65,8 @@ pub struct Settings { output_conf: [Conf; 2], telemetry_period: u16, + + stream_target: StreamTarget, } impl Default for Settings { @@ -82,6 +85,8 @@ impl Default for Settings { output_conf: [Conf::InPhase, Conf::Quadrature], // The default telemetry period in seconds. telemetry_period: 10, + + stream_target: StreamTarget::default(), } } } @@ -96,19 +101,20 @@ const APP: () = { settings: Settings, telemetry: TelemetryBuffer, digital_inputs: (DigitalInput0, DigitalInput1), + generator: BlockGenerator, timestamper: InputStamper, pll: RPLL, lockin: Lockin<4>, } - #[init(spawn=[settings_update, telemetry])] + #[init(spawn=[settings_update, telemetry, ethernet_link])] fn init(c: init::Context) -> init::LateResources { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup::setup(c.core, c.device); - let network = NetworkUsers::new( + let mut network = NetworkUsers::new( stabilizer.net.stack, stabilizer.net.phy, stabilizer.cycle_counter, @@ -116,6 +122,8 @@ const APP: () = { stabilizer.net.mac_address, ); + let generator = network.enable_streaming(); + let settings = Settings::default(); let pll = RPLL::new( @@ -127,6 +135,9 @@ const APP: () = { c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); + // Spawn the ethernet link servicing task. + c.spawn.ethernet_link().unwrap(); + // Enable ADC/DAC events stabilizer.adcs.0.start(); stabilizer.adcs.1.start(); @@ -152,6 +163,7 @@ const APP: () = { telemetry: TelemetryBuffer::default(), settings, + generator, pll, lockin: Lockin::default(), @@ -165,7 +177,7 @@ const APP: () = { /// This is an implementation of a externally (DI0) referenced PLL lockin on the ADC0 signal. /// It outputs either I/Q or power/phase on DAC0/DAC1. Data is normalized to full scale. /// PLL bandwidth, filter bandwidth, slope, and x/y or power/phase post-filters are available. - #[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry], priority=2)] + #[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry, generator], priority=2)] #[inline(never)] #[link_section = ".itcm.process"] fn process(mut c: process::Context) { @@ -177,6 +189,7 @@ const APP: () = { ref mut lockin, ref mut pll, ref mut timestamper, + ref mut generator, } = c.resources; let (reference_phase, reference_frequency) = match settings.lockin_mode @@ -249,6 +262,10 @@ const APP: () = { *sample = DacCode::from(value as i16).0; } } + + // Stream data + generator.send(&adc_samples, &dac_samples); + // Update telemetry measurements. telemetry.adcs = [AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])]; @@ -282,6 +299,9 @@ const APP: () = { c.resources.afes.1.set_gain(settings.afe[1]); c.resources.settings.lock(|current| *current = *settings); + + let target = settings.stream_target.into(); + c.resources.network.direct_stream(target); } #[task(priority = 1, resources=[network, digital_inputs, settings, telemetry], schedule=[telemetry])] @@ -313,6 +333,14 @@ const APP: () = { .unwrap(); } + #[task(priority = 1, resources=[network], schedule=[ethernet_link])] + fn ethernet_link(c: ethernet_link::Context) { + c.resources.network.processor.handle_link(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); + } + #[task(binds = ETH, priority = 1)] fn eth(_: eth::Context) { unsafe { hal::ethernet::interrupt_handler() } diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index c2f8730..20842ae 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -7,9 +7,6 @@ use stm32h7xx_hal::{ prelude::*, }; -const NUM_SOCKETS: usize = 4; - -use heapless::Vec; use smoltcp_nal::smoltcp; use embedded_hal::digital::v2::{InputPin, OutputPin}; @@ -21,13 +18,18 @@ use super::{ NetworkStack, AFE0, AFE1, }; +const NUM_TCP_SOCKETS: usize = 4; +const NUM_UDP_SOCKETS: usize = 1; +const NUM_SOCKETS: usize = NUM_UDP_SOCKETS + NUM_TCP_SOCKETS; + pub struct NetStorage { pub ip_addrs: [smoltcp::wire::IpCidr; 1], // Note: There is an additional socket set item required for the DHCP socket. pub sockets: [Option>; NUM_SOCKETS + 1], - pub socket_storage: [SocketStorage; NUM_SOCKETS], + pub tcp_socket_storage: [TcpSocketStorage; NUM_TCP_SOCKETS], + pub udp_socket_storage: [UdpSocketStorage; NUM_UDP_SOCKETS], pub neighbor_cache: [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], pub routes_cache: @@ -39,13 +41,37 @@ pub struct NetStorage { pub dhcp_rx_storage: [u8; 600], } +pub struct UdpSocketStorage { + rx_storage: [u8; 1024], + tx_storage: [u8; 2048], + tx_metadata: + [smoltcp::storage::PacketMetadata; 10], + rx_metadata: + [smoltcp::storage::PacketMetadata; 10], +} + +impl UdpSocketStorage { + const fn new() -> Self { + Self { + rx_storage: [0; 1024], + tx_storage: [0; 2048], + tx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], + rx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], + } + } +} + #[derive(Copy, Clone)] -pub struct SocketStorage { +pub struct TcpSocketStorage { rx_storage: [u8; 1024], tx_storage: [u8; 1024], } -impl SocketStorage { +impl TcpSocketStorage { const fn new() -> Self { Self { rx_storage: [0; 1024], @@ -63,8 +89,9 @@ impl NetStorage { )], neighbor_cache: [None; 8], routes_cache: [None; 8], - sockets: [None, None, None, None, None], - socket_storage: [SocketStorage::new(); NUM_SOCKETS], + sockets: [None, None, None, None, None, None], + tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS], + udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS], dhcp_tx_storage: [0; 600], dhcp_rx_storage: [0; 600], dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], @@ -634,20 +661,18 @@ pub fn setup( let neighbor_cache = smoltcp::iface::NeighborCache::new(&mut store.neighbor_cache[..]); - let interface = smoltcp::iface::EthernetInterfaceBuilder::new(eth_dma) + let interface = smoltcp::iface::InterfaceBuilder::new(eth_dma) .ethernet_addr(mac_addr) .neighbor_cache(neighbor_cache) .ip_addrs(&mut store.ip_addrs[..]) .routes(routes) .finalize(); - let (mut sockets, handles) = { + let sockets = { let mut sockets = smoltcp::socket::SocketSet::new(&mut store.sockets[..]); - let mut handles: Vec = - Vec::new(); - for storage in store.socket_storage.iter_mut() { + for storage in store.tcp_socket_storage[..].iter_mut() { let tcp_socket = { let rx_buffer = smoltcp::socket::TcpSocketBuffer::new( &mut storage.rx_storage[..], @@ -658,34 +683,28 @@ pub fn setup( smoltcp::socket::TcpSocket::new(rx_buffer, tx_buffer) }; - let handle = sockets.add(tcp_socket); - - handles.push(handle).unwrap(); + sockets.add(tcp_socket); } - (sockets, handles) - }; + for storage in store.udp_socket_storage[..].iter_mut() { + let udp_socket = { + let rx_buffer = smoltcp::socket::UdpSocketBuffer::new( + &mut storage.rx_metadata[..], + &mut storage.rx_storage[..], + ); + let tx_buffer = smoltcp::socket::UdpSocketBuffer::new( + &mut storage.tx_metadata[..], + &mut storage.tx_storage[..], + ); - let dhcp_client = { - let dhcp_rx_buffer = smoltcp::socket::RawSocketBuffer::new( - &mut store.dhcp_rx_metadata[..], - &mut store.dhcp_rx_storage[..], - ); + smoltcp::socket::UdpSocket::new(rx_buffer, tx_buffer) + }; + sockets.add(udp_socket); + } - let dhcp_tx_buffer = smoltcp::socket::RawSocketBuffer::new( - &mut store.dhcp_tx_metadata[..], - &mut store.dhcp_tx_storage[..], - ); + sockets.add(smoltcp::socket::Dhcpv4Socket::new()); - smoltcp::dhcp::Dhcpv4Client::new( - &mut sockets, - dhcp_rx_buffer, - dhcp_tx_buffer, - // Smoltcp indicates that an instant with a negative time is indicative that time is - // not yet available. We can't get the current instant yet, so indicate an invalid - // time value. - smoltcp::time::Instant::from_millis(-1), - ) + sockets }; let random_seed = { @@ -696,12 +715,7 @@ pub fn setup( data }; - let mut stack = smoltcp_nal::NetworkStack::new( - interface, - sockets, - &handles, - Some(dhcp_client), - ); + let mut stack = smoltcp_nal::NetworkStack::new(interface, sockets); stack.seed_random_port(&random_seed); diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs new file mode 100644 index 0000000..61bceba --- /dev/null +++ b/src/net/data_stream.rs @@ -0,0 +1,353 @@ +///! Stabilizer data stream capabilities +///! +///! # Design +///! Stabilizer data streamining utilizes UDP packets to send live data streams at high throughput. +///! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains +///! an identifier that can be used to detect any dropped data. +///! +///! The current implementation utilizes an single-producer, single-consumer queue to send data +///! between a high priority task and the UDP transmitter. +///! +///! A "batch" of data is defined to be a single item in the SPSC queue sent to the UDP transmitter +///! thread. The transmitter thread then serializes as many sequential "batches" into a single UDP +///! packet as possible. The UDP packet is also given a header indicating the starting batch +///! sequence number and the number of batches present. If the UDP transmitter encounters a +///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged +///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a +///! receiver to detect dropped batches (e.g. due to processing overhead). +use heapless::spsc::{Consumer, Producer, Queue}; +use miniconf::MiniconfAtomic; +use serde::Deserialize; +use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; + +use super::NetworkReference; +use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; + +// The number of data blocks that we will buffer in the queue. +const BLOCK_BUFFER_SIZE: usize = 30; + +// A factor that data may be subsampled at. +const SUBSAMPLE_RATE: usize = 1; + +/// Represents the destination for the UDP stream to send data to. +#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)] +pub struct StreamTarget { + pub ip: [u8; 4], + pub port: u16, +} + +impl Default for StreamTarget { + fn default() -> Self { + Self { + ip: [0; 4], + port: 0, + } + } +} + +impl From for SocketAddr { + fn from(target: StreamTarget) -> SocketAddr { + SocketAddr::new( + IpAddr::V4(Ipv4Addr::new( + target.ip[0], + target.ip[1], + target.ip[2], + target.ip[3], + )), + target.port, + ) + } +} + +/// A basic "batch" of data. +// Note: In the future, the stream may be generic over this type. +#[derive(Debug, Copy, Clone)] +pub struct AdcDacData { + block_id: u16, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +/// Configure streaming on a device. +/// +/// # Args +/// * `stack` - A reference to the shared network stack. +/// +/// # Returns +/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The +/// `stream` is the logically consumer (UDP transmitter) of the enqueued data. +pub fn setup_streaming( + stack: NetworkReference, +) -> (BlockGenerator, DataStream) { + let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); + + let (producer, consumer) = queue.split(); + + let generator = BlockGenerator::new(producer); + + let stream = DataStream::new(stack, consumer); + + (generator, stream) +} + +/// The data generator for a stream. +pub struct BlockGenerator { + queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + current_id: u16, +} + +impl BlockGenerator { + /// Construct a new generator. + /// # Args + /// * `queue` - The producer portion of the SPSC queue to enqueue data into. + /// + /// # Returns + /// The generator to use. + fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { + Self { + queue, + current_id: 0, + } + } + + /// Schedule data to be sent by the generator. + /// + /// # Note + /// If no space is available, the data batch may be silently dropped. + /// + /// # Args + /// * `adcs` - The ADC data to transmit. + /// * `dacs` - The DAC data to transmit. + pub fn send( + &mut self, + adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], + ) { + let block = AdcDacData { + block_id: self.current_id, + adcs: [*adcs[0], *adcs[1]], + dacs: [*dacs[0], *dacs[1]], + }; + + self.current_id = self.current_id.wrapping_add(1); + self.queue.enqueue(block).ok(); + } +} + +/// Represents a single UDP packet sent by the stream. +/// +/// # Packet Format +/// All data is sent in network-endian format. The format is as follows +/// +/// Header: +/// [0..2]: Start block ID (u16) +/// [2..3]: Num Blocks present (u8) +/// [3..4]: Batch Size (u8) +/// +/// Following the header, batches are added sequentially. Each batch takes the form of: +/// [*0..*2]: ADC0 +/// [*2..*4]: ADC1 +/// [*4..*6]: DAC0 +/// [*6..*8]: DAC1 +struct DataPacket<'a> { + buf: &'a mut [u8], + subsample_rate: usize, + start_id: Option, + num_blocks: u8, + write_index: usize, +} + +impl<'a> DataPacket<'a> { + /// Construct a new packet. + /// + /// # Args + /// * `buf` - The location to serialize the data packet into. + /// * `subsample_rate` - The factor at which to subsample data from batches. + pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { + Self { + buf, + start_id: None, + num_blocks: 0, + subsample_rate, + write_index: 4, + } + } + + /// Add a batch of data to the packet. + /// + /// # Note + /// Serialization occurs as the packet is added. + /// + /// # Args + /// * `batch` - The batch to add to the packet. + pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { + // Check that the block is sequential. + if let Some(id) = &self.start_id { + if batch.block_id != id.wrapping_add(self.num_blocks.into()) { + return Err(()); + } + } else { + // Otherwise, this is the first block. Record the strt ID. + self.start_id = Some(batch.block_id); + } + + // Check that there is space for the block. + let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; + if self.buf.len() - self.get_packet_size() < block_size_bytes { + return Err(()); + } + + // Copy the samples into the buffer. + for device in &[batch.adcs, batch.dacs] { + for channel in device { + for sample in channel.iter().step_by(self.subsample_rate) { + self.buf[self.write_index..self.write_index + 2] + .copy_from_slice(&sample.to_be_bytes()); + self.write_index += 2; + } + } + } + + self.num_blocks += 1; + + Ok(()) + } + + fn get_packet_size(&self) -> usize { + let header_length = 4; + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + let block_size_bytes = block_sample_size * 2 * 4; + + block_size_bytes * self.num_blocks as usize + header_length + } + + /// Complete the packet and prepare it for transmission. + /// + /// # Returns + /// The size of the packet. The user should utilize the original buffer provided for packet + /// construction to access the packet. + pub fn finish(self) -> usize { + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + + // Write the header into the block. + self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); + self.buf[2] = self.num_blocks; + self.buf[3] = block_sample_size as u8; + + // Return the length of the packet to transmit. + self.get_packet_size() + } +} + +/// The "consumer" portion of the data stream. +/// +/// # Note +/// This is responsible for consuming data and sending it over UDP. +pub struct DataStream { + stack: NetworkReference, + socket: Option<::UdpSocket>, + queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + remote: SocketAddr, + buffer: [u8; 1024], +} + +impl DataStream { + /// Construct a new data streamer. + /// + /// # Args + /// * `stack` - A reference to the shared network stack. + /// * `consumer` - The read side of the queue containing data to transmit. + fn new( + stack: NetworkReference, + consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + ) -> Self { + Self { + stack, + socket: None, + remote: StreamTarget::default().into(), + queue: consumer, + buffer: [0; 1024], + } + } + + fn close(&mut self) { + // Note(unwrap): We guarantee that the socket is available above. + let socket = self.socket.take().unwrap(); + self.stack.close(socket).unwrap(); + } + + fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { + if self.socket.is_some() { + self.close(); + } + + // If the remote address is unspecified, just close the existing socket. + if remote.ip().is_unspecified() { + if self.socket.is_some() { + self.close(); + } + + return Err(()); + } + + let mut socket = self.stack.socket().map_err(|_| ())?; + + // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be + // bound. + self.stack.connect(&mut socket, remote).unwrap(); + + self.socket.replace(socket); + + Ok(()) + } + + /// Configure the remote endpoint of the stream. + /// + /// # Args + /// * `remote` - The destination to send stream data to. + pub fn set_remote(&mut self, remote: SocketAddr) { + // If the remote is identical to what we already have, do nothing. + if remote == self.remote { + return; + } + + // Open the new remote connection. + self.open(remote).ok(); + self.remote = remote; + } + + /// Process any data for transmission. + pub fn process(&mut self) { + // If there's no socket available, try to connect to our remote. + if self.socket.is_none() { + // If we can't open the socket (e.g. we do not have an IP address yet), clear data from + // the queue. + if self.open(self.remote).is_err() { + while self.queue.ready() { + self.queue.dequeue(); + } + return; + } + } + + if self.queue.ready() { + // Dequeue data from the queue into a larger block structure. + let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); + while self.queue.ready() { + // Note(unwrap): We check above that the queue is ready before calling this. + if packet.add_batch(self.queue.peek().unwrap()).is_err() { + // If we cannot add another batch, break out of the loop and send the packet. + break; + } + + // Remove the batch that we just added. + self.queue.dequeue(); + } + + // Transmit the data block. + let mut handle = self.socket.as_mut().unwrap(); + let size = packet.finish(); + self.stack.send(&mut handle, &self.buffer[..size]).ok(); + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index e9a2538..115ba3b 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -9,6 +9,7 @@ pub use heapless; pub use miniconf; pub use serde; +pub mod data_stream; pub mod messages; pub mod miniconf_client; pub mod network_processor; @@ -16,6 +17,7 @@ pub mod shared; pub mod telemetry; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; +use data_stream::{BlockGenerator, DataStream}; use messages::{MqttMessage, SettingsResponse}; use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; @@ -26,6 +28,7 @@ use core::fmt::Write; use heapless::String; use miniconf::Miniconf; use serde::Serialize; +use smoltcp_nal::embedded_nal::SocketAddr; pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; @@ -45,6 +48,8 @@ pub enum NetworkState { pub struct NetworkUsers { pub miniconf: MiniconfClient, pub processor: NetworkProcessor, + stream: DataStream, + generator: Option, pub telemetry: TelemetryClient, } @@ -95,10 +100,30 @@ where &prefix, ); + let (generator, stream) = + data_stream::setup_streaming(stack_manager.acquire_stack()); + NetworkUsers { miniconf: settings, processor, telemetry, + stream, + generator: Some(generator), + } + } + + /// Enable live data streaming. + pub fn enable_streaming(&mut self) -> BlockGenerator { + self.generator.take().unwrap() + } + + /// Direct the stream to the provided remote target. + /// + /// # Args + /// * `remote` - The destination for the streamed data. + pub fn direct_stream(&mut self, remote: SocketAddr) { + if self.generator.is_none() { + self.stream.set_remote(remote); } } @@ -107,15 +132,20 @@ where /// # Returns /// An indication if any of the network users indicated a state change. pub fn update(&mut self) -> NetworkState { + // Update the MQTT clients. + self.telemetry.update(); + + // Update the data stream. + if self.generator.is_none() { + self.stream.process(); + } + // Poll for incoming data. let poll_result = match self.processor.update() { UpdateState::NoChange => NetworkState::NoChange, UpdateState::Updated => NetworkState::Updated, }; - // Update the MQTT clients. - self.telemetry.update(); - match self.miniconf.update() { UpdateState::Updated => NetworkState::SettingsChanged, UpdateState::NoChange => poll_result, diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index a8168d3..13f7035 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -37,6 +37,27 @@ impl NetworkProcessor { } } + /// Handle ethernet link connection status. + /// + /// # Note + /// This may take non-trivial amounts of time to communicate with the PHY. As such, this should + /// only be called as often as necessary (e.g. once per second or so). + pub fn handle_link(&mut self) { + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + match self.phy.poll_link() { + true => self.network_was_reset = false, + + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + false if !self.network_was_reset => { + self.network_was_reset = true; + self.stack.lock(|stack| stack.handle_link_reset()); + } + _ => {} + }; + } + /// Process and update the state of the network. /// /// # Note @@ -52,24 +73,7 @@ impl NetworkProcessor { let result = match self.stack.lock(|stack| stack.poll(now)) { Ok(true) => UpdateState::Updated, Ok(false) => UpdateState::NoChange, - Err(err) => { - log::info!("Network error: {:?}", err); - UpdateState::Updated - } - }; - - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - match self.phy.poll_link() { - true => self.network_was_reset = false, - - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - false if !self.network_was_reset => { - self.network_was_reset = true; - self.stack.lock(|stack| stack.handle_link_reset()); - } - _ => {} + Err(_) => UpdateState::Updated, }; result diff --git a/src/net/shared.rs b/src/net/shared.rs index 4f86793..4761da0 100644 --- a/src/net/shared.rs +++ b/src/net/shared.rs @@ -69,6 +69,21 @@ where forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>} } +impl<'a, S> embedded_nal::UdpClientStack for NetworkStackProxy<'a, S> +where + S: embedded_nal::UdpClientStack, +{ + type UdpSocket = S::UdpSocket; + type Error = S::Error; + + forward! {socket() -> Result} + forward! {connect(socket: &mut S::UdpSocket, remote: embedded_nal::SocketAddr) -> Result<(), S::Error>} + + forward! {send(socket: &mut S::UdpSocket, buffer: &[u8]) -> embedded_nal::nb::Result<(), S::Error>} + forward! {receive(socket: &mut S::UdpSocket, buffer: &mut [u8]) -> embedded_nal::nb::Result<(usize, embedded_nal::SocketAddr), S::Error>} + forward! {close(socket: S::UdpSocket) -> Result<(), S::Error>} +} + impl NetworkManager { /// Construct a new manager for a shared network stack ///