From 21ca8e1c8f2751600e62dcb7eb48a313a5bd412c Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 17 May 2021 12:43:04 +0200 Subject: [PATCH 01/29] Adding initial streaming implementation --- Cargo.lock | 18 +++++ Cargo.toml | 1 + src/bin/dual-iir.rs | 18 ++++- src/net/data_stream.rs | 171 +++++++++++++++++++++++++++++++++++++++++ src/net/mod.rs | 31 +++++++- 5 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 src/net/data_stream.rs diff --git a/Cargo.lock b/Cargo.lock index 274b2bb..8b8f754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,23 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" +[[package]] +name = "postcard" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66acf3cf8ab62785852a0f67ae3bfcd38324da6561e52e7f4a049ca555c6d55e" +dependencies = [ + "heapless 0.6.1", + "postcard-cobs", + "serde", +] + +[[package]] +name = "postcard-cobs" +version = "0.1.5-pre" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -772,6 +789,7 @@ dependencies = [ "nb 1.0.0", "num_enum", "paste", + "postcard", "rtt-logger", "rtt-target", "serde", diff --git a/Cargo.toml b/Cargo.toml index 790363a..10cd9b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ ad9959 = { path = "ad9959" } miniconf = "0.1.0" shared-bus = {version = "0.2.2", features = ["cortex-m"] } serde-json-core = "0.3" +postcard = "0.6" [dependencies.rtt-logger] git = "https://github.com/quartiq/rtt-logger.git" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 29eb474..c258a02 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -13,7 +13,9 @@ use hardware::{ DigitalInput0, DigitalInput1, InputPin, SystemTimer, AFE0, AFE1, }; -use net::{NetworkUsers, Telemetry, TelemetryBuffer, UpdateState}; +use net::{ + BlockGenerator, NetworkUsers, Telemetry, TelemetryBuffer, UpdateState, +}; const SCALE: f32 = i16::MAX as _; @@ -58,6 +60,7 @@ const APP: () = { adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), network: NetworkUsers, + generator: BlockGenerator, settings: Settings, telemetry: TelemetryBuffer, @@ -71,7 +74,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let network = NetworkUsers::new( + let mut network = NetworkUsers::new( stabilizer.net.stack, stabilizer.net.phy, stabilizer.cycle_counter, @@ -79,6 +82,11 @@ const APP: () = { stabilizer.net.mac_address, ); + // TODO: Remove unwrap. + let remote: smoltcp_nal::embedded_nal::SocketAddr = + "10.35.16.10:1111".parse().unwrap(); + let generator = network.enable_streaming(remote.into()); + // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); @@ -96,6 +104,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, + generator, network, digital_inputs: stabilizer.digital_inputs, telemetry: net::TelemetryBuffer::default(), @@ -119,7 +128,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. 
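/// The streaming hook added in this patch must fit within the same deadline: the
/// `BlockGenerator::send` call below only copies the fixed-size sample buffers and enqueues
/// them into a lock-free SPSC queue, so it adds a small, bounded amount of work per batch.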
- #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry], priority=2)] + #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] fn process(c: process::Context) { let adc_samples = [ c.resources.adcs.0.acquire_buffer(), @@ -157,6 +166,9 @@ const APP: () = { } } + // Stream the data. + c.resources.generator.send(&adc_samples, &dac_samples); + // Update telemetry measurements. c.resources.telemetry.adcs = [AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])]; diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs new file mode 100644 index 0000000..ebf66cd --- /dev/null +++ b/src/net/data_stream.rs @@ -0,0 +1,171 @@ +use core::borrow::BorrowMut; +use heapless::{ + spsc::{Consumer, Producer, Queue}, + Vec, +}; +use serde::Serialize; +use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack}; + +use super::NetworkReference; +use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; + +// The number of data blocks that we will buffer in the queue. +type BlockBufferSize = heapless::consts::U10; + +pub fn setup_streaming( + stack: NetworkReference, +) -> (BlockGenerator, DataStream) { + let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); + + let (producer, consumer) = queue.split(); + + let generator = BlockGenerator::new(producer); + + let stream = DataStream::new(stack, consumer); + + (generator, stream) +} + +pub struct AdcDacData { + block_id: u32, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +pub struct BlockGenerator { + queue: Producer<'static, AdcDacData, BlockBufferSize>, + current_id: u32, +} + +impl BlockGenerator { + pub fn new(queue: Producer<'static, AdcDacData, BlockBufferSize>) -> Self { + Self { + queue, + current_id: 0, + } + } + + pub fn send( + &mut self, + adcs: &[&[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], + ) { + let block = AdcDacData { + block_id: self.current_id, + adcs: [*adcs[0], *adcs[1]], + dacs: [*dacs[0], *dacs[1]], + }; + + self.current_id = self.current_id.wrapping_add(1); + + // We perform best-effort enqueueing of the data block. + self.queue.enqueue(block).ok(); + } +} + +pub struct DataStream { + stack: NetworkReference, + socket: Option<::TcpSocket>, + queue: Consumer<'static, AdcDacData, BlockBufferSize>, + current_index: u32, + remote: Option, +} + +#[derive(Serialize)] +struct DataBlock { + block_id: u32, + block_size: usize, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +impl DataStream { + pub fn new( + stack: NetworkReference, + consumer: Consumer<'static, AdcDacData, BlockBufferSize>, + ) -> Self { + Self { + stack, + socket: None, + current_index: 0, + remote: None, + queue: consumer, + } + } + + fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { + if self.socket.is_some() { + // Note(unwrap): We guarantee that the socket is available above. + let socket = self.socket.take().unwrap(); + self.stack.close(socket).unwrap(); + } + + let socket = + self.stack + .open(Mode::NonBlocking) + .map_err(|err| match err { + ::Error::NoIpAddress => (), + other => { + log::info!("Network Error: {:?}", other); + () + } + })?; + + // TODO: How should we handle a connection failure? + let socket = self.stack.connect(socket, remote).unwrap(); + + // Note(unwrap): The socket will be empty before we replace it. 
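// (Any previously held socket was closed at the top of this function, so `replace` cannot
// drop a live socket. The non-blocking connect above only initiates the TCP handshake; it
// completes later as the network stack is polled.)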
+ self.socket.replace(socket); + + Ok(()) + } + + pub fn set_remote(&mut self, remote: SocketAddr) { + // If the remote is identical to what we already have, do nothing. + if let Some(current_remote) = self.remote { + if current_remote == remote { + return; + } + } + + // Open the new remote connection. + self.open(remote).ok(); + self.remote = Some(remote); + } + + pub fn process(&mut self) { + while let Some(data) = self.queue.dequeue() { + // If there's no socket available, try to connect to our remote. + if self.socket.is_none() && self.remote.is_some() { + // If we still can't open the remote, continue. + if self.open(self.remote.unwrap()).is_err() { + continue; + } + } + + let block = DataBlock { + adcs: data.adcs, + dacs: data.dacs, + block_id: data.block_id, + block_size: SAMPLE_BUFFER_SIZE, + }; + + // Increment the current block index. + self.current_index = self.current_index.wrapping_add(1); + + // Serialize the datablock. + // TODO: Do we want to packetize the data block as well? + let data: Vec = + postcard::to_vec(&block).unwrap(); + + let mut socket = self.socket.borrow_mut().unwrap(); + + // Transmit the data block. + // TODO: How should we handle partial packet transmission? + match self.stack.write(&mut socket, &data) { + Ok(len) => assert!(len == data.len()), + _ => info!("Dropping packet"), + } + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index 38499ca..aeaa9ab 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,18 +11,23 @@ use serde::Serialize; use core::fmt::Write; +mod data_stream; mod messages; mod miniconf_client; mod network_processor; mod shared; mod telemetry; +pub use data_stream::BlockGenerator; + use crate::hardware::{CycleCounter, EthernetPhy, NetworkStack}; +use data_stream::DataStream; use messages::{MqttMessage, SettingsResponse}; pub use miniconf_client::MiniconfClient; pub use network_processor::NetworkProcessor; pub use shared::NetworkManager; +use smoltcp_nal::embedded_nal::SocketAddr; pub use telemetry::{Telemetry, TelemetryBuffer, TelemetryClient}; pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; @@ -36,7 +41,9 @@ pub enum UpdateState { /// A structure of Stabilizer's default network users. pub struct NetworkUsers { pub miniconf: MiniconfClient, - pub processor: NetworkProcessor, + processor: NetworkProcessor, + stream: DataStream, + generator: Option, pub telemetry: TelemetryClient, } @@ -87,10 +94,27 @@ where &prefix, ); + let (generator, stream) = + data_stream::setup_streaming(stack_manager.acquire_stack()); + NetworkUsers { miniconf: settings, processor, telemetry, + stream, + generator: Some(generator), + } + } + + /// Enable live data streaming. + pub fn enable_streaming(&mut self, remote: SocketAddr) -> BlockGenerator { + self.stream.set_remote(remote); + self.generator.take().unwrap() + } + + pub fn direct_stream(&mut self, remote: SocketAddr) { + if self.generator.is_none() { + self.stream.set_remote(remote); } } @@ -105,6 +129,11 @@ where // Update the MQTT clients. self.telemetry.update(); + // Update the data stream. 
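// (The generator half of the queue is handed out by `enable_streaming`, so an empty Option
// here means streaming has been enabled and the consumer side needs to be serviced.)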
+ if self.generator.is_none() { + self.stream.process(); + } + match self.miniconf.update() { UpdateState::Updated => UpdateState::Updated, UpdateState::NoChange => poll_result, From 731513722f73367d94ecb33433ff5c4d4c691408 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 17 May 2021 18:33:43 +0200 Subject: [PATCH 02/29] Updating after testing --- Cargo.toml | 2 +- src/hardware/configuration.rs | 4 +- src/net/data_stream.rs | 80 ++++++++++++++++++++++++++--------- 3 files changed, 64 insertions(+), 22 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 10cd9b7..041d5d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ pounder_v1_1 = [ ] [profile.dev] codegen-units = 1 incremental = false -opt-level = 3 +opt-level = 1 [profile.release] opt-level = 3 diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index f48b3d2..513e851 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -7,7 +7,7 @@ use stm32h7xx_hal::{ prelude::*, }; -const NUM_SOCKETS: usize = 4; +const NUM_SOCKETS: usize = 5; use heapless::{consts, Vec}; use smoltcp_nal::smoltcp; @@ -62,7 +62,7 @@ impl NetStorage { )], neighbor_cache: [None; 8], routes_cache: [None; 8], - sockets: [None, None, None, None, None], + sockets: [None, None, None, None, None, None], socket_storage: [SocketStorage::new(); NUM_SOCKETS], dhcp_tx_storage: [0; 600], dhcp_rx_storage: [0; 600], diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index ebf66cd..bbaf048 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -67,11 +67,9 @@ pub struct DataStream { stack: NetworkReference, socket: Option<::TcpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, - current_index: u32, remote: Option, } -#[derive(Serialize)] struct DataBlock { block_id: u32, block_size: usize, @@ -79,6 +77,24 @@ struct DataBlock { dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], } +impl DataBlock { + pub fn serialize>(self) -> Vec { + let mut vec: Vec = Vec::new(); + vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap(); + vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap(); + for device in &[self.adcs, self.dacs] { + for channel in device { + for sample in channel { + vec.extend_from_slice(&sample.to_be_bytes()).unwrap(); + } + } + } + + vec + } + +} + impl DataStream { pub fn new( stack: NetworkReference, @@ -87,17 +103,22 @@ impl DataStream { Self { stack, socket: None, - current_index: 0, remote: None, queue: consumer, } } + fn close(&mut self) { + // Note(unwrap): We guarantee that the socket is available above. + let socket = self.socket.take().unwrap(); + self.stack.close(socket).unwrap(); + + log::info!("Stream Disconnecting"); + } + fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { if self.socket.is_some() { - // Note(unwrap): We guarantee that the socket is available above. - let socket = self.socket.take().unwrap(); - self.stack.close(socket).unwrap(); + self.close(); } let socket = @@ -105,15 +126,14 @@ impl DataStream { .open(Mode::NonBlocking) .map_err(|err| match err { ::Error::NoIpAddress => (), - other => { - log::info!("Network Error: {:?}", other); - () - } + _ => () })?; // TODO: How should we handle a connection failure? let socket = self.stack.connect(socket, remote).unwrap(); + log::info!("Stream connecting to {:?}", remote); + // Note(unwrap): The socket will be empty before we replace it. 
self.socket.replace(socket); @@ -133,16 +153,36 @@ impl DataStream { self.remote = Some(remote); } + fn manage_reconnection(&mut self) { + if self.socket.is_none() || self.remote.is_none() { + return + } + + let mut socket = self.socket.borrow_mut().unwrap(); + let connected = match self.stack.is_connected(&mut socket) { + Ok(connected) => connected, + _ => return, + }; + + if !connected { + self.socket.replace(self.stack.connect(socket, self.remote.unwrap()).unwrap()); + } + } + pub fn process(&mut self) { - while let Some(data) = self.queue.dequeue() { + if let Some(data) = self.queue.dequeue() { + // If there's no socket available, try to connect to our remote. if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. if self.open(self.remote.unwrap()).is_err() { - continue; + return; } } + // Reconnect the socket if we're no longer connected. + self.manage_reconnection(); + let block = DataBlock { adcs: data.adcs, dacs: data.dacs, @@ -150,21 +190,23 @@ impl DataStream { block_size: SAMPLE_BUFFER_SIZE, }; - // Increment the current block index. - self.current_index = self.current_index.wrapping_add(1); - // Serialize the datablock. // TODO: Do we want to packetize the data block as well? - let data: Vec = - postcard::to_vec(&block).unwrap(); + let data: Vec = block.serialize(); let mut socket = self.socket.borrow_mut().unwrap(); // Transmit the data block. // TODO: How should we handle partial packet transmission? + // TODO: Should we measure how many packets get dropped as telemetry? match self.stack.write(&mut socket, &data) { - Ok(len) => assert!(len == data.len()), - _ => info!("Dropping packet"), + Ok(len) => { + if len != data.len() { + log::warn!("Short message: {} {}", len, data.len()); + //self.close(); + } + }, + _ => {}, } } } From 70be4c1c1980a6bee6648661414121783865e45d Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 26 May 2021 15:02:50 +0200 Subject: [PATCH 03/29] Adding WIP changes to streaming --- Cargo.toml | 5 ++-- src/bin/dual-iir.rs | 29 +++++++++++++-------- src/net/data_stream.rs | 50 +++++++++++++++++++++--------------- src/net/miniconf_client.rs | 4 +-- src/net/mod.rs | 18 ++++++++++--- src/net/network_processor.rs | 5 ++++ 6 files changed, 73 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 041d5d7..04ee1af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,8 +64,9 @@ git = "https://github.com/quartiq/miniconf.git" rev = "c6f2b28" [dependencies.smoltcp-nal] -git = "https://github.com/quartiq/smoltcp-nal.git" -rev = "4a1711c" +path = "../smoltcp-nal" +# git = "https://github.com/quartiq/smoltcp-nal.git" +# rev = "4a1711c" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index c258a02..ae18709 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -128,7 +128,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. 
- #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] + #[task(binds=DMA1_STR4, spawn=[stream], resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=3)] fn process(c: process::Context) { let adc_samples = [ c.resources.adcs.0.acquire_buffer(), @@ -177,6 +177,15 @@ const APP: () = { [DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])]; c.resources.telemetry.digital_inputs = digital_inputs; + + // Make a best effort to start data stream processing. It may be blocked by someone else + // using the network stack. + c.spawn.stream().ok(); + } + + #[task(priority = 2, resources=[network])] + fn stream(c: stream::Context) { + c.resources.network.update_stream() } #[idle(resources=[network], spawn=[settings_update])] @@ -192,8 +201,8 @@ const APP: () = { #[task(priority = 1, resources=[network, afes, settings])] fn settings_update(mut c: settings_update::Context) { // Update the IIR channels. - let settings = c.resources.network.miniconf.settings(); - c.resources.settings.lock(|current| *current = *settings); + let settings = c.resources.network.lock(|net| net.miniconf.settings()); + c.resources.settings.lock(|current| *current = settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); @@ -210,10 +219,8 @@ const APP: () = { .settings .lock(|settings| (settings.afe, settings.telemetry_period)); - c.resources - .network - .telemetry - .publish(&telemetry.finalize(gains[0], gains[1])); + c.resources.network.lock(|net| net.telemetry + .publish(&telemetry.finalize(gains[0], gains[1]))); // Schedule the telemetry task in the future. c.schedule @@ -229,22 +236,22 @@ const APP: () = { unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } } - #[task(binds = SPI2, priority = 3)] + #[task(binds = SPI2, priority = 4)] fn spi2(_: spi2::Context) { panic!("ADC0 input overrun"); } - #[task(binds = SPI3, priority = 3)] + #[task(binds = SPI3, priority = 4)] fn spi3(_: spi3::Context) { panic!("ADC1 input overrun"); } - #[task(binds = SPI4, priority = 3)] + #[task(binds = SPI4, priority = 4)] fn spi4(_: spi4::Context) { panic!("DAC0 output error"); } - #[task(binds = SPI5, priority = 3)] + #[task(binds = SPI5, priority = 4)] fn spi5(_: spi5::Context) { panic!("DAC1 output error"); } diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index bbaf048..4d828a0 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -3,14 +3,13 @@ use heapless::{ spsc::{Consumer, Producer, Queue}, Vec, }; -use serde::Serialize; -use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack}; +use smoltcp_nal::{smoltcp, embedded_nal::{Mode, SocketAddr, TcpStack}}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. -type BlockBufferSize = heapless::consts::U10; +type BlockBufferSize = heapless::consts::U30; pub fn setup_streaming( stack: NetworkReference, @@ -132,7 +131,7 @@ impl DataStream { // TODO: How should we handle a connection failure? let socket = self.stack.connect(socket, remote).unwrap(); - log::info!("Stream connecting to {:?}", remote); + //log::info!("Stream connecting to {:?}", remote); // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -169,16 +168,30 @@ impl DataStream { } } - pub fn process(&mut self) { - if let Some(data) = self.queue.dequeue() { - - // If there's no socket available, try to connect to our remote. 
- if self.socket.is_none() && self.remote.is_some() { - // If we still can't open the remote, continue. - if self.open(self.remote.unwrap()).is_err() { - return; - } + pub fn process(&mut self) -> bool { + // If there's no socket available, try to connect to our remote. + if self.socket.is_none() && self.remote.is_some() { + // If we still can't open the remote, continue. + if self.open(self.remote.unwrap()).is_err() { + return false; } + } + + let mut handle = self.socket.borrow_mut().unwrap(); + + let capacity = self.stack.lock(|stack| { + let mut all_sockets = stack.sockets.borrow_mut(); + let socket: &mut smoltcp::socket::TcpSocket = &mut *all_sockets.get(handle); + socket.send_capacity() - socket.send_queue() + }); + + // TODO: Clean up magic numbers. + if capacity < 72 { + // We cannot send a full data block. Abort now. + return false; + } + + if let Some(data) = self.queue.dequeue() { // Reconnect the socket if we're no longer connected. self.manage_reconnection(); @@ -191,23 +204,20 @@ impl DataStream { }; // Serialize the datablock. - // TODO: Do we want to packetize the data block as well? let data: Vec = block.serialize(); - let mut socket = self.socket.borrow_mut().unwrap(); - // Transmit the data block. - // TODO: How should we handle partial packet transmission? // TODO: Should we measure how many packets get dropped as telemetry? - match self.stack.write(&mut socket, &data) { + match self.stack.write(&mut handle, &data) { Ok(len) => { if len != data.len() { - log::warn!("Short message: {} {}", len, data.len()); - //self.close(); + log::error!("Short message: {} {}", len, data.len()); } }, _ => {}, } } + + self.queue.ready() } } diff --git a/src/net/miniconf_client.rs b/src/net/miniconf_client.rs index b809d73..0499d6e 100644 --- a/src/net/miniconf_client.rs +++ b/src/net/miniconf_client.rs @@ -156,7 +156,7 @@ where } /// Get the current settings from miniconf. - pub fn settings(&self) -> &S { - &self.settings + pub fn settings(&self) -> S { + self.settings.clone() } } diff --git a/src/net/mod.rs b/src/net/mod.rs index aeaa9ab..128d2ff 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -118,6 +118,20 @@ where } } + pub fn update_stream(&mut self) { + // Update the data stream. + if self.generator.is_none() { + loop { + // Process egress of the stack. + self.processor.egress(); + + if !self.stream.process() { + return + } + } + } + } + /// Update and process all of the network users state. /// /// # Returns @@ -130,9 +144,7 @@ where self.telemetry.update(); // Update the data stream. - if self.generator.is_none() { - self.stream.process(); - } + self.update_stream(); match self.miniconf.update() { UpdateState::Updated => UpdateState::Updated, diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index a64d6e7..41e1c31 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -37,6 +37,11 @@ impl NetworkProcessor { } } + pub fn egress(&mut self) { + let now = self.clock.current_ms(); + self.stack.lock(|stack| stack.poll(now)).ok(); + } + /// Process and update the state of the network. 
/// /// # Note From 2dd1bb9ebf6ca0c44aceba551a08cac0ca2c6e2e Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 26 May 2021 17:56:44 +0200 Subject: [PATCH 04/29] Adding WIP livestreaming implementation --- src/bin/dual-iir.rs | 29 ++++++++++------------------ src/net/data_stream.rs | 14 ++++++++++++-- src/net/miniconf_client.rs | 4 ++-- src/net/mod.rs | 37 +++++++++++++++--------------------- src/net/network_processor.rs | 22 +++++++++++---------- 5 files changed, 51 insertions(+), 55 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index f5d7b04..1fad201 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -128,7 +128,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. - #[task(binds=DMA1_STR4, spawn=[stream], resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=3)] + #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] fn process(c: process::Context) { let adc_samples = [ c.resources.adcs.0.acquire_buffer(), @@ -177,15 +177,6 @@ const APP: () = { [DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])]; c.resources.telemetry.digital_inputs = digital_inputs; - - // Make a best effort to start data stream processing. It may be blocked by someone else - // using the network stack. - c.spawn.stream().ok(); - } - - #[task(priority = 2, resources=[network])] - fn stream(c: stream::Context) { - c.resources.network.update_stream() } #[idle(resources=[network], spawn=[settings_update])] @@ -196,7 +187,7 @@ const APP: () = { c.spawn.settings_update().unwrap() } NetworkState::Updated => {} - NetworkState::NoChange => cortex_m::asm::wfi(), + NetworkState::NoChange => {}, } } } @@ -204,8 +195,8 @@ const APP: () = { #[task(priority = 1, resources=[network, afes, settings])] fn settings_update(mut c: settings_update::Context) { // Update the IIR channels. - let settings = c.resources.network.lock(|net| net.miniconf.settings()); - c.resources.settings.lock(|current| *current = settings); + let settings = c.resources.network.miniconf.settings(); + c.resources.settings.lock(|current| *current = *settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); @@ -222,8 +213,8 @@ const APP: () = { .settings .lock(|settings| (settings.afe, settings.telemetry_period)); - c.resources.network.lock(|net| net.telemetry - .publish(&telemetry.finalize(gains[0], gains[1]))); + c.resources.network.telemetry + .publish(&telemetry.finalize(gains[0], gains[1])); // Schedule the telemetry task in the future. 
c.schedule @@ -239,22 +230,22 @@ const APP: () = { unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } } - #[task(binds = SPI2, priority = 4)] + #[task(binds = SPI2, priority = 3)] fn spi2(_: spi2::Context) { panic!("ADC0 input overrun"); } - #[task(binds = SPI3, priority = 4)] + #[task(binds = SPI3, priority = 3)] fn spi3(_: spi3::Context) { panic!("ADC1 input overrun"); } - #[task(binds = SPI4, priority = 4)] + #[task(binds = SPI4, priority = 3)] fn spi4(_: spi4::Context) { panic!("DAC0 output error"); } - #[task(binds = SPI5, priority = 4)] + #[task(binds = SPI5, priority = 3)] fn spi5(_: spi5::Context) { panic!("DAC1 output error"); } diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 4d828a0..2b025b0 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -25,6 +25,7 @@ pub fn setup_streaming( (generator, stream) } +#[derive(Debug)] pub struct AdcDacData { block_id: u32, adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], @@ -57,8 +58,9 @@ impl BlockGenerator { self.current_id = self.current_id.wrapping_add(1); - // We perform best-effort enqueueing of the data block. - self.queue.enqueue(block).ok(); + // Note(unwrap): The buffering of the queue and processing of blocks must be fast enough + // such that blocks will never be silently dropped. + self.queue.enqueue(block).unwrap(); } } @@ -173,6 +175,11 @@ impl DataStream { if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. if self.open(self.remote.unwrap()).is_err() { + + // Clear the queue out. + while self.queue.ready() { + self.queue.dequeue(); + } return false; } } @@ -188,6 +195,9 @@ impl DataStream { // TODO: Clean up magic numbers. if capacity < 72 { // We cannot send a full data block. Abort now. + while self.queue.ready() { + self.queue.dequeue(); + } return false; } diff --git a/src/net/miniconf_client.rs b/src/net/miniconf_client.rs index 0499d6e..b809d73 100644 --- a/src/net/miniconf_client.rs +++ b/src/net/miniconf_client.rs @@ -156,7 +156,7 @@ where } /// Get the current settings from miniconf. - pub fn settings(&self) -> S { - self.settings.clone() + pub fn settings(&self) -> &S { + &self.settings } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 1e25dca..5e4cb19 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -124,41 +124,34 @@ where } } - pub fn update_stream(&mut self) { - // Update the data stream. - if self.generator.is_none() { - loop { - // Process egress of the stack. - self.processor.egress(); - - if !self.stream.process() { - return - } - } - } - } - /// Update and process all of the network users state. /// /// # Returns /// An indication if any of the network users indicated a state change. pub fn update(&mut self) -> NetworkState { + super::debug::high(); + // Update the MQTT clients. + self.telemetry.update(); + + // Update the data stream. + if self.generator.is_none() { + while self.stream.process() {} + } + // Poll for incoming data. let poll_result = match self.processor.update() { UpdateState::NoChange => NetworkState::NoChange, UpdateState::Updated => NetworkState::Updated, }; - // Update the MQTT clients. - self.telemetry.update(); - - // Update the data stream. 
- self.update_stream(); - - match self.miniconf.update() { + let result = match self.miniconf.update() { UpdateState::Updated => NetworkState::SettingsChanged, UpdateState::NoChange => poll_result, - } + }; + + super::debug::low(); + + result } } diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index 41e1c31..af00606 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -65,17 +65,19 @@ impl NetworkProcessor { // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network // stack. - match self.phy.poll_link() { - true => self.network_was_reset = false, + // TODO: Poll the link state in a task and handle resets. Polling this often is slow and + // uses necessary CPU time. + //match self.phy.poll_link() { + // true => self.network_was_reset = false, - // Only reset the network stack once per link reconnection. This prevents us from - // sending an excessive number of DHCP requests. - false if !self.network_was_reset => { - self.network_was_reset = true; - self.stack.lock(|stack| stack.handle_link_reset()); - } - _ => {} - }; + // // Only reset the network stack once per link reconnection. This prevents us from + // // sending an excessive number of DHCP requests. + // false if !self.network_was_reset => { + // self.network_was_reset = true; + // self.stack.lock(|stack| stack.handle_link_reset()); + // } + // _ => {} + //}; result } From 3ce93b8fcd807a255e1d36a92e575ca8364ab65e Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 27 May 2021 15:58:18 +0200 Subject: [PATCH 05/29] Adding WIP updates --- src/hardware/configuration.rs | 18 +++++++----------- src/net/data_stream.rs | 25 ++++++++++--------------- src/net/shared.rs | 15 +++++++++++++++ 3 files changed, 32 insertions(+), 26 deletions(-) diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index 513e851..e84723d 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -9,7 +9,6 @@ use stm32h7xx_hal::{ const NUM_SOCKETS: usize = 5; -use heapless::{consts, Vec}; use smoltcp_nal::smoltcp; use embedded_hal::digital::v2::{InputPin, OutputPin}; @@ -41,14 +40,14 @@ pub struct NetStorage { #[derive(Copy, Clone)] pub struct SocketStorage { rx_storage: [u8; 1024], - tx_storage: [u8; 1024], + tx_storage: [u8; 1024 * 3], } impl SocketStorage { const fn new() -> Self { Self { rx_storage: [0; 1024], - tx_storage: [0; 1024], + tx_storage: [0; 1024 * 3], } } } @@ -181,6 +180,8 @@ pub fn setup( let gpiof = device.GPIOF.split(ccdr.peripheral.GPIOF); let mut gpiog = device.GPIOG.split(ccdr.peripheral.GPIOG); + let _uart_tx = gpiod.pd8.into_push_pull_output().set_speed(hal::gpio::Speed::VeryHigh); + let dma_streams = hal::dma::dma::StreamsTuple::new(device.DMA1, ccdr.peripheral.DMA1); @@ -589,12 +590,10 @@ pub fn setup( .routes(routes) .finalize(); - let (mut sockets, handles) = { + let mut sockets = { let mut sockets = smoltcp::socket::SocketSet::new(&mut store.sockets[..]); - let mut handles: Vec = - Vec::new(); for storage in store.socket_storage.iter_mut() { let tcp_socket = { let rx_buffer = smoltcp::socket::TcpSocketBuffer::new( @@ -606,12 +605,10 @@ pub fn setup( smoltcp::socket::TcpSocket::new(rx_buffer, tx_buffer) }; - let handle = sockets.add(tcp_socket); - - handles.push(handle).unwrap(); + sockets.add(tcp_socket); } - (sockets, handles) + sockets }; let dhcp_client = { @@ -647,7 +644,6 @@ pub fn setup( let mut stack = smoltcp_nal::NetworkStack::new( interface, sockets, - &handles, Some(dhcp_client), ); diff --git 
a/src/net/data_stream.rs b/src/net/data_stream.rs index 2b025b0..3796440 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -3,7 +3,7 @@ use heapless::{ spsc::{Consumer, Producer, Queue}, Vec, }; -use smoltcp_nal::{smoltcp, embedded_nal::{Mode, SocketAddr, TcpStack}}; +use smoltcp_nal::embedded_nal::{SocketAddr, TcpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; @@ -66,7 +66,7 @@ impl BlockGenerator { pub struct DataStream { stack: NetworkReference, - socket: Option<::TcpSocket>, + socket: Option<::TcpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, remote: Option, } @@ -122,18 +122,18 @@ impl DataStream { self.close(); } - let socket = + let mut socket = self.stack - .open(Mode::NonBlocking) + .socket() .map_err(|err| match err { - ::Error::NoIpAddress => (), + ::Error::NoIpAddress => (), _ => () })?; // TODO: How should we handle a connection failure? - let socket = self.stack.connect(socket, remote).unwrap(); + self.stack.connect(&mut socket, remote).unwrap(); - //log::info!("Stream connecting to {:?}", remote); + log::info!("Stream connecting to {:?}", remote); // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -166,7 +166,7 @@ impl DataStream { }; if !connected { - self.socket.replace(self.stack.connect(socket, self.remote.unwrap()).unwrap()); + self.stack.connect(&mut socket, self.remote.unwrap()).unwrap(); } } @@ -185,12 +185,7 @@ impl DataStream { } let mut handle = self.socket.borrow_mut().unwrap(); - - let capacity = self.stack.lock(|stack| { - let mut all_sockets = stack.sockets.borrow_mut(); - let socket: &mut smoltcp::socket::TcpSocket = &mut *all_sockets.get(handle); - socket.send_capacity() - socket.send_queue() - }); + let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle)).unwrap(); // TODO: Clean up magic numbers. if capacity < 72 { @@ -218,7 +213,7 @@ impl DataStream { // Transmit the data block. // TODO: Should we measure how many packets get dropped as telemetry? - match self.stack.write(&mut handle, &data) { + match self.stack.send(&mut handle, &data) { Ok(len) => { if len != data.len() { log::error!("Short message: {} {}", len, data.len()); diff --git a/src/net/shared.rs b/src/net/shared.rs index 4f86793..4761da0 100644 --- a/src/net/shared.rs +++ b/src/net/shared.rs @@ -69,6 +69,21 @@ where forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>} } +impl<'a, S> embedded_nal::UdpClientStack for NetworkStackProxy<'a, S> +where + S: embedded_nal::UdpClientStack, +{ + type UdpSocket = S::UdpSocket; + type Error = S::Error; + + forward! {socket() -> Result} + forward! {connect(socket: &mut S::UdpSocket, remote: embedded_nal::SocketAddr) -> Result<(), S::Error>} + + forward! {send(socket: &mut S::UdpSocket, buffer: &[u8]) -> embedded_nal::nb::Result<(), S::Error>} + forward! {receive(socket: &mut S::UdpSocket, buffer: &mut [u8]) -> embedded_nal::nb::Result<(usize, embedded_nal::SocketAddr), S::Error>} + forward! 
{close(socket: S::UdpSocket) -> Result<(), S::Error>} +} + impl NetworkManager { /// Construct a new manager for a shared network stack /// From 97911c55f9d5a2bacc5e42ea1e586d1ceb7d43fa Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 27 May 2021 17:10:03 +0200 Subject: [PATCH 06/29] Unwrapping enqueue --- Cargo.lock | 5 ++- src/hardware/configuration.rs | 60 ++++++++++++++++++++++++++++------- src/net/data_stream.rs | 36 +++------------------ 3 files changed, 56 insertions(+), 45 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d1569da..fbc12fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -38,7 +38,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9a69a963b70ddacfcd382524f72a4576f359af9334b3bf48a79566590bb8bfa" dependencies = [ "bitrate", - "cortex-m 0.7.2", + "cortex-m 0.6.7", "embedded-hal", ] @@ -778,7 +778,6 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" -source = "git+https://github.com/quartiq/smoltcp-nal.git?branch=feature/nal-update#65d94c4ab9e06d2e5c48547a0c9cd6836591e355" dependencies = [ "embedded-nal", "heapless 0.7.1", @@ -828,7 +827,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b672c837e0ee8158ecc7fce0f9a948dd0693a9c588338e728d14b73307a0b7d" dependencies = [ "bare-metal 0.2.5", - "cortex-m 0.7.2", + "cortex-m 0.6.7", "cortex-m-rt", "vcell", ] diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index e84723d..786a72c 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -7,8 +7,6 @@ use stm32h7xx_hal::{ prelude::*, }; -const NUM_SOCKETS: usize = 5; - use smoltcp_nal::smoltcp; use embedded_hal::digital::v2::{InputPin, OutputPin}; @@ -19,13 +17,18 @@ use super::{ DigitalInput0, DigitalInput1, EthernetPhy, NetworkStack, AFE0, AFE1, }; +const NUM_TCP_SOCKETS: usize = 5; +const NUM_UDP_SOCKETS: usize = 1; +const NUM_SOCKETS: usize = NUM_UDP_SOCKETS + NUM_TCP_SOCKETS; + pub struct NetStorage { pub ip_addrs: [smoltcp::wire::IpCidr; 1], // Note: There is an additional socket set item required for the DHCP socket. 
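// (Hence the NUM_SOCKETS + 1 entries below: NUM_TCP_SOCKETS TCP sockets, NUM_UDP_SOCKETS
// UDP sockets for the data stream, plus the raw socket used by the DHCP client.)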
pub sockets: [Option>; NUM_SOCKETS + 1], - pub socket_storage: [SocketStorage; NUM_SOCKETS], + pub tcp_socket_storage: [TcpSocketStorage; NUM_TCP_SOCKETS], + pub udp_socket_storage: [UdpSocketStorage; NUM_UDP_SOCKETS], pub neighbor_cache: [Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8], pub routes_cache: @@ -37,17 +40,35 @@ pub struct NetStorage { pub dhcp_rx_storage: [u8; 600], } -#[derive(Copy, Clone)] -pub struct SocketStorage { +pub struct UdpSocketStorage { rx_storage: [u8; 1024], - tx_storage: [u8; 1024 * 3], + tx_storage: [u8; 1024], + tx_metadata: [smoltcp::storage::PacketMetadata; 10], + rx_metadata: [smoltcp::storage::PacketMetadata; 10], } -impl SocketStorage { +impl UdpSocketStorage { const fn new() -> Self { Self { rx_storage: [0; 1024], - tx_storage: [0; 1024 * 3], + tx_storage: [0; 1024], + tx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], + rx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], + } + } +} + +#[derive(Copy, Clone)] +pub struct TcpSocketStorage { + rx_storage: [u8; 1024], + tx_storage: [u8; 1024], +} + +impl TcpSocketStorage { + const fn new() -> Self { + Self { + rx_storage: [0; 1024], + tx_storage: [0; 1024], } } } @@ -61,8 +82,9 @@ impl NetStorage { )], neighbor_cache: [None; 8], routes_cache: [None; 8], - sockets: [None, None, None, None, None, None], - socket_storage: [SocketStorage::new(); NUM_SOCKETS], + sockets: [None, None, None, None, None, None, None], + tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS], + udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS], dhcp_tx_storage: [0; 600], dhcp_rx_storage: [0; 600], dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1], @@ -594,7 +616,7 @@ pub fn setup( let mut sockets = smoltcp::socket::SocketSet::new(&mut store.sockets[..]); - for storage in store.socket_storage.iter_mut() { + for storage in store.tcp_socket_storage[..].iter_mut() { let tcp_socket = { let rx_buffer = smoltcp::socket::TcpSocketBuffer::new( &mut storage.rx_storage[..], @@ -608,6 +630,22 @@ pub fn setup( sockets.add(tcp_socket); } + for storage in store.udp_socket_storage[..].iter_mut() { + let udp_socket = { + let rx_buffer = smoltcp::socket::UdpSocketBuffer::new( + &mut storage.rx_metadata[..], + &mut storage.rx_storage[..], + ); + let tx_buffer = smoltcp::socket::UdpSocketBuffer::new( + &mut storage.tx_metadata[..], + &mut storage.tx_storage[..], + ); + + smoltcp::socket::UdpSocket::new(rx_buffer, tx_buffer) + }; + sockets.add(udp_socket); + } + sockets }; diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 3796440..7cd7a30 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -3,7 +3,7 @@ use heapless::{ spsc::{Consumer, Producer, Queue}, Vec, }; -use smoltcp_nal::embedded_nal::{SocketAddr, TcpClientStack}; +use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; @@ -66,7 +66,7 @@ impl BlockGenerator { pub struct DataStream { stack: NetworkReference, - socket: Option<::TcpSocket>, + socket: Option<::UdpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, remote: Option, } @@ -126,7 +126,7 @@ impl DataStream { self.stack .socket() .map_err(|err| match err { - ::Error::NoIpAddress => (), + ::Error::NoIpAddress => (), _ => () })?; @@ -154,22 +154,6 @@ impl DataStream { self.remote = Some(remote); } - fn manage_reconnection(&mut self) { - if self.socket.is_none() || self.remote.is_none() { - return - } - - let mut socket = 
self.socket.borrow_mut().unwrap(); - let connected = match self.stack.is_connected(&mut socket) { - Ok(connected) => connected, - _ => return, - }; - - if !connected { - self.stack.connect(&mut socket, self.remote.unwrap()).unwrap(); - } - } - pub fn process(&mut self) -> bool { // If there's no socket available, try to connect to our remote. if self.socket.is_none() && self.remote.is_some() { @@ -185,7 +169,7 @@ impl DataStream { } let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle)).unwrap(); + let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); // TODO: Clean up magic numbers. if capacity < 72 { @@ -198,9 +182,6 @@ impl DataStream { if let Some(data) = self.queue.dequeue() { - // Reconnect the socket if we're no longer connected. - self.manage_reconnection(); - let block = DataBlock { adcs: data.adcs, dacs: data.dacs, @@ -213,14 +194,7 @@ impl DataStream { // Transmit the data block. // TODO: Should we measure how many packets get dropped as telemetry? - match self.stack.send(&mut handle, &data) { - Ok(len) => { - if len != data.len() { - log::error!("Short message: {} {}", len, data.len()); - } - }, - _ => {}, - } + self.stack.send(&mut handle, &data).ok(); } self.queue.ready() From 5473ab41ab9e218f714d04797f305ddfa788347d Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Thu, 27 May 2021 17:10:19 +0200 Subject: [PATCH 07/29] Updating lock --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index fbc12fd..885cf5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -778,6 +778,7 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" +source = "git+https://github.com/quartiq/smoltcp-nal.git?branch=feature/udp-support#d387c79df56ba61af233846dbae3ae3bff601309" dependencies = [ "embedded-nal", "heapless 0.7.1", From 72637bebc08078d04f3efb9814267fb207329bc5 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 28 May 2021 18:57:23 +0200 Subject: [PATCH 08/29] Updating stream --- Cargo.lock | 2 +- src/net/data_stream.rs | 88 +++++++++++++++++++++++++----------------- src/net/mod.rs | 2 +- 3 files changed, 55 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 885cf5a..6a5f12c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -426,7 +426,7 @@ dependencies = [ [[package]] name = "minimq" version = "0.2.0" -source = "git+https://github.com/quartiq/minimq.git?branch=feature/nal-update#98c1dffc4b0eeeaadf696320e1ce4234d5b52de0" +source = "git+https://github.com/quartiq/minimq.git?rev=dbdbec0#dbdbec0b77d2e134dc6c025018a82c14cbdfbe34" dependencies = [ "bit_field", "embedded-nal", diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 7cd7a30..ab07457 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,7 +1,6 @@ use core::borrow::BorrowMut; use heapless::{ spsc::{Consumer, Producer, Queue}, - Vec, }; use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; @@ -25,6 +24,43 @@ pub fn setup_streaming( (generator, stream) } +pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static, + AdcDacData, BlockBufferSize>) -> &'a [u8] { + // While there is space in the buffer, serialize into it. + + let block_size = (SAMPLE_BUFFER_SIZE * 2) * 2 * 2 + 8; + + // Truncate the buffer to the maximum buffer size. 
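// (`max_buffer_size` is the remaining send capacity of the UDP socket, so the returned slice
// never exceeds what fits in the next datagram. `block_size` above is the wire size of one
// block: an 8-byte header plus 2 devices * 2 channels * SAMPLE_BUFFER_SIZE two-byte samples.)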
+ let buffer: &mut [u8] = if buffer.len() > max_buffer_size { + &mut buffer[..max_buffer_size] + } else { + buffer + }; + + // Serialize blocks into the buffer until either the buffer or the queue are exhausted. + let mut enqueued_blocks: usize = 0; + for buf in buffer.chunks_exact_mut(block_size) { + // If there are no more blocks, return the serialized data. + let data = match queue.dequeue() { + Some(data) => data, + None => break, + }; + + let block = DataBlock { + adcs: data.adcs, + dacs: data.dacs, + block_id: data.block_id, + block_size: SAMPLE_BUFFER_SIZE, + }; + + enqueued_blocks += 1; + let length = block.to_slice(buf); + assert!(length == block_size); + } + + &buffer[..block_size * enqueued_blocks] +} + #[derive(Debug)] pub struct AdcDacData { block_id: u32, @@ -69,6 +105,7 @@ pub struct DataStream { socket: Option<::UdpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, remote: Option, + buffer: [u8; 1024], } struct DataBlock { @@ -79,19 +116,21 @@ struct DataBlock { } impl DataBlock { - pub fn serialize>(self) -> Vec { - let mut vec: Vec = Vec::new(); - vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap(); - vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap(); + pub fn to_slice(self, buf: &mut [u8]) -> usize { + buf[0..4].copy_from_slice(&self.block_id.to_be_bytes()); + buf[4..8].copy_from_slice(&self.block_size.to_be_bytes()); + + let mut offset: usize = 8; for device in &[self.adcs, self.dacs] { for channel in device { for sample in channel { - vec.extend_from_slice(&sample.to_be_bytes()).unwrap(); + buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes()); + offset += 2; } } } - vec + offset } } @@ -106,6 +145,7 @@ impl DataStream { socket: None, remote: None, queue: consumer, + buffer: [0; 1024], } } @@ -133,8 +173,6 @@ impl DataStream { // TODO: How should we handle a connection failure? self.stack.connect(&mut socket, remote).unwrap(); - log::info!("Stream connecting to {:?}", remote); - // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -154,7 +192,7 @@ impl DataStream { self.remote = Some(remote); } - pub fn process(&mut self) -> bool { + pub fn process(&mut self) { // If there's no socket available, try to connect to our remote. if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. @@ -164,39 +202,19 @@ impl DataStream { while self.queue.ready() { self.queue.dequeue(); } - return false; + return; } } - let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); + if self.queue.ready() { + let mut handle = self.socket.borrow_mut().unwrap(); + let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); - // TODO: Clean up magic numbers. - if capacity < 72 { - // We cannot send a full data block. Abort now. - while self.queue.ready() { - self.queue.dequeue(); - } - return false; - } - - if let Some(data) = self.queue.dequeue() { - - let block = DataBlock { - adcs: data.adcs, - dacs: data.dacs, - block_id: data.block_id, - block_size: SAMPLE_BUFFER_SIZE, - }; - - // Serialize the datablock. - let data: Vec = block.serialize(); + let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue); // Transmit the data block. // TODO: Should we measure how many packets get dropped as telemetry? 
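// (The send is best-effort: the `.ok()` below discards any error, including would-block, so a
// datagram that cannot be queued is dropped rather than stalling the network task.)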
self.stack.send(&mut handle, &data).ok(); } - - self.queue.ready() } } diff --git a/src/net/mod.rs b/src/net/mod.rs index 5e4cb19..7d52bc4 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -135,7 +135,7 @@ where // Update the data stream. if self.generator.is_none() { - while self.stream.process() {} + self.stream.process(); } // Poll for incoming data. From eb968fb503c55a8697d72d0c894c94fdb00d6e5b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 28 May 2021 19:37:28 +0200 Subject: [PATCH 09/29] Adding subsampling factor --- src/net/data_stream.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index cc61a28..87d572f 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -10,6 +10,8 @@ use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; +const SUBSAMPLE_RATE: usize = 2; + pub fn setup_streaming( stack: NetworkReference, ) -> (BlockGenerator, DataStream) { @@ -28,7 +30,7 @@ pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: AdcDacData, BLOCK_BUFFER_SIZE>) -> &'a [u8] { // While there is space in the buffer, serialize into it. - let block_size = (SAMPLE_BUFFER_SIZE * 2) * 2 * 2 + 8; + let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8; // Truncate the buffer to the maximum buffer size. let buffer: &mut [u8] = if buffer.len() > max_buffer_size { @@ -54,7 +56,7 @@ pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: }; enqueued_blocks += 1; - let length = block.to_slice(buf); + let length = block.to_slice(buf, SUBSAMPLE_RATE); assert!(length == block_size); } @@ -116,14 +118,15 @@ struct DataBlock { } impl DataBlock { - pub fn to_slice(self, buf: &mut [u8]) -> usize { + pub fn to_slice(self, buf: &mut [u8], subsample: usize) -> usize { + let block_size = self.block_size / subsample; buf[0..4].copy_from_slice(&self.block_id.to_be_bytes()); - buf[4..8].copy_from_slice(&self.block_size.to_be_bytes()); + buf[4..8].copy_from_slice(&block_size.to_be_bytes()); let mut offset: usize = 8; for device in &[self.adcs, self.dacs] { for channel in device { - for sample in channel { + for sample in channel.iter().step_by(subsample) { buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes()); offset += 2; } From 5e2d2beeace258daefc12a6c84d8d3a773766f33 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 28 May 2021 19:38:03 +0200 Subject: [PATCH 10/29] Updating lockfile --- Cargo.lock | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c77636..f641658 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -561,23 +561,6 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" -[[package]] -name = "postcard" -version = "0.6.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66acf3cf8ab62785852a0f67ae3bfcd38324da6561e52e7f4a049ca555c6d55e" -dependencies = [ - "heapless 0.6.1", - "postcard-cobs", - "serde", -] - -[[package]] -name = "postcard-cobs" -version = "0.1.5-pre" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" - [[package]] name = "ppv-lite86" version = "0.2.10" @@ -796,7 +779,6 @@ dependencies = [ "nb 1.0.0", "num_enum", "paste", - "postcard", 
"rtt-logger", "rtt-target", "serde", From 04f61db6f2cf8d03b8937cd734c274300e7321da Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 31 May 2021 14:06:02 +0200 Subject: [PATCH 11/29] Adding functional prototype --- Cargo.lock | 2 +- src/hardware/configuration.rs | 8 ++++---- src/net/data_stream.rs | 13 +++++++------ 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f641658..9cf237e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -752,7 +752,7 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" -source = "git+https://github.com/quartiq/smoltcp-nal.git?branch=feature/udp-support#d387c79df56ba61af233846dbae3ae3bff601309" +source = "git+https://github.com/quartiq/smoltcp-nal.git?branch=feature/udp-support#bd90e9d1352e17cd0cc5406ea1d7a35be5761866" dependencies = [ "embedded-nal", "heapless 0.7.1", diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index 2a783f9..91f105d 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -17,7 +17,7 @@ use super::{ DigitalInput0, DigitalInput1, EthernetPhy, NetworkStack, AFE0, AFE1, }; -const NUM_TCP_SOCKETS: usize = 5; +const NUM_TCP_SOCKETS: usize = 4; const NUM_UDP_SOCKETS: usize = 1; const NUM_SOCKETS: usize = NUM_UDP_SOCKETS + NUM_TCP_SOCKETS; @@ -42,7 +42,7 @@ pub struct NetStorage { pub struct UdpSocketStorage { rx_storage: [u8; 1024], - tx_storage: [u8; 1024], + tx_storage: [u8; 2048], tx_metadata: [smoltcp::storage::PacketMetadata; 10], rx_metadata: [smoltcp::storage::PacketMetadata; 10], } @@ -51,7 +51,7 @@ impl UdpSocketStorage { const fn new() -> Self { Self { rx_storage: [0; 1024], - tx_storage: [0; 1024], + tx_storage: [0; 2048], tx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], rx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], } @@ -82,7 +82,7 @@ impl NetStorage { )], neighbor_cache: [None; 8], routes_cache: [None; 8], - sockets: [None, None, None, None, None, None, None], + sockets: [None, None, None, None, None, None], tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS], udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS], dhcp_tx_storage: [0; 600], diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 87d572f..3f51450 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -10,7 +10,7 @@ use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; -const SUBSAMPLE_RATE: usize = 2; +const SUBSAMPLE_RATE: usize = 1; pub fn setup_streaming( stack: NetworkReference, @@ -96,9 +96,9 @@ impl BlockGenerator { self.current_id = self.current_id.wrapping_add(1); - // Note(unwrap): The buffering of the queue and processing of blocks must be fast enough - // such that blocks will never be silently dropped. - self.queue.enqueue(block).unwrap(); + // Note: We silently ignore dropped blocks here. The queue can fill up if the service + // routing isn't being called often enough. + self.queue.enqueue(block).ok(); } } @@ -173,7 +173,6 @@ impl DataStream { _ => () })?; - // TODO: How should we handle a connection failure? self.stack.connect(&mut socket, remote).unwrap(); // Note(unwrap): The socket will be empty before we replace it. 
@@ -211,7 +210,9 @@ impl DataStream { if self.queue.ready() { let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.get_remaining_send_buffer(handle.handle)).unwrap(); + let capacity = self.stack.lock(|stack| stack.with_udp_socket(handle, |socket| { + socket.payload_send_capacity() + })).unwrap(); let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue); From 14bfbbe2a1e14cf0c5a3824e72b2aec712dd29fb Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 31 May 2021 14:28:57 +0200 Subject: [PATCH 12/29] Refactoring link status polling --- src/bin/dual-iir.rs | 11 ++++++++++- src/bin/lockin.rs | 11 ++++++++++- src/net/mod.rs | 8 +++----- src/net/network_processor.rs | 38 ++++++++++++++++++------------------ 4 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index a6017fb..aa2d305 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -69,7 +69,7 @@ const APP: () = { iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2], } - #[init(spawn=[telemetry, settings_update])] + #[init(spawn=[telemetry, settings_update, ethernet_link])] fn init(c: init::Context) -> init::LateResources { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); @@ -91,6 +91,9 @@ const APP: () = { c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); + // Spawn the ethernet link period check task. + c.spawn.ethernet_link().unwrap(); + // Enable ADC/DAC events stabilizer.adcs.0.start(); stabilizer.adcs.1.start(); @@ -227,6 +230,12 @@ const APP: () = { .unwrap(); } + #[task(priority = 1, resources=[network], schedule=[ethernet_link])] + fn ethernet_link(c: ethernet_link::Context) { + c.resources.network.processor.handle_link(); + c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + } + #[task(binds = ETH, priority = 1)] fn eth(_: eth::Context) { unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 4864760..6c95959 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -94,7 +94,7 @@ const APP: () = { lockin: Lockin<4>, } - #[init(spawn=[settings_update, telemetry])] + #[init(spawn=[settings_update, telemetry, ethernet_link])] fn init(c: init::Context) -> init::LateResources { // Configure the microcontroller let (mut stabilizer, _pounder) = setup(c.core, c.device); @@ -118,6 +118,9 @@ const APP: () = { c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); + // Spawn the ethernet link servicing task. + c.spawn.ethernet_link().unwrap(); + // Enable ADC/DAC events stabilizer.adcs.0.start(); stabilizer.adcs.1.start(); @@ -298,6 +301,12 @@ const APP: () = { .unwrap(); } + #[task(priority = 1, resources=[network], schedule=[ethernet_link])] + fn ethernet_link(c: ethernet_link::Context) { + c.resources.network.processor.handle_link(); + c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + } + #[task(binds = ETH, priority = 1)] fn eth(_: eth::Context) { unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } diff --git a/src/net/mod.rs b/src/net/mod.rs index 7c31bf0..0494db5 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -47,7 +47,7 @@ pub enum NetworkState { /// A structure of Stabilizer's default network users. 
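/// The processor is exposed publicly so that binaries can poll the ethernet PHY link state
/// from a dedicated low-rate task (the `ethernet_link` tasks above) instead of on every
/// network poll.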
pub struct NetworkUsers { pub miniconf: MiniconfClient, - processor: NetworkProcessor, + pub processor: NetworkProcessor, stream: DataStream, generator: Option, pub telemetry: TelemetryClient, @@ -143,12 +143,10 @@ where UpdateState::Updated => NetworkState::Updated, }; - let result = match self.miniconf.update() { + match self.miniconf.update() { UpdateState::Updated => NetworkState::SettingsChanged, UpdateState::NoChange => poll_result, - }; - - result + } } } diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index af00606..c5f37bf 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -37,9 +37,25 @@ impl NetworkProcessor { } } - pub fn egress(&mut self) { - let now = self.clock.current_ms(); - self.stack.lock(|stack| stack.poll(now)).ok(); + /// Handle ethernet link connection status. + /// + /// # Note + /// This may take non-trivial amounts of time to communicate with the PHY. As such, this should + /// only be called as often as necessary (e.g. once per second or so). + pub fn handle_link(&mut self) { + // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network + // stack. + match self.phy.poll_link() { + true => self.network_was_reset = false, + + // Only reset the network stack once per link reconnection. This prevents us from + // sending an excessive number of DHCP requests. + false if !self.network_was_reset => { + self.network_was_reset = true; + self.stack.lock(|stack| stack.handle_link_reset()); + } + _ => {} + }; } /// Process and update the state of the network. @@ -63,22 +79,6 @@ impl NetworkProcessor { } }; - // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network - // stack. - // TODO: Poll the link state in a task and handle resets. Polling this often is slow and - // uses necessary CPU time. - //match self.phy.poll_link() { - // true => self.network_was_reset = false, - - // // Only reset the network stack once per link reconnection. This prevents us from - // // sending an excessive number of DHCP requests. - // false if !self.network_was_reset => { - // self.network_was_reset = true; - // self.stack.lock(|stack| stack.handle_link_reset()); - // } - // _ => {} - //}; - result } } From b292cf45ab9eb2cc127c2c0053465227b2d02af1 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 9 Jun 2021 12:52:13 +0200 Subject: [PATCH 13/29] Fixing merge --- src/bin/dual-iir.rs | 3 ++- src/net/data_stream.rs | 2 +- src/net/mod.rs | 2 ++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 131687b..696fc07 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -152,6 +152,7 @@ const APP: () = { ref settings, ref mut iir_state, ref mut telemetry, + ref mut generator, } = c.resources; let digital_inputs = [ @@ -192,7 +193,7 @@ const APP: () = { } // Stream the data. - c.resources.generator.send(&adc_samples, &dac_samples); + generator.send(&adc_samples, &dac_samples); // Update telemetry measurements. 
telemetry.adcs = diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 3f51450..1309735 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -85,7 +85,7 @@ impl BlockGenerator { pub fn send( &mut self, - adcs: &[&[u16; SAMPLE_BUFFER_SIZE]; 2], + adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { let block = AdcDacData { diff --git a/src/net/mod.rs b/src/net/mod.rs index b9ceece..e781194 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -22,11 +22,13 @@ use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; use shared::NetworkManager; use telemetry::TelemetryClient; +use data_stream::{DataStream, BlockGenerator}; use core::fmt::Write; use heapless::String; use miniconf::Miniconf; use serde::Serialize; +use smoltcp_nal::embedded_nal::SocketAddr; pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; From b5fdb31a0273aa673a520320bd5ebeaab44b9caf Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 9 Jun 2021 13:26:41 +0200 Subject: [PATCH 14/29] Adding internal buffering to data stream blocks --- src/bin/dual-iir.rs | 12 +++-- src/bin/lockin.rs | 4 +- src/hardware/setup.rs | 19 +++++-- src/net/data_stream.rs | 118 ++++++++++++++++++++++++++++------------- src/net/mod.rs | 4 +- 5 files changed, 108 insertions(+), 49 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 696fc07..0c3ad3e 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -18,11 +18,11 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ + data_stream::BlockGenerator, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, NetworkState, NetworkUsers, - data_stream::BlockGenerator, }, }; @@ -215,7 +215,7 @@ const APP: () = { c.spawn.settings_update().unwrap() } NetworkState::Updated => {} - NetworkState::NoChange => {}, + NetworkState::NoChange => {} } } } @@ -241,7 +241,9 @@ const APP: () = { .settings .lock(|settings| (settings.afe, settings.telemetry_period)); - c.resources.network.telemetry + c.resources + .network + .telemetry .publish(&telemetry.finalize(gains[0], gains[1])); // Schedule the telemetry task in the future. 
@@ -256,7 +258,9 @@ const APP: () = { #[task(priority = 1, resources=[network], schedule=[ethernet_link])] fn ethernet_link(c: ethernet_link::Context) { c.resources.network.processor.handle_link(); - c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); } #[task(binds = ETH, priority = 1)] diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index c504105..9263e2c 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -319,7 +319,9 @@ const APP: () = { #[task(priority = 1, resources=[network], schedule=[ethernet_link])] fn ethernet_link(c: ethernet_link::Context) { c.resources.network.processor.handle_link(); - c.schedule.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)).unwrap(); + c.schedule + .ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1)) + .unwrap(); } #[task(binds = ETH, priority = 1)] diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index dffb9c2..83f9b87 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -44,8 +44,10 @@ pub struct NetStorage { pub struct UdpSocketStorage { rx_storage: [u8; 1024], tx_storage: [u8; 2048], - tx_metadata: [smoltcp::storage::PacketMetadata; 10], - rx_metadata: [smoltcp::storage::PacketMetadata; 10], + tx_metadata: + [smoltcp::storage::PacketMetadata; 10], + rx_metadata: + [smoltcp::storage::PacketMetadata; 10], } impl UdpSocketStorage { @@ -53,8 +55,12 @@ impl UdpSocketStorage { Self { rx_storage: [0; 1024], tx_storage: [0; 2048], - tx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], - rx_metadata: [smoltcp::storage::PacketMetadata::::EMPTY; 10], + tx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], + rx_metadata: [smoltcp::storage::PacketMetadata::< + smoltcp::wire::IpEndpoint, + >::EMPTY; 10], } } } @@ -252,7 +258,10 @@ pub fn setup( let gpiof = device.GPIOF.split(ccdr.peripheral.GPIOF); let mut gpiog = device.GPIOG.split(ccdr.peripheral.GPIOG); - let _uart_tx = gpiod.pd8.into_push_pull_output().set_speed(hal::gpio::Speed::VeryHigh); + let _uart_tx = gpiod + .pd8 + .into_push_pull_output() + .set_speed(hal::gpio::Speed::VeryHigh); let dma_streams = hal::dma::dma::StreamsTuple::new(device.DMA1, ccdr.peripheral.DMA1); diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 1309735..b323a26 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,12 +1,14 @@ use core::borrow::BorrowMut; -use heapless::{ - spsc::{Consumer, Producer, Queue}, -}; +use heapless::spsc::{Consumer, Producer, Queue}; use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; +// The number of samples contained in a single block. Note that each sample corresponds ot 8 byte +// s(2 bytes per ADC/DAC code, 4 codes total). +const BLOCK_SAMPLE_SIZE: usize = 50; + // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; @@ -26,8 +28,11 @@ pub fn setup_streaming( (generator, stream) } -pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &mut Consumer<'static, - AdcDacData, BLOCK_BUFFER_SIZE>) -> &'a [u8] { +fn serialize_blocks<'a>( + buffer: &'a mut [u8], + max_buffer_size: usize, + queue: &mut Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, +) -> &'a [u8] { // While there is space in the buffer, serialize into it. 
let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8; @@ -63,23 +68,31 @@ pub fn serialize_blocks<'a>(buffer: &'a mut [u8], max_buffer_size: usize, queue: &buffer[..block_size * enqueued_blocks] } -#[derive(Debug)] +#[derive(Debug, Copy, Clone)] pub struct AdcDacData { block_id: u32, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], } pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_id: u32, + current_block: AdcDacData, + num_samples: usize, } impl BlockGenerator { - pub fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { + pub fn new( + queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + ) -> Self { Self { queue, - current_id: 0, + current_block: AdcDacData { + block_id: 0, + adcs: [[0; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[0; BLOCK_SAMPLE_SIZE]; 2], + }, + num_samples: 0, } } @@ -88,17 +101,47 @@ impl BlockGenerator { adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { - let block = AdcDacData { - block_id: self.current_id, - adcs: [*adcs[0], *adcs[1]], - dacs: [*dacs[0], *dacs[1]], - }; + let mut processed_samples = 0; - self.current_id = self.current_id.wrapping_add(1); + while processed_samples < SAMPLE_BUFFER_SIZE { + let remaining_samples = SAMPLE_BUFFER_SIZE - processed_samples; + let free_space = BLOCK_SAMPLE_SIZE - self.num_samples; + let copy_sample_length = if remaining_samples < free_space { + remaining_samples + } else { + free_space + }; - // Note: We silently ignore dropped blocks here. The queue can fill up if the service - // routing isn't being called often enough. - self.queue.enqueue(block).ok(); + let start_src = self.num_samples; + let end_src = start_src + copy_sample_length; + + let start_dst = processed_samples; + let end_dst = start_dst + copy_sample_length; + + self.current_block.adcs[0][start_src..end_src] + .copy_from_slice(&adcs[0][start_dst..end_dst]); + self.current_block.adcs[1][start_src..end_src] + .copy_from_slice(&adcs[1][start_dst..end_dst]); + self.current_block.dacs[0][start_src..end_src] + .copy_from_slice(&dacs[0][start_dst..end_dst]); + self.current_block.dacs[1][start_src..end_src] + .copy_from_slice(&dacs[1][start_dst..end_dst]); + + self.num_samples += copy_sample_length; + + // If the data block is full, push it onto the queue. + if self.num_samples == BLOCK_SAMPLE_SIZE { + // Note: We silently ignore dropped blocks here. The queue can fill up if the + // service routing isn't being called often enough. 
+ self.queue.enqueue(self.current_block).ok(); + + self.current_block.block_id = + self.current_block.block_id.wrapping_add(1); + self.num_samples = 0; + } + + processed_samples += copy_sample_length; + } } } @@ -113,8 +156,8 @@ pub struct DataStream { struct DataBlock { block_id: u32, block_size: usize, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], } impl DataBlock { @@ -127,7 +170,8 @@ impl DataBlock { for device in &[self.adcs, self.dacs] { for channel in device { for sample in channel.iter().step_by(subsample) { - buf[offset..offset+2].copy_from_slice(&sample.to_be_bytes()); + buf[offset..offset + 2] + .copy_from_slice(&sample.to_be_bytes()); offset += 2; } } @@ -135,7 +179,6 @@ impl DataBlock { offset } - } impl DataStream { @@ -165,13 +208,10 @@ impl DataStream { self.close(); } - let mut socket = - self.stack - .socket() - .map_err(|err| match err { - ::Error::NoIpAddress => (), - _ => () - })?; + let mut socket = self.stack.socket().map_err(|err| match err { + ::Error::NoIpAddress => (), + _ => (), + })?; self.stack.connect(&mut socket, remote).unwrap(); @@ -199,7 +239,6 @@ impl DataStream { if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. if self.open(self.remote.unwrap()).is_err() { - // Clear the queue out. while self.queue.ready() { self.queue.dequeue(); @@ -210,14 +249,19 @@ impl DataStream { if self.queue.ready() { let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self.stack.lock(|stack| stack.with_udp_socket(handle, |socket| { - socket.payload_send_capacity() - })).unwrap(); + let capacity = self + .stack + .lock(|stack| { + stack.with_udp_socket(handle, |socket| { + socket.payload_send_capacity() + }) + }) + .unwrap(); - let data = serialize_blocks(&mut self.buffer, capacity, &mut self.queue); + let data = + serialize_blocks(&mut self.buffer, capacity, &mut self.queue); // Transmit the data block. - // TODO: Should we measure how many packets get dropped as telemetry? 
self.stack.send(&mut handle, &data).ok(); } } diff --git a/src/net/mod.rs b/src/net/mod.rs index e781194..70d7dfe 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -9,20 +9,20 @@ pub use heapless; pub use miniconf; pub use serde; +pub mod data_stream; pub mod messages; pub mod miniconf_client; pub mod network_processor; pub mod shared; pub mod telemetry; -pub mod data_stream; use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack}; +use data_stream::{BlockGenerator, DataStream}; use messages::{MqttMessage, SettingsResponse}; use miniconf_client::MiniconfClient; use network_processor::NetworkProcessor; use shared::NetworkManager; use telemetry::TelemetryClient; -use data_stream::{DataStream, BlockGenerator}; use core::fmt::Write; use heapless::String; From 92c84a6bfeb5f89466d10292958dc3bedf8a6c24 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 9 Jun 2021 15:25:59 +0200 Subject: [PATCH 15/29] Allowing target to be runtime configurable --- src/bin/dual-iir.rs | 18 +++++++++++++----- src/net/data_stream.rs | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 51 insertions(+), 6 deletions(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 0c3ad3e..3045802 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -18,7 +18,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ - data_stream::BlockGenerator, + data_stream::{BlockGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -38,6 +38,7 @@ pub struct Settings { allow_hold: bool, force_hold: bool, telemetry_period: u16, + stream_target: StreamTarget, } impl Default for Settings { @@ -57,6 +58,8 @@ impl Default for Settings { force_hold: false, // The default telemetry period in seconds. telemetry_period: 10, + + stream_target: StreamTarget::default(), } } } @@ -92,10 +95,12 @@ const APP: () = { stabilizer.net.mac_address, ); - // TODO: Remove unwrap. - let remote: smoltcp_nal::embedded_nal::SocketAddr = - "10.35.16.10:1111".parse().unwrap(); - let generator = network.enable_streaming(remote.into()); + let generator = { + use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr}; + let remote = + SocketAddr::new(IpAddr::V4(Ipv4Addr::unspecified()), 0); + network.enable_streaming(remote.into()) + }; // Spawn a settings update for default settings. 
c.spawn.settings_update().unwrap(); @@ -229,6 +234,9 @@ const APP: () = { // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.1.set_gain(settings.afe[1]); + + let target = settings.stream_target.into(); + c.resources.network.direct_stream(target); } #[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])] diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index b323a26..72f0967 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,6 +1,8 @@ use core::borrow::BorrowMut; use heapless::spsc::{Consumer, Producer, Queue}; -use smoltcp_nal::embedded_nal::{SocketAddr, UdpClientStack}; +use miniconf::MiniconfAtomic; +use serde::Deserialize; +use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; @@ -14,6 +16,32 @@ const BLOCK_BUFFER_SIZE: usize = 30; const SUBSAMPLE_RATE: usize = 1; +#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)] +pub struct StreamTarget { + pub ip: [u8; 4], + pub port: u16, +} + +impl Default for StreamTarget { + fn default() -> Self { + Self { + ip: [0; 4], + port: 0, + } + } +} + +impl Into for StreamTarget { + fn into(self) -> SocketAddr { + SocketAddr::new( + IpAddr::V4(Ipv4Addr::new( + self.ip[0], self.ip[1], self.ip[2], self.ip[3], + )), + self.port, + ) + } +} + pub fn setup_streaming( stack: NetworkReference, ) -> (BlockGenerator, DataStream) { @@ -208,6 +236,15 @@ impl DataStream { self.close(); } + // If the remote address is unspecified, just close the existing socket. + if remote.ip().is_unspecified() { + if self.socket.is_some() { + self.close(); + } + + return Err(()); + } + let mut socket = self.stack.socket().map_err(|err| match err { ::Error::NoIpAddress => (), _ => (), From 9bc351109cb48ed3e569547720b6bbb880c82ffa Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 9 Jun 2021 15:30:33 +0200 Subject: [PATCH 16/29] Reverting unintended changes --- Cargo.toml | 2 +- src/bin/dual-iir.rs | 9 ++------- src/hardware/setup.rs | 5 ----- 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c809ed8..3e6b93c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ pounder_v1_1 = [ ] [profile.dev] codegen-units = 1 incremental = false -opt-level = 1 +opt-level = 3 [profile.release] opt-level = 3 diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 3045802..b7bf202 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -95,12 +95,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = { - use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr}; - let remote = - SocketAddr::new(IpAddr::V4(Ipv4Addr::unspecified()), 0); - network.enable_streaming(remote.into()) - }; + let generator = network.enable_streaming(StreamTarget::default().into()); // Spawn a settings update for default settings. 
c.spawn.settings_update().unwrap(); @@ -220,7 +215,7 @@ const APP: () = { c.spawn.settings_update().unwrap() } NetworkState::Updated => {} - NetworkState::NoChange => {} + NetworkState::NoChange => cortex_m::asm::wfi(), } } } diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index 83f9b87..8436bf0 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -258,11 +258,6 @@ pub fn setup( let gpiof = device.GPIOF.split(ccdr.peripheral.GPIOF); let mut gpiog = device.GPIOG.split(ccdr.peripheral.GPIOG); - let _uart_tx = gpiod - .pd8 - .into_push_pull_output() - .set_speed(hal::gpio::Speed::VeryHigh); - let dma_streams = hal::dma::dma::StreamsTuple::new(device.DMA1, ccdr.peripheral.DMA1); From b9284451e42ef948eb80d5093354765e8f094cf7 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 9 Jun 2021 15:31:00 +0200 Subject: [PATCH 17/29] Fixing format --- src/bin/dual-iir.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index b7bf202..e6c47d3 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -95,7 +95,8 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = network.enable_streaming(StreamTarget::default().into()); + let generator = + network.enable_streaming(StreamTarget::default().into()); // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); From b40ca17feaf80ed1c8776df96861110ebe7caabf Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 11 Jun 2021 16:36:19 +0200 Subject: [PATCH 18/29] Updating stream methodology --- src/net/data_stream.rs | 220 ++++++++++++++++++----------------------- 1 file changed, 97 insertions(+), 123 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 72f0967..71f087e 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -7,10 +7,6 @@ use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; -// The number of samples contained in a single block. Note that each sample corresponds ot 8 byte -// s(2 bytes per ADC/DAC code, 4 codes total). -const BLOCK_SAMPLE_SIZE: usize = 50; - // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; @@ -56,57 +52,16 @@ pub fn setup_streaming( (generator, stream) } -fn serialize_blocks<'a>( - buffer: &'a mut [u8], - max_buffer_size: usize, - queue: &mut Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, -) -> &'a [u8] { - // While there is space in the buffer, serialize into it. - - let block_size = (SAMPLE_BUFFER_SIZE / SUBSAMPLE_RATE * 2) * 2 * 2 + 8; - - // Truncate the buffer to the maximum buffer size. - let buffer: &mut [u8] = if buffer.len() > max_buffer_size { - &mut buffer[..max_buffer_size] - } else { - buffer - }; - - // Serialize blocks into the buffer until either the buffer or the queue are exhausted. - let mut enqueued_blocks: usize = 0; - for buf in buffer.chunks_exact_mut(block_size) { - // If there are no more blocks, return the serialized data. 
- let data = match queue.dequeue() { - Some(data) => data, - None => break, - }; - - let block = DataBlock { - adcs: data.adcs, - dacs: data.dacs, - block_id: data.block_id, - block_size: SAMPLE_BUFFER_SIZE, - }; - - enqueued_blocks += 1; - let length = block.to_slice(buf, SUBSAMPLE_RATE); - assert!(length == block_size); - } - - &buffer[..block_size * enqueued_blocks] -} - #[derive(Debug, Copy, Clone)] pub struct AdcDacData { - block_id: u32, - adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], + block_id: u16, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], } pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - current_block: AdcDacData, - num_samples: usize, + current_id: u16, } impl BlockGenerator { @@ -115,12 +70,7 @@ impl BlockGenerator { ) -> Self { Self { queue, - current_block: AdcDacData { - block_id: 0, - adcs: [[0; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[0; BLOCK_SAMPLE_SIZE]; 2], - }, - num_samples: 0, + current_id: 0, } } @@ -129,47 +79,14 @@ impl BlockGenerator { adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], ) { - let mut processed_samples = 0; + let block = AdcDacData { + block_id: self.current_id, + adcs: [*adcs[0], *adcs[1]], + dacs: [*dacs[0], *dacs[1]], + }; - while processed_samples < SAMPLE_BUFFER_SIZE { - let remaining_samples = SAMPLE_BUFFER_SIZE - processed_samples; - let free_space = BLOCK_SAMPLE_SIZE - self.num_samples; - let copy_sample_length = if remaining_samples < free_space { - remaining_samples - } else { - free_space - }; - - let start_src = self.num_samples; - let end_src = start_src + copy_sample_length; - - let start_dst = processed_samples; - let end_dst = start_dst + copy_sample_length; - - self.current_block.adcs[0][start_src..end_src] - .copy_from_slice(&adcs[0][start_dst..end_dst]); - self.current_block.adcs[1][start_src..end_src] - .copy_from_slice(&adcs[1][start_dst..end_dst]); - self.current_block.dacs[0][start_src..end_src] - .copy_from_slice(&dacs[0][start_dst..end_dst]); - self.current_block.dacs[1][start_src..end_src] - .copy_from_slice(&dacs[1][start_dst..end_dst]); - - self.num_samples += copy_sample_length; - - // If the data block is full, push it onto the queue. - if self.num_samples == BLOCK_SAMPLE_SIZE { - // Note: We silently ignore dropped blocks here. The queue can fill up if the - // service routing isn't being called often enough. - self.queue.enqueue(self.current_block).ok(); - - self.current_block.block_id = - self.current_block.block_id.wrapping_add(1); - self.num_samples = 0; - } - - processed_samples += copy_sample_length; - } + self.current_id = self.current_id.wrapping_add(1); + self.queue.enqueue(block).ok(); } } @@ -181,31 +98,86 @@ pub struct DataStream { buffer: [u8; 1024], } -struct DataBlock { - block_id: u32, - block_size: usize, - adcs: [[u16; BLOCK_SAMPLE_SIZE]; 2], - dacs: [[u16; BLOCK_SAMPLE_SIZE]; 2], +// Datapacket format: +// +// Header: +// [0..2]: Start block ID (u16) +// [2..3]: Num Blocks present (u8) +// [3..4]: Batch Size (u8) +// +// Following the header, batches are added sequentially. 
Each batch takes the form of: +// [*0..*2]: ADC0 +// [*2..*4]: ADC1 +// [*4..*6]: DAC0 +// [*6..*8]: DAC1 +struct DataPacket<'a> { + buf: &'a mut [u8], + subsample_rate: usize, + start_id: Option, + num_blocks: u8, + write_index: usize, } -impl DataBlock { - pub fn to_slice(self, buf: &mut [u8], subsample: usize) -> usize { - let block_size = self.block_size / subsample; - buf[0..4].copy_from_slice(&self.block_id.to_be_bytes()); - buf[4..8].copy_from_slice(&block_size.to_be_bytes()); +impl<'a> DataPacket<'a> { + pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { + Self { + buf, + start_id: None, + num_blocks: 0, + subsample_rate, + write_index: 4, + } + } - let mut offset: usize = 8; - for device in &[self.adcs, self.dacs] { + pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { + // Check that the block is sequential. + if let Some(id) = &self.start_id { + if batch.block_id != id.wrapping_add(self.num_blocks.into()) { + return Err(()); + } + } else { + // Otherwise, this is the first block. Record the strt ID. + self.start_id = Some(batch.block_id); + } + + // Check that there is space for the block. + let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2; + if self.buf.len() - self.get_packet_size() < block_size_bytes { + return Err(()); + } + + // Copy the samples into the buffer. + for device in &[batch.adcs, batch.dacs] { for channel in device { - for sample in channel.iter().step_by(subsample) { - buf[offset..offset + 2] + for sample in channel.iter().step_by(self.subsample_rate) { + self.buf[self.write_index..self.write_index + 2] .copy_from_slice(&sample.to_be_bytes()); - offset += 2; + self.write_index += 2; } } } - offset + Ok(()) + } + + fn get_packet_size(&self) -> usize { + let header_length = 4; + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + let block_size_bytes = block_sample_size * 2 * 4; + + block_size_bytes * self.num_blocks as usize + header_length + } + + pub fn finish(self) -> usize { + let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; + + // Write the header into the block. + self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes()); + self.buf[2] = self.num_blocks; + self.buf[3] = block_sample_size as u8; + + // Return the length of the packet to transmit. + self.get_packet_size() } } @@ -285,21 +257,23 @@ impl DataStream { } if self.queue.ready() { - let mut handle = self.socket.borrow_mut().unwrap(); - let capacity = self - .stack - .lock(|stack| { - stack.with_udp_socket(handle, |socket| { - socket.payload_send_capacity() - }) - }) - .unwrap(); + // Dequeue data from the queue into a larger block structure. + let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE); + while self.queue.ready() { + // Note(unwrap): We check above that the queue is ready before calling this. + if packet.add_batch(self.queue.peek().unwrap()).is_err() { + // If we cannot add another batch, break out of the loop and send the packet. + break; + } - let data = - serialize_blocks(&mut self.buffer, capacity, &mut self.queue); + // Remove the batch that we just added. + self.queue.dequeue(); + } // Transmit the data block. 
- self.stack.send(&mut handle, &data).ok(); + let mut handle = self.socket.borrow_mut().unwrap(); + let size = packet.finish(); + self.stack.send(&mut handle, &self.buffer[..size]).ok(); } } } From 5a947e459cf471a52d900b897287c3c8c6b75425 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Fri, 11 Jun 2021 17:52:11 +0200 Subject: [PATCH 19/29] Finalizing updates after testing --- src/net/data_stream.rs | 4 ++-- src/net/network_processor.rs | 5 +---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 71f087e..545abcb 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -157,6 +157,8 @@ impl<'a> DataPacket<'a> { } } + self.num_blocks += 1; + Ok(()) } @@ -199,8 +201,6 @@ impl DataStream { // Note(unwrap): We guarantee that the socket is available above. let socket = self.socket.take().unwrap(); self.stack.close(socket).unwrap(); - - log::info!("Stream Disconnecting"); } fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index 43d75e0..13f7035 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -73,10 +73,7 @@ impl NetworkProcessor { let result = match self.stack.lock(|stack| stack.poll(now)) { Ok(true) => UpdateState::Updated, Ok(false) => UpdateState::NoChange, - Err(err) => { - log::info!("Network error: {:?}", err); - UpdateState::Updated - } + Err(_) => UpdateState::Updated, }; result From 8d1f2cf9b683f15c47ef7ed64d07d976ec6f05dd Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 12:51:42 +0200 Subject: [PATCH 20/29] Updating versions --- Cargo.lock | 13 ++++++------- Cargo.toml | 4 ++-- src/hardware/setup.rs | 34 +++++----------------------------- 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6127f2..7ff3b35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -389,9 +389,9 @@ dependencies = [ [[package]] name = "managed" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c75de51135344a4f8ed3cfe2720dc27736f7711989703a0b43aadf3753c55577" +checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d" [[package]] name = "matrixmultiply" @@ -743,9 +743,8 @@ dependencies = [ [[package]] name = "smoltcp" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11b5647cc4676e9358e6b15b6536b34e5b413e5ae946a06b3f85e713132bcdfa" +version = "0.8.0" +source = "git+https://github.com/smoltcp-rs/smoltcp?branch=master#027f255f904b9b7c4226cfd8b2d31f272ffa5105" dependencies = [ "bitflags", "byteorder", @@ -755,7 +754,7 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" -source = "git+https://github.com/quartiq/smoltcp-nal.git?branch=feature/udp-support#bd90e9d1352e17cd0cc5406ea1d7a35be5761866" +source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=2967c6e#2967c6e9abe580b207e742bb885a03845fa7344c" dependencies = [ "embedded-nal", "heapless 0.7.1", @@ -812,7 +811,7 @@ dependencies = [ [[package]] name = "stm32h7xx-hal" version = "0.9.0" -source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=acd47be#acd47beb4b84b4dc46da3a8b68688bc8c5984604" +source = "git+https://github.com/quartiq/stm32h7xx-hal.git?branch=feature/smoltcp-update#191b1d50a8a4d956492649630efaf563f59e35bf" dependencies = [ "bare-metal 1.0.0", "cast", diff --git a/Cargo.toml b/Cargo.toml index 30d0d5e..a5e0ff0 100644 --- a/Cargo.toml +++ b/Cargo.toml 
@@ -57,7 +57,7 @@ rev = "70b0eb5" features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] # version = "0.9.0" git = "https://github.com/quartiq/stm32h7xx-hal.git" -rev = "acd47be" +branch = "feature/smoltcp-update" # link.x section start/end [patch.crates-io.cortex-m-rt] @@ -70,7 +70,7 @@ rev = "2750533" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" -branch = "feature/udp-support" +rev = "2967c6e" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" diff --git a/src/hardware/setup.rs b/src/hardware/setup.rs index 4381e40..d599926 100644 --- a/src/hardware/setup.rs +++ b/src/hardware/setup.rs @@ -663,14 +663,14 @@ pub fn setup( let neighbor_cache = smoltcp::iface::NeighborCache::new(&mut store.neighbor_cache[..]); - let interface = smoltcp::iface::EthernetInterfaceBuilder::new(eth_dma) + let interface = smoltcp::iface::InterfaceBuilder::new(eth_dma) .ethernet_addr(mac_addr) .neighbor_cache(neighbor_cache) .ip_addrs(&mut store.ip_addrs[..]) .routes(routes) .finalize(); - let mut sockets = { + let sockets = { let mut sockets = smoltcp::socket::SocketSet::new(&mut store.sockets[..]); @@ -704,31 +704,11 @@ pub fn setup( sockets.add(udp_socket); } + sockets.add(smoltcp::socket::Dhcpv4Socket::new()); + sockets }; - let dhcp_client = { - let dhcp_rx_buffer = smoltcp::socket::RawSocketBuffer::new( - &mut store.dhcp_rx_metadata[..], - &mut store.dhcp_rx_storage[..], - ); - - let dhcp_tx_buffer = smoltcp::socket::RawSocketBuffer::new( - &mut store.dhcp_tx_metadata[..], - &mut store.dhcp_tx_storage[..], - ); - - smoltcp::dhcp::Dhcpv4Client::new( - &mut sockets, - dhcp_rx_buffer, - dhcp_tx_buffer, - // Smoltcp indicates that an instant with a negative time is indicative that time is - // not yet available. We can't get the current instant yet, so indicate an invalid - // time value. 
- smoltcp::time::Instant::from_millis(-1), - ) - }; - let random_seed = { let mut rng = device.RNG.constrain(ccdr.peripheral.RNG, &ccdr.clocks); @@ -737,11 +717,7 @@ pub fn setup( data }; - let mut stack = smoltcp_nal::NetworkStack::new( - interface, - sockets, - Some(dhcp_client), - ); + let mut stack = smoltcp_nal::NetworkStack::new(interface, sockets); stack.seed_random_port(&random_seed); From a54d855cbe66e7220b294e9bde615e21ca23b41b Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 12:54:57 +0200 Subject: [PATCH 21/29] Updating HAL tag --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index a5e0ff0..68a2053 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ rev = "70b0eb5" features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"] # version = "0.9.0" git = "https://github.com/quartiq/stm32h7xx-hal.git" -branch = "feature/smoltcp-update" +rev = "33aa67d" # link.x section start/end [patch.crates-io.cortex-m-rt] From 2815d6d9e9723fc1f1ca22f8a16f0a5e81e84795 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 13:18:16 +0200 Subject: [PATCH 22/29] Updating docs --- Cargo.lock | 2 +- scripts/stream_throughput.py | 189 +++++++++++++++++++++++++++++++++++ src/bin/dual-iir.rs | 3 +- src/net/data_stream.rs | 152 ++++++++++++++++++++-------- src/net/mod.rs | 7 +- 5 files changed, 309 insertions(+), 44 deletions(-) create mode 100644 scripts/stream_throughput.py diff --git a/Cargo.lock b/Cargo.lock index 7ff3b35..e10c1e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -811,7 +811,7 @@ dependencies = [ [[package]] name = "stm32h7xx-hal" version = "0.9.0" -source = "git+https://github.com/quartiq/stm32h7xx-hal.git?branch=feature/smoltcp-update#191b1d50a8a4d956492649630efaf563f59e35bf" +source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=33aa67d#33aa67d74790cb9f680a4f281b72df0664bcf03c" dependencies = [ "bare-metal 1.0.0", "cast", diff --git a/scripts/stream_throughput.py b/scripts/stream_throughput.py new file mode 100644 index 0000000..2981b0c --- /dev/null +++ b/scripts/stream_throughput.py @@ -0,0 +1,189 @@ +#!/usr/bin/python3 +""" +Author: Ryan Summers + +Description: Provides a mechanism for measuring Stabilizer stream data throughput. +""" +import socket +import collections +import struct +import time +import logging + +# Representation of a single UDP packet transmitted by Stabilizer. +Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac']) + +class Timer: + """ A basic timer for measuring elapsed time periods. """ + + def __init__(self, period=1.0): + """ Create the timer with the provided period. """ + self.start_time = time.time() + self.trigger_time = self.start_time + period + self.period = period + self.started = False + + + def is_triggered(self): + """ Check if the timer period has elapsed. """ + now = time.time() + return now >= self.trigger_time + + + def start(self): + """ Start the timer. """ + self.start_time = time.time() + self.started = True + + + def is_started(self): + """ Check if the timer has started. """ + return self.started + + + def arm(self): + """ Arm the timer trigger. """ + self.trigger_time = time.time() + self.period + + + def elapsed(self): + """ Get the elapsed time since the timer was started. """ + now = time.time() + return now - self.start_time + + +class PacketParser: + """ Utilize class used for parsing received UDP data. """ + + def __init__(self): + """ Initialize the parser. 
""" + self.buf = b'' + self.total_bytes = 0 + + + def ingress(self, data): + """ Ingress received UDP data. """ + self.total_bytes += len(data) + self.buf += data + + + def parse_all_packets(self): + """ Parse all received packets from the receive buffer. + + Returns: + A list of received Packets. + """ + packets = [] + while True: + new_packets = self._parse() + if new_packets: + packets += new_packets + else: + return packets + + + def _parse(self): + """ Attempt to parse packets from the received buffer. """ + # Attempt to parse a block from the buffer. + if len(self.buf) < 4: + return None + + start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf) + + packet_size = 4 + data_size * num_blocks * 8 + + if len(self.buf) < packet_size: + return None + + self.buf = self.buf[4:] + + packets = [] + for offset in range(num_blocks): + adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf) + adc = [ + adcs_dacs[0:data_size], + adcs_dacs[data_size:2*data_size], + ] + + dac = [ + adcs_dacs[2*data_size: 3*data_size], + adcs_dacs[3*data_size:], + ] + + self.buf = self.buf[8*data_size:] + packets.append(Packet(start_id + offset, adc, dac)) + + return packets + + +def check_index(previous_index, next_index): + """ Check if two indices are sequential. """ + if previous_index == -1: + return True + + # Handle index roll-over. Indices are only stored in 16-bit numbers. + if next_index < previous_index: + next_index += 65536 + + expected_index = previous_index + 1 + + return next_index == expected_index + + +def main(): + """ Main program. """ + connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + connection.bind(("", 1111)) + + logging.basicConfig(level=logging.INFO, + format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s') + + last_index = -1 + + drop_count = 0 + good_blocks = 0 + + timer = Timer() + parser = PacketParser() + + while True: + # Receive any data over UDP and parse it. + data = connection.recv(4096) + if data and not timer.is_started(): + timer.start() + + parser.ingress(data) + + # Handle any received packets. + for packet in parser.parse_all_packets(): + + # Handle any dropped packets. + if not check_index(last_index, packet.index): + print(hex(last_index), hex(packet.index)) + if packet.index < (last_index + 1): + dropped = packet.index + 65536 - (last_index + 1) + else: + dropped = packet.index - (last_index + 1) + + drop_count += dropped + + last_index = packet.index + good_blocks += 1 + + # Report the throughput periodically. + if timer.is_triggered(): + drate = parser.total_bytes * 8 / 1e6 / timer.elapsed() + + print(f''' +Data Rate: {drate:.3f} Mbps +Received Blocks: {good_blocks} +Dropped blocks: {drop_count} + +Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s +---- +''') + timer.arm() + + +if __name__ == '__main__': + main() diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index e6c47d3..efec9c1 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -95,8 +95,7 @@ const APP: () = { stabilizer.net.mac_address, ); - let generator = - network.enable_streaming(StreamTarget::default().into()); + let generator = network.enable_streaming(); // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 545abcb..45a349c 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -1,3 +1,20 @@ +///! Stabilizer data stream capabilities +///! +///! # Design +///! 
Stabilizer data streamining utilizes UDP packets to send live data streams at high throughput. +///! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains +///! an identifier that can be used to detect any dropped data. +///! +///! The current implementation utilizes an single-producer, single-consumer queue to send data +///! between a high priority task and the UDP transmitter. +///! +///! A "batch" of data is defined to be a single item in the SPSC queue sent to the UDP transmitter +///! thread. The transmitter thread then serializes as many sequential "batches" into a single UDP +///! packet as possible. The UDP packet is also given a header indicating the starting batch +///! sequence number and the number of batches present. If the UDP transmitter encounters a +///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged +///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a +///! receiver to detect dropped batches (e.g. due to processing overhead). use core::borrow::BorrowMut; use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; @@ -10,8 +27,10 @@ use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. const BLOCK_BUFFER_SIZE: usize = 30; +// A factor that data may be subsampled at. const SUBSAMPLE_RATE: usize = 1; +/// Represents the destination for the UDP stream to send data to. #[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)] pub struct StreamTarget { pub ip: [u8; 4], @@ -38,6 +57,23 @@ impl Into for StreamTarget { } } +/// A basic "batch" of data. +// Note: In the future, the stream may be generic over this type. +#[derive(Debug, Copy, Clone)] +pub struct AdcDacData { + block_id: u16, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +/// Configure streaming on a device. +/// +/// # Args +/// * `stack` - A reference to the shared network stack. +/// +/// # Returns +/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The +/// `stream` is the logically consumer (UDP transmitter) of the enqueued data. pub fn setup_streaming( stack: NetworkReference, ) -> (BlockGenerator, DataStream) { @@ -52,28 +88,34 @@ pub fn setup_streaming( (generator, stream) } -#[derive(Debug, Copy, Clone)] -pub struct AdcDacData { - block_id: u16, - adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], - dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], -} - +/// The data generator for a stream. pub struct BlockGenerator { queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, current_id: u16, } impl BlockGenerator { - pub fn new( - queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - ) -> Self { + /// Construct a new generator. + /// # Args + /// * `queue` - The producer portion of the SPSC queue to enqueue data into. + /// + /// # Returns + /// The generator to use. + fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self { Self { queue, current_id: 0, } } + /// Schedule data to be sent by the generator. + /// + /// # Note + /// If no space is available, the data batch may be silently dropped. + /// + /// # Args + /// * `adcs` - The ADC data to transmit. + /// * `dacs` - The DAC data to transmit. 
pub fn send( &mut self, adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], @@ -90,26 +132,21 @@ impl BlockGenerator { } } -pub struct DataStream { - stack: NetworkReference, - socket: Option<::UdpSocket>, - queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, - remote: Option, - buffer: [u8; 1024], -} - -// Datapacket format: -// -// Header: -// [0..2]: Start block ID (u16) -// [2..3]: Num Blocks present (u8) -// [3..4]: Batch Size (u8) -// -// Following the header, batches are added sequentially. Each batch takes the form of: -// [*0..*2]: ADC0 -// [*2..*4]: ADC1 -// [*4..*6]: DAC0 -// [*6..*8]: DAC1 +/// Represents a single UDP packet sent by the stream. +/// +/// # Packet Format +/// All data is sent in network-endian format. The format is as follows +/// +/// Header: +/// [0..2]: Start block ID (u16) +/// [2..3]: Num Blocks present (u8) +/// [3..4]: Batch Size (u8) +/// +/// Following the header, batches are added sequentially. Each batch takes the form of: +/// [*0..*2]: ADC0 +/// [*2..*4]: ADC1 +/// [*4..*6]: DAC0 +/// [*6..*8]: DAC1 struct DataPacket<'a> { buf: &'a mut [u8], subsample_rate: usize, @@ -119,6 +156,11 @@ struct DataPacket<'a> { } impl<'a> DataPacket<'a> { + /// Construct a new packet. + /// + /// # Args + /// * `buf` - The location to serialize the data packet into. + /// * `subsample_rate` - The factor at which to subsample data from batches. pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self { Self { buf, @@ -129,6 +171,13 @@ impl<'a> DataPacket<'a> { } } + /// Add a batch of data to the packet. + /// + /// # Note + /// Serialization occurs as the packet is added. + /// + /// # Args + /// * `batch` - The batch to add to the packet. pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> { // Check that the block is sequential. if let Some(id) = &self.start_id { @@ -170,6 +219,11 @@ impl<'a> DataPacket<'a> { block_size_bytes * self.num_blocks as usize + header_length } + /// Complete the packet and prepare it for transmission. + /// + /// # Returns + /// The size of the packet. The user should utilize the original buffer provided for packet + /// construction to access the packet. pub fn finish(self) -> usize { let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate; @@ -183,15 +237,32 @@ impl<'a> DataPacket<'a> { } } +/// The "consumer" portion of the data stream. +/// +/// # Note +/// This is responsible for consuming data and sending it over UDP. +pub struct DataStream { + stack: NetworkReference, + socket: Option<::UdpSocket>, + queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, + remote: SocketAddr, + buffer: [u8; 1024], +} + impl DataStream { - pub fn new( + /// Construct a new data streamer. + /// + /// # Args + /// * `stack` - A reference to the shared network stack. + /// * `consumer` - The read side of the queue containing data to transmit. + fn new( stack: NetworkReference, consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>, ) -> Self { Self { stack, socket: None, - remote: None, + remote: StreamTarget::default().into(), queue: consumer, buffer: [0; 1024], } @@ -230,24 +301,27 @@ impl DataStream { Ok(()) } + /// Configure the remote endpoint of the stream. + /// + /// # Args + /// * `remote` - The destination to send stream data to. pub fn set_remote(&mut self, remote: SocketAddr) { // If the remote is identical to what we already have, do nothing. - if let Some(current_remote) = self.remote { - if current_remote == remote { - return; - } + if remote == self.remote { + return; } // Open the new remote connection. 
self.open(remote).ok(); - self.remote = Some(remote); + self.remote = remote; } + /// Process any data for transmission. pub fn process(&mut self) { // If there's no socket available, try to connect to our remote. - if self.socket.is_none() && self.remote.is_some() { + if self.socket.is_none() { // If we still can't open the remote, continue. - if self.open(self.remote.unwrap()).is_err() { + if self.open(self.remote).is_err() { // Clear the queue out. while self.queue.ready() { self.queue.dequeue(); diff --git a/src/net/mod.rs b/src/net/mod.rs index 70d7dfe..115ba3b 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -113,11 +113,14 @@ where } /// Enable live data streaming. - pub fn enable_streaming(&mut self, remote: SocketAddr) -> BlockGenerator { - self.stream.set_remote(remote); + pub fn enable_streaming(&mut self) -> BlockGenerator { self.generator.take().unwrap() } + /// Direct the stream to the provided remote target. + /// + /// # Args + /// * `remote` - The destination for the streamed data. pub fn direct_stream(&mut self, remote: SocketAddr) { if self.generator.is_none() { self.stream.set_remote(remote); From 9e083842ee86040981939693d08160366de2f490 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 13:22:38 +0200 Subject: [PATCH 23/29] Adding streaming to lockin app --- src/bin/lockin.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/bin/lockin.rs b/src/bin/lockin.rs index 9263e2c..bc1d39a 100644 --- a/src/bin/lockin.rs +++ b/src/bin/lockin.rs @@ -20,6 +20,7 @@ use stabilizer::{ DigitalInput0, DigitalInput1, AFE0, AFE1, }, net::{ + data_stream::{BlockGenerator, StreamTarget}, miniconf::Miniconf, serde::Deserialize, telemetry::{Telemetry, TelemetryBuffer}, @@ -64,6 +65,8 @@ pub struct Settings { output_conf: [Conf; 2], telemetry_period: u16, + + stream_target: StreamTarget, } impl Default for Settings { @@ -82,6 +85,8 @@ impl Default for Settings { output_conf: [Conf::InPhase, Conf::Quadrature], // The default telemetry period in seconds. telemetry_period: 10, + + stream_target: StreamTarget::default(), } } } @@ -96,6 +101,7 @@ const APP: () = { settings: Settings, telemetry: TelemetryBuffer, digital_inputs: (DigitalInput0, DigitalInput1), + generator: BlockGenerator, timestamper: InputStamper, pll: RPLL, @@ -108,7 +114,7 @@ const APP: () = { let (mut stabilizer, _pounder) = hardware::setup::setup(c.core, c.device); - let network = NetworkUsers::new( + let mut network = NetworkUsers::new( stabilizer.net.stack, stabilizer.net.phy, stabilizer.cycle_counter, @@ -116,6 +122,8 @@ const APP: () = { stabilizer.net.mac_address, ); + let generator = network.enable_streaming(); + let settings = Settings::default(); let pll = RPLL::new( @@ -155,6 +163,7 @@ const APP: () = { telemetry: TelemetryBuffer::default(), settings, + generator, pll, lockin: Lockin::default(), @@ -168,7 +177,7 @@ const APP: () = { /// This is an implementation of a externally (DI0) referenced PLL lockin on the ADC0 signal. /// It outputs either I/Q or power/phase on DAC0/DAC1. Data is normalized to full scale. /// PLL bandwidth, filter bandwidth, slope, and x/y or power/phase post-filters are available. 
- #[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry], priority=2)] + #[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry, generator], priority=2)] #[inline(never)] #[link_section = ".itcm.process"] fn process(mut c: process::Context) { @@ -180,6 +189,7 @@ const APP: () = { ref mut lockin, ref mut pll, ref mut timestamper, + ref mut generator, } = c.resources; let (reference_phase, reference_frequency) = match settings.lockin_mode @@ -252,6 +262,10 @@ const APP: () = { *sample = DacCode::from(value as i16).0; } } + + // Stream data + generator.send(&adc_samples, &dac_samples); + // Update telemetry measurements. telemetry.adcs = [AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])]; @@ -285,6 +299,9 @@ const APP: () = { c.resources.afes.1.set_gain(settings.afe[1]); c.resources.settings.lock(|current| *current = *settings); + + let target = settings.stream_target.into(); + c.resources.network.direct_stream(target); } #[task(priority = 1, resources=[network, digital_inputs, settings, telemetry], schedule=[telemetry])] From 7294a69b664d4bfc42f4af2e36cd59a2529ccf1a Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 13:46:39 +0200 Subject: [PATCH 24/29] Fixing clippy --- src/net/data_stream.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 45a349c..b5e9513 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -46,13 +46,16 @@ impl Default for StreamTarget { } } -impl Into for StreamTarget { - fn into(self) -> SocketAddr { +impl From for SocketAddr { + fn from(target: StreamTarget) -> SocketAddr { SocketAddr::new( IpAddr::V4(Ipv4Addr::new( - self.ip[0], self.ip[1], self.ip[2], self.ip[3], + target.ip[0], + target.ip[1], + target.ip[2], + target.ip[3], )), - self.port, + target.port, ) } } From 99114e3c9c8bdb6536bdb7bb137b3db4d3e0f770 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 13:49:06 +0200 Subject: [PATCH 25/29] Fixing clippy --- src/net/data_stream.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index b5e9513..5528b02 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -291,10 +291,7 @@ impl DataStream { return Err(()); } - let mut socket = self.stack.socket().map_err(|err| match err { - ::Error::NoIpAddress => (), - _ => (), - })?; + let mut socket = self.stack.socket().map_err(|_| ())?; self.stack.connect(&mut socket, remote).unwrap(); From 260b0c7767d2c702392038539d7575f2f9e26a96 Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 14:19:28 +0200 Subject: [PATCH 26/29] Updating comments --- src/net/data_stream.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 5528b02..852cba4 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -293,9 +293,10 @@ impl DataStream { let mut socket = self.stack.socket().map_err(|_| ())?; + // Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be + // bound. self.stack.connect(&mut socket, remote).unwrap(); - // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); Ok(()) @@ -320,9 +321,9 @@ impl DataStream { pub fn process(&mut self) { // If there's no socket available, try to connect to our remote. if self.socket.is_none() { - // If we still can't open the remote, continue. 
+ // If we can't open the socket (e.g. we do not have an IP address yet), clear data from + // the queue. if self.open(self.remote).is_err() { - // Clear the queue out. while self.queue.ready() { self.queue.dequeue(); } From 40a49bc8d870b0bb3a85bf04c34fd019ce57720f Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 14:26:27 +0200 Subject: [PATCH 27/29] Updating HITL script to ignore failed kill step --- hitl/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hitl/run.sh b/hitl/run.sh index 3eb3e35..3aa0eb0 100755 --- a/hitl/run.sh +++ b/hitl/run.sh @@ -31,5 +31,7 @@ python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G2"' python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G1"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' -kill $(jobs -p) +# Probe-run appears to sporadically fail. In that case, the job is no longer active and there's no +# background process to kill. +kill $(jobs -p) || true wait || true From 507f1f2b6c616aa99fedfd319f3022cddfbea73e Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 15 Jun 2021 14:50:55 +0200 Subject: [PATCH 28/29] Reverting hitl change --- hitl/run.sh | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/hitl/run.sh b/hitl/run.sh index 3aa0eb0..3eb3e35 100755 --- a/hitl/run.sh +++ b/hitl/run.sh @@ -31,7 +31,5 @@ python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G2"' python3 miniconf.py dt/sinara/dual-iir/04-91-62-d9-7e-5f afe/0='"G1"' iir_ch/0/0=\ '{"y_min": -32767, "y_max": 32767, "y_offset": 0, "ba": [1.0, 0, 0, 0, 0]}' -# Probe-run appears to sporadically fail. In that case, the job is no longer active and there's no -# background process to kill. -kill $(jobs -p) || true +kill $(jobs -p) wait || true From 9d34e755d86058ab2c5dc0464b016357fe6ae22a Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Mon, 21 Jun 2021 13:18:36 +0200 Subject: [PATCH 29/29] Fixing UDP reopening bug --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/net/data_stream.rs | 3 +-- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e10c1e7..9cffcb5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -754,7 +754,7 @@ dependencies = [ [[package]] name = "smoltcp-nal" version = "0.1.0" -source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=2967c6e#2967c6e9abe580b207e742bb885a03845fa7344c" +source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=5baf55f#5baf55fafbfe2c08d9fe56c836171e9d2fb468e8" dependencies = [ "embedded-nal", "heapless 0.7.1", diff --git a/Cargo.toml b/Cargo.toml index bc444d1..44f96c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -73,7 +73,7 @@ rev = "2750533" [dependencies.smoltcp-nal] git = "https://github.com/quartiq/smoltcp-nal.git" -rev = "2967c6e" +rev = "5baf55f" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index 852cba4..61bceba 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -15,7 +15,6 @@ ///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged ///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a ///! receiver to detect dropped batches (e.g. due to processing overhead). -use core::borrow::BorrowMut; use heapless::spsc::{Consumer, Producer, Queue}; use miniconf::MiniconfAtomic; use serde::Deserialize; @@ -346,7 +345,7 @@ impl DataStream { } // Transmit the data block. 
- let mut handle = self.socket.borrow_mut().unwrap(); + let mut handle = self.socket.as_mut().unwrap(); let size = packet.finish(); self.stack.send(&mut handle, &self.buffer[..size]).ok(); }