Updating after testing

This commit is contained in:
Ryan Summers 2021-05-17 18:33:43 +02:00
parent 21ca8e1c8f
commit 731513722f
3 changed files with 64 additions and 22 deletions

View File

@ -78,7 +78,7 @@ pounder_v1_1 = [ ]
[profile.dev] [profile.dev]
codegen-units = 1 codegen-units = 1
incremental = false incremental = false
opt-level = 3 opt-level = 1
[profile.release] [profile.release]
opt-level = 3 opt-level = 3

View File

@ -7,7 +7,7 @@ use stm32h7xx_hal::{
prelude::*, prelude::*,
}; };
const NUM_SOCKETS: usize = 4; const NUM_SOCKETS: usize = 5;
use heapless::{consts, Vec}; use heapless::{consts, Vec};
use smoltcp_nal::smoltcp; use smoltcp_nal::smoltcp;
@ -62,7 +62,7 @@ impl NetStorage {
)], )],
neighbor_cache: [None; 8], neighbor_cache: [None; 8],
routes_cache: [None; 8], routes_cache: [None; 8],
sockets: [None, None, None, None, None], sockets: [None, None, None, None, None, None],
socket_storage: [SocketStorage::new(); NUM_SOCKETS], socket_storage: [SocketStorage::new(); NUM_SOCKETS],
dhcp_tx_storage: [0; 600], dhcp_tx_storage: [0; 600],
dhcp_rx_storage: [0; 600], dhcp_rx_storage: [0; 600],

View File

@ -67,11 +67,9 @@ pub struct DataStream {
stack: NetworkReference, stack: NetworkReference,
socket: Option<<NetworkReference as TcpStack>::TcpSocket>, socket: Option<<NetworkReference as TcpStack>::TcpSocket>,
queue: Consumer<'static, AdcDacData, BlockBufferSize>, queue: Consumer<'static, AdcDacData, BlockBufferSize>,
current_index: u32,
remote: Option<SocketAddr>, remote: Option<SocketAddr>,
} }
#[derive(Serialize)]
struct DataBlock { struct DataBlock {
block_id: u32, block_id: u32,
block_size: usize, block_size: usize,
@ -79,6 +77,24 @@ struct DataBlock {
dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2], dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
} }
impl DataBlock {
pub fn serialize<T: heapless::ArrayLength<u8>>(self) -> Vec<u8, T> {
let mut vec: Vec<u8, T> = Vec::new();
vec.extend_from_slice(&self.block_id.to_be_bytes()).unwrap();
vec.extend_from_slice(&self.block_size.to_be_bytes()).unwrap();
for device in &[self.adcs, self.dacs] {
for channel in device {
for sample in channel {
vec.extend_from_slice(&sample.to_be_bytes()).unwrap();
}
}
}
vec
}
}
impl DataStream { impl DataStream {
pub fn new( pub fn new(
stack: NetworkReference, stack: NetworkReference,
@ -87,17 +103,22 @@ impl DataStream {
Self { Self {
stack, stack,
socket: None, socket: None,
current_index: 0,
remote: None, remote: None,
queue: consumer, queue: consumer,
} }
} }
fn close(&mut self) {
// Note(unwrap): We guarantee that the socket is available above.
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
log::info!("Stream Disconnecting");
}
fn open(&mut self, remote: SocketAddr) -> Result<(), ()> { fn open(&mut self, remote: SocketAddr) -> Result<(), ()> {
if self.socket.is_some() { if self.socket.is_some() {
// Note(unwrap): We guarantee that the socket is available above. self.close();
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
} }
let socket = let socket =
@ -105,15 +126,14 @@ impl DataStream {
.open(Mode::NonBlocking) .open(Mode::NonBlocking)
.map_err(|err| match err { .map_err(|err| match err {
<NetworkReference as TcpStack>::Error::NoIpAddress => (), <NetworkReference as TcpStack>::Error::NoIpAddress => (),
other => { _ => ()
log::info!("Network Error: {:?}", other);
()
}
})?; })?;
// TODO: How should we handle a connection failure? // TODO: How should we handle a connection failure?
let socket = self.stack.connect(socket, remote).unwrap(); let socket = self.stack.connect(socket, remote).unwrap();
log::info!("Stream connecting to {:?}", remote);
// Note(unwrap): The socket will be empty before we replace it. // Note(unwrap): The socket will be empty before we replace it.
self.socket.replace(socket); self.socket.replace(socket);
@ -133,16 +153,36 @@ impl DataStream {
self.remote = Some(remote); self.remote = Some(remote);
} }
fn manage_reconnection(&mut self) {
if self.socket.is_none() || self.remote.is_none() {
return
}
let mut socket = self.socket.borrow_mut().unwrap();
let connected = match self.stack.is_connected(&mut socket) {
Ok(connected) => connected,
_ => return,
};
if !connected {
self.socket.replace(self.stack.connect(socket, self.remote.unwrap()).unwrap());
}
}
pub fn process(&mut self) { pub fn process(&mut self) {
while let Some(data) = self.queue.dequeue() { if let Some(data) = self.queue.dequeue() {
// If there's no socket available, try to connect to our remote. // If there's no socket available, try to connect to our remote.
if self.socket.is_none() && self.remote.is_some() { if self.socket.is_none() && self.remote.is_some() {
// If we still can't open the remote, continue. // If we still can't open the remote, continue.
if self.open(self.remote.unwrap()).is_err() { if self.open(self.remote.unwrap()).is_err() {
continue; return;
} }
} }
// Reconnect the socket if we're no longer connected.
self.manage_reconnection();
let block = DataBlock { let block = DataBlock {
adcs: data.adcs, adcs: data.adcs,
dacs: data.dacs, dacs: data.dacs,
@ -150,21 +190,23 @@ impl DataStream {
block_size: SAMPLE_BUFFER_SIZE, block_size: SAMPLE_BUFFER_SIZE,
}; };
// Increment the current block index.
self.current_index = self.current_index.wrapping_add(1);
// Serialize the datablock. // Serialize the datablock.
// TODO: Do we want to packetize the data block as well? // TODO: Do we want to packetize the data block as well?
let data: Vec<u8, heapless::consts::U256> = let data: Vec<u8, heapless::consts::U256> = block.serialize();
postcard::to_vec(&block).unwrap();
let mut socket = self.socket.borrow_mut().unwrap(); let mut socket = self.socket.borrow_mut().unwrap();
// Transmit the data block. // Transmit the data block.
// TODO: How should we handle partial packet transmission? // TODO: How should we handle partial packet transmission?
// TODO: Should we measure how many packets get dropped as telemetry?
match self.stack.write(&mut socket, &data) { match self.stack.write(&mut socket, &data) {
Ok(len) => assert!(len == data.len()), Ok(len) => {
_ => info!("Dropping packet"), if len != data.len() {
log::warn!("Short message: {} {}", len, data.len());
//self.close();
}
},
_ => {},
} }
} }
} }