diff --git a/Cargo.toml b/Cargo.toml index 10cd9b7..041d5d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,7 +78,7 @@ pounder_v1_1 = [ ] [profile.dev] codegen-units = 1 incremental = false -opt-level = 3 +opt-level = 1 [profile.release] opt-level = 3 diff --git a/src/hardware/configuration.rs b/src/hardware/configuration.rs index f48b3d2..513e851 100644 --- a/src/hardware/configuration.rs +++ b/src/hardware/configuration.rs @@ -7,7 +7,7 @@ use stm32h7xx_hal::{ prelude::*, }; -const NUM_SOCKETS: usize = 4; +const NUM_SOCKETS: usize = 5; use heapless::{consts, Vec}; use smoltcp_nal::smoltcp; @@ -62,7 +62,7 @@ impl NetStorage { )], neighbor_cache: [None; 8], routes_cache: [None; 8], - sockets: [None, None, None, None, None], + sockets: [None, None, None, None, None, None], socket_storage: [SocketStorage::new(); NUM_SOCKETS], dhcp_tx_storage: [0; 600], dhcp_rx_storage: [0; 600], diff --git a/src/net/data_stream.rs b/src/net/data_stream.rs index ebf66cd..bbaf048 100644 --- a/src/net/data_stream.rs +++ b/src/net/data_stream.rs @@ -67,11 +67,9 @@ pub struct DataStream { stack: NetworkReference, socket: Option<::TcpSocket>, queue: Consumer<'static, AdcDacData, BlockBufferSize>, - current_index: u32, remote: Option, } -#[derive(Serialize)] struct DataBlock { block_id: u32, block_size: usize, @@ -79,6 +77,24 @@ struct DataBlock { dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], } +impl DataBlock { + pub fn serialize>(self) -> Vec { + let mut vec: Vec = Vec::new(); + vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap(); + vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap(); + for device in &[self.adcs, self.dacs] { + for channel in device { + for sample in channel { + vec.extend_from_slice(&sample.to_be_bytes()).unwrap(); + } + } + } + + vec + } + +} + impl DataStream { pub fn new( stack: NetworkReference, @@ -87,17 +103,22 @@ impl DataStream { Self { stack, socket: None, - current_index: 0, remote: None, queue: consumer, } } + fn close(&mut self) { + // Note(unwrap): We guarantee that the socket is available above. + let socket = self.socket.take().unwrap(); + self.stack.close(socket).unwrap(); + + log::info!("Stream Disconnecting"); + } + fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { if self.socket.is_some() { - // Note(unwrap): We guarantee that the socket is available above. - let socket = self.socket.take().unwrap(); - self.stack.close(socket).unwrap(); + self.close(); } let socket = @@ -105,15 +126,14 @@ impl DataStream { .open(Mode::NonBlocking) .map_err(|err| match err { ::Error::NoIpAddress => (), - other => { - log::info!("Network Error: {:?}", other); - () - } + _ => () })?; // TODO: How should we handle a connection failure? let socket = self.stack.connect(socket, remote).unwrap(); + log::info!("Stream connecting to {:?}", remote); + // Note(unwrap): The socket will be empty before we replace it. self.socket.replace(socket); @@ -133,16 +153,36 @@ impl DataStream { self.remote = Some(remote); } + fn manage_reconnection(&mut self) { + if self.socket.is_none() || self.remote.is_none() { + return + } + + let mut socket = self.socket.borrow_mut().unwrap(); + let connected = match self.stack.is_connected(&mut socket) { + Ok(connected) => connected, + _ => return, + }; + + if !connected { + self.socket.replace(self.stack.connect(socket, self.remote.unwrap()).unwrap()); + } + } + pub fn process(&mut self) { - while let Some(data) = self.queue.dequeue() { + if let Some(data) = self.queue.dequeue() { + // If there's no socket available, try to connect to our remote. if self.socket.is_none() && self.remote.is_some() { // If we still can't open the remote, continue. if self.open(self.remote.unwrap()).is_err() { - continue; + return; } } + // Reconnect the socket if we're no longer connected. + self.manage_reconnection(); + let block = DataBlock { adcs: data.adcs, dacs: data.dacs, @@ -150,21 +190,23 @@ impl DataStream { block_size: SAMPLE_BUFFER_SIZE, }; - // Increment the current block index. - self.current_index = self.current_index.wrapping_add(1); - // Serialize the datablock. // TODO: Do we want to packetize the data block as well? - let data: Vec = - postcard::to_vec(&block).unwrap(); + let data: Vec = block.serialize(); let mut socket = self.socket.borrow_mut().unwrap(); // Transmit the data block. // TODO: How should we handle partial packet transmission? + // TODO: Should we measure how many packets get dropped as telemetry? match self.stack.write(&mut socket, &data) { - Ok(len) => assert!(len == data.len()), - _ => info!("Dropping packet"), + Ok(len) => { + if len != data.len() { + log::warn!("Short message: {} {}", len, data.len()); + //self.close(); + } + }, + _ => {}, } } }