diff --git a/Cargo.lock b/Cargo.lock index 274b2bb..8b8f754 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -548,6 +548,23 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58" +[[package]] +name = "postcard" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66acf3cf8ab62785852a0f67ae3bfcd38324da6561e52e7f4a049ca555c6d55e" +dependencies = [ + "heapless 0.6.1", + "postcard-cobs", + "serde", +] + +[[package]] +name = "postcard-cobs" +version = "0.1.5-pre" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f" + [[package]] name = "ppv-lite86" version = "0.2.10" @@ -772,6 +789,7 @@ dependencies = [ "nb 1.0.0", "num_enum", "paste", + "postcard", "rtt-logger", "rtt-target", "serde", diff --git a/Cargo.toml b/Cargo.toml index 790363a..10cd9b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ ad9959 = { path = "ad9959" } miniconf = "0.1.0" shared-bus = {version = "0.2.2", features = ["cortex-m"] } serde-json-core = "0.3" +postcard = "0.6" [dependencies.rtt-logger] git = "https://github.com/quartiq/rtt-logger.git" diff --git a/src/bin/dual-iir.rs b/src/bin/dual-iir.rs index 29eb474..c258a02 100644 --- a/src/bin/dual-iir.rs +++ b/src/bin/dual-iir.rs @@ -13,7 +13,9 @@ use hardware::{ DigitalInput0, DigitalInput1, InputPin, SystemTimer, AFE0, AFE1, }; -use net::{NetworkUsers, Telemetry, TelemetryBuffer, UpdateState}; +use net::{ + BlockGenerator, NetworkUsers, Telemetry, TelemetryBuffer, UpdateState, +}; const SCALE: f32 = i16::MAX as _; @@ -58,6 +60,7 @@ const APP: () = { adcs: (Adc0Input, Adc1Input), dacs: (Dac0Output, Dac1Output), network: NetworkUsers, + generator: BlockGenerator, settings: Settings, telemetry: TelemetryBuffer, @@ -71,7 +74,7 @@ const APP: () = { // Configure the microcontroller let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device); - let network = NetworkUsers::new( + let mut network = NetworkUsers::new( stabilizer.net.stack, stabilizer.net.phy, stabilizer.cycle_counter, @@ -79,6 +82,11 @@ const APP: () = { stabilizer.net.mac_address, ); + // TODO: Remove unwrap. + let remote: smoltcp_nal::embedded_nal::SocketAddr = + "10.35.16.10:1111".parse().unwrap(); + let generator = network.enable_streaming(remote.into()); + // Spawn a settings update for default settings. c.spawn.settings_update().unwrap(); c.spawn.telemetry().unwrap(); @@ -96,6 +104,7 @@ const APP: () = { afes: stabilizer.afes, adcs: stabilizer.adcs, dacs: stabilizer.dacs, + generator, network, digital_inputs: stabilizer.digital_inputs, telemetry: net::TelemetryBuffer::default(), @@ -119,7 +128,7 @@ const APP: () = { /// /// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// the same time bounds, meeting one also means the other is also met. - #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry], priority=2)] + #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)] fn process(c: process::Context) { let adc_samples = [ c.resources.adcs.0.acquire_buffer(), @@ -157,6 +166,9 @@ const APP: () = { } } + // Stream the data. + c.resources.generator.send(&adc_samples, &dac_samples); + // Update telemetry measurements. c.resources.telemetry.adcs = [AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])]; diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs new file mode 100644 index 0000000..ebf66cd --- /dev/null +++ b/src/net/data_stream.rs @@ -0,0 +1,171 @@ +use core::borrow::BorrowMut; +use heapless::{ + spsc::{Consumer, Producer, Queue}, + Vec, +}; +use serde::Serialize; +use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack}; + +use super::NetworkReference; +use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE; + +// The number of data blocks that we will buffer in the queue. +type BlockBufferSize = heapless::consts::U10; + +pub fn setup_streaming( + stack: NetworkReference, +) -> (BlockGenerator, DataStream) { + let queue = cortex_m::singleton!(: Queue = Queue::new()).unwrap(); + + let (producer, consumer) = queue.split(); + + let generator = BlockGenerator::new(producer); + + let stream = DataStream::new(stack, consumer); + + (generator, stream) +} + +pub struct AdcDacData { + block_id: u32, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +pub struct BlockGenerator { + queue: Producer<'static, AdcDacData, BlockBufferSize>, + current_id: u32, +} + +impl BlockGenerator { + pub fn new(queue: Producer<'static, AdcDacData, BlockBufferSize>) -> Self { + Self { + queue, + current_id: 0, + } + } + + pub fn send( + &mut self, + adcs: &[&[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2], + ) { + let block = AdcDacData { + block_id: self.current_id, + adcs: [*adcs[0], *adcs[1]], + dacs: [*dacs[0], *dacs[1]], + }; + + self.current_id = self.current_id.wrapping_add(1); + + // We perform best-effort enqueueing of the data block. + self.queue.enqueue(block).ok(); + } +} + +pub struct DataStream { + stack: NetworkReference, + socket: Option<::TcpSocket>, + queue: Consumer<'static, AdcDacData, BlockBufferSize>, + current_index: u32, + remote: Option, +} + +#[derive(Serialize)] +struct DataBlock { + block_id: u32, + block_size: usize, + adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], + dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], +} + +impl DataStream { + pub fn new( + stack: NetworkReference, + consumer: Consumer<'static, AdcDacData, BlockBufferSize>, + ) -> Self { + Self { + stack, + socket: None, + current_index: 0, + remote: None, + queue: consumer, + } + } + + fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { + if self.socket.is_some() { + // Note(unwrap): We guarantee that the socket is available above. + let socket = self.socket.take().unwrap(); + self.stack.close(socket).unwrap(); + } + + let socket = + self.stack + .open(Mode::NonBlocking) + .map_err(|err| match err { + ::Error::NoIpAddress => (), + other => { + log::info!("Network Error: {:?}", other); + () + } + })?; + + // TODO: How should we handle a connection failure? + let socket = self.stack.connect(socket, remote).unwrap(); + + // Note(unwrap): The socket will be empty before we replace it. + self.socket.replace(socket); + + Ok(()) + } + + pub fn set_remote(&mut self, remote: SocketAddr) { + // If the remote is identical to what we already have, do nothing. + if let Some(current_remote) = self.remote { + if current_remote == remote { + return; + } + } + + // Open the new remote connection. + self.open(remote).ok(); + self.remote = Some(remote); + } + + pub fn process(&mut self) { + while let Some(data) = self.queue.dequeue() { + // If there's no socket available, try to connect to our remote. + if self.socket.is_none() && self.remote.is_some() { + // If we still can't open the remote, continue. + if self.open(self.remote.unwrap()).is_err() { + continue; + } + } + + let block = DataBlock { + adcs: data.adcs, + dacs: data.dacs, + block_id: data.block_id, + block_size: SAMPLE_BUFFER_SIZE, + }; + + // Increment the current block index. + self.current_index = self.current_index.wrapping_add(1); + + // Serialize the datablock. + // TODO: Do we want to packetize the data block as well? + let data: Vec = + postcard::to_vec(&block).unwrap(); + + let mut socket = self.socket.borrow_mut().unwrap(); + + // Transmit the data block. + // TODO: How should we handle partial packet transmission? + match self.stack.write(&mut socket, &data) { + Ok(len) => assert!(len == data.len()), + _ => info!("Dropping packet"), + } + } + } +} diff --git a/src/net/mod.rs b/src/net/mod.rs index 38499ca..aeaa9ab 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -11,18 +11,23 @@ use serde::Serialize; use core::fmt::Write; +mod data_stream; mod messages; mod miniconf_client; mod network_processor; mod shared; mod telemetry; +pub use data_stream::BlockGenerator; + use crate::hardware::{CycleCounter, EthernetPhy, NetworkStack}; +use data_stream::DataStream; use messages::{MqttMessage, SettingsResponse}; pub use miniconf_client::MiniconfClient; pub use network_processor::NetworkProcessor; pub use shared::NetworkManager; +use smoltcp_nal::embedded_nal::SocketAddr; pub use telemetry::{Telemetry, TelemetryBuffer, TelemetryClient}; pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>; @@ -36,7 +41,9 @@ pub enum UpdateState { /// A structure of Stabilizer's default network users. pub struct NetworkUsers { pub miniconf: MiniconfClient, - pub processor: NetworkProcessor, + processor: NetworkProcessor, + stream: DataStream, + generator: Option, pub telemetry: TelemetryClient, } @@ -87,10 +94,27 @@ where &prefix, ); + let (generator, stream) = + data_stream::setup_streaming(stack_manager.acquire_stack()); + NetworkUsers { miniconf: settings, processor, telemetry, + stream, + generator: Some(generator), + } + } + + /// Enable live data streaming. + pub fn enable_streaming(&mut self, remote: SocketAddr) -> BlockGenerator { + self.stream.set_remote(remote); + self.generator.take().unwrap() + } + + pub fn direct_stream(&mut self, remote: SocketAddr) { + if self.generator.is_none() { + self.stream.set_remote(remote); } } @@ -105,6 +129,11 @@ where // Update the MQTT clients. self.telemetry.update(); + // Update the data stream. + if self.generator.is_none() { + self.stream.process(); + } + match self.miniconf.update() { UpdateState::Updated => UpdateState::Updated, UpdateState::NoChange => poll_result,