From 70be4c1c1980a6bee6648661414121783865e45d Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Wed, 26 May 2021 15:02:50 +0200 Subject: [PATCH] Adding WIP changes to streaming --- Cargo.toml | 5 ++-- src/bin/dual-iir.rs | 29 +++++++++++++-------- src/net/data_stream.rs | 50 +++++++++++++++++++++--------------- src/net/miniconf_client.rs | 4 +-- src/net/mod.rs | 18 ++++++++++--- src/net/network_processor.rs | 5 ++++ 6 files changed, 73 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 041d5d7..04ee1af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,8 +64,9 @@ git = "https://github.com/quartiq/miniconf.git" rev = "c6f2b28" [dependencies.smoltcp-nal] -git = "https://github.com/quartiq/smoltcp-nal.git" -rev = "4a1711c" +path = "../smoltcp-nal" +# git = "https://github.com/quartiq/smoltcp-nal.git" +# rev = "4a1711c" [dependencies.minimq] git = "https://github.com/quartiq/minimq.git" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index c258a02..ae18709 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -128,7 +128,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. - #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] + #[task(binds=DMA1_STR4, spawn=[stream], resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=3)] fn process(c: process::Context) { let adc_samples = [ c.resources.adcs.0.acquire_buffer(), @@ -177,6 +177,15 @@ const APP: () = { [DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])]; c.resources.telemetry.digital_inputs = digital_inputs; + + // Make a best effort to start data stream processing. It may be blocked by someone else + // using the network stack. + c.spawn.stream().ok(); + } + + #[task(priority = 2, resources=[network])] + fn stream(c: stream::Context) { + c.resources.network.update_stream() } #[idle(resources=[network], spawn=[settings_update])] @@ -192,8 +201,8 @@ const APP: () = { #[task(priority = 1, resources=[network, afes, settings])] fn settings_update(mut c: settings_update::Context) { // Update the IIR channels. - let settings = c.resources.network.miniconf.settings(); - c.resources.settings.lock(|current| *current = *settings); + let settings = c.resources.network.lock(|net| net.miniconf.settings()); + c.resources.settings.lock(|current| *current = settings); // Update AFEs c.resources.afes.0.set_gain(settings.afe[0]); @@ -210,10 +219,8 @@ const APP: () = { .settings .lock(|settings| (settings.afe, settings.telemetry_period)); - c.resources - .network - .telemetry - .publish(&telemetry.finalize(gains[0], gains[1])); + c.resources.network.lock(|net| net.telemetry + .publish(&telemetry.finalize(gains[0], gains[1]))); // Schedule the telemetry task in the future. c.schedule @@ -229,22 +236,22 @@ const APP: () = { unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } } - #[task(binds = SPI2, priority = 3)] + #[task(binds = SPI2, priority = 4)] fn spi2(_: spi2::Context) { panic!("ADC0 input overrun"); } - #[task(binds = SPI3, priority = 3)] + #[task(binds = SPI3, priority = 4)] fn spi3(_: spi3::Context) { panic!("ADC1 input overrun"); } - #[task(binds = SPI4, priority = 3)] + #[task(binds = SPI4, priority = 4)] fn spi4(_: spi4::Context) { panic!("DAC0 output error"); } - #[task(binds = SPI5, priority = 3)] + #[task(binds = SPI5, priority = 4)] fn spi5(_: spi5::Context) { panic!("DAC1 output error"); } diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index bbaf048..4d828a0 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -3,14 +3,13 @@ use heapless::{ spsc::{Consumer, Producer, Queue}, Vec, }; -use serde::Serialize; -use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack}; +use smoltcp_nal::{smoltcp, embedded_nal::{Mode, SocketAddr, TcpStack}}; use super::NetworkReference; use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; // The number of data blocks that we will buffer in the queue. -type BlockBufferSize = heapless::consts::U10; +type BlockBufferSize = heapless::consts::U30; pub fn setup_streaming( stack: NetworkReference, @@ -132,7 +131,7 @@ impl DataStream { // TODO: How should we handle a connection failure? let socket = self.stack.connect(socket, remote).unwrap(); - log::info!("Stream connecting to {:?}", remote); + //log::info!("Stream connecting to {:?}", remote); // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -169,16 +168,30 @@ impl DataStream { } } - pub fn process(&mut self) { - if let Some(data) = self.queue.dequeue() { - - // If there's no socket available, try to connect to our remote. - if self.socket.is_none() && self.remote.is_some() { - // If we still can't open the remote, continue. - if self.open(self.remote.unwrap()).is_err() { - return; - } + pub fn process(&mut self) -> bool { + // If there's no socket available, try to connect to our remote. + if self.socket.is_none() && self.remote.is_some() { + // If we still can't open the remote, continue. + if self.open(self.remote.unwrap()).is_err() { + return false; } + } + + let mut handle = self.socket.borrow_mut().unwrap(); + + let capacity = self.stack.lock(|stack| { + let mut all_sockets = stack.sockets.borrow_mut(); + let socket: &mut smoltcp::socket::TcpSocket = &mut *all_sockets.get(handle); + socket.send_capacity() - socket.send_queue() + }); + + // TODO: Clean up magic numbers. + if capacity < 72 { + // We cannot send a full data block. Abort now. + return false; + } + + if let Some(data) = self.queue.dequeue() { // Reconnect the socket if we're no longer connected. self.manage_reconnection(); @@ -191,23 +204,20 @@ impl DataStream { }; // Serialize the datablock. - // TODO: Do we want to packetize the data block as well? let data: Vec = block.serialize(); - let mut socket = self.socket.borrow_mut().unwrap(); - // Transmit the data block. - // TODO: How should we handle partial packet transmission? // TODO: Should we measure how many packets get dropped as telemetry? - match self.stack.write(&mut socket, &data) { + match self.stack.write(&mut handle, &data) { Ok(len) => { if len != data.len() { - log::warn!("Short message: {} {}", len, data.len()); - //self.close(); + log::error!("Short message: {} {}", len, data.len()); } }, _ => {}, } } + + self.queue.ready() } } diff --git a/src/net/miniconf_client.rs b/src/net/miniconf_client.rs index b809d73..0499d6e 100644 --- a/src/net/miniconf_client.rs +++ b/src/net/miniconf_client.rs @@ -156,7 +156,7 @@ where } /// Get the current settings from miniconf. - pub fn settings(&self) -> &S { - &self.settings + pub fn settings(&self) -> S { + self.settings.clone() } } diff --git a/src/net/mod.rs b/src/net/mod.rs index aeaa9ab..128d2ff 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -118,6 +118,20 @@ where } } + pub fn update_stream(&mut self) { + // Update the data stream. + if self.generator.is_none() { + loop { + // Process egress of the stack. + self.processor.egress(); + + if !self.stream.process() { + return + } + } + } + } + /// Update and process all of the network users state. /// /// # Returns @@ -130,9 +144,7 @@ where self.telemetry.update(); // Update the data stream. - if self.generator.is_none() { - self.stream.process(); - } + self.update_stream(); match self.miniconf.update() { UpdateState::Updated => UpdateState::Updated, diff --git a/src/net/network_processor.rs b/src/net/network_processor.rs index a64d6e7..41e1c31 100644 --- a/src/net/network_processor.rs +++ b/src/net/network_processor.rs @@ -37,6 +37,11 @@ impl NetworkProcessor { } } + pub fn egress(&mut self) { + let now = self.clock.current_ms(); + self.stack.lock(|stack| stack.poll(now)).ok(); + } + /// Process and update the state of the network. /// /// # Note