Adding WIP changes to streaming
This commit is contained in:
parent
731513722f
commit
70be4c1c19
|
@ -64,8 +64,9 @@ git = "https://github.com/quartiq/miniconf.git"
|
||||||
rev = "c6f2b28"
|
rev = "c6f2b28"
|
||||||
|
|
||||||
[dependencies.smoltcp-nal]
|
[dependencies.smoltcp-nal]
|
||||||
git = "https://github.com/quartiq/smoltcp-nal.git"
|
path = "../smoltcp-nal"
|
||||||
rev = "4a1711c"
|
# git = "https://github.com/quartiq/smoltcp-nal.git"
|
||||||
|
# rev = "4a1711c"
|
||||||
|
|
||||||
[dependencies.minimq]
|
[dependencies.minimq]
|
||||||
git = "https://github.com/quartiq/minimq.git"
|
git = "https://github.com/quartiq/minimq.git"
|
||||||
|
|
|
@ -128,7 +128,7 @@ const APP: () = {
|
||||||
///
|
///
|
||||||
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement
|
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement
|
||||||
/// the same time bounds, meeting one also means the other is also met.
|
/// the same time bounds, meeting one also means the other is also met.
|
||||||
#[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)]
|
#[task(binds=DMA1_STR4, spawn=[stream], resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=3)]
|
||||||
fn process(c: process::Context) {
|
fn process(c: process::Context) {
|
||||||
let adc_samples = [
|
let adc_samples = [
|
||||||
c.resources.adcs.0.acquire_buffer(),
|
c.resources.adcs.0.acquire_buffer(),
|
||||||
|
@ -177,6 +177,15 @@ const APP: () = {
|
||||||
[DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])];
|
[DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])];
|
||||||
|
|
||||||
c.resources.telemetry.digital_inputs = digital_inputs;
|
c.resources.telemetry.digital_inputs = digital_inputs;
|
||||||
|
|
||||||
|
// Make a best effort to start data stream processing. It may be blocked by someone else
|
||||||
|
// using the network stack.
|
||||||
|
c.spawn.stream().ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[task(priority = 2, resources=[network])]
|
||||||
|
fn stream(c: stream::Context) {
|
||||||
|
c.resources.network.update_stream()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[idle(resources=[network], spawn=[settings_update])]
|
#[idle(resources=[network], spawn=[settings_update])]
|
||||||
|
@ -192,8 +201,8 @@ const APP: () = {
|
||||||
#[task(priority = 1, resources=[network, afes, settings])]
|
#[task(priority = 1, resources=[network, afes, settings])]
|
||||||
fn settings_update(mut c: settings_update::Context) {
|
fn settings_update(mut c: settings_update::Context) {
|
||||||
// Update the IIR channels.
|
// Update the IIR channels.
|
||||||
let settings = c.resources.network.miniconf.settings();
|
let settings = c.resources.network.lock(|net| net.miniconf.settings());
|
||||||
c.resources.settings.lock(|current| *current = *settings);
|
c.resources.settings.lock(|current| *current = settings);
|
||||||
|
|
||||||
// Update AFEs
|
// Update AFEs
|
||||||
c.resources.afes.0.set_gain(settings.afe[0]);
|
c.resources.afes.0.set_gain(settings.afe[0]);
|
||||||
|
@ -210,10 +219,8 @@ const APP: () = {
|
||||||
.settings
|
.settings
|
||||||
.lock(|settings| (settings.afe, settings.telemetry_period));
|
.lock(|settings| (settings.afe, settings.telemetry_period));
|
||||||
|
|
||||||
c.resources
|
c.resources.network.lock(|net| net.telemetry
|
||||||
.network
|
.publish(&telemetry.finalize(gains[0], gains[1])));
|
||||||
.telemetry
|
|
||||||
.publish(&telemetry.finalize(gains[0], gains[1]));
|
|
||||||
|
|
||||||
// Schedule the telemetry task in the future.
|
// Schedule the telemetry task in the future.
|
||||||
c.schedule
|
c.schedule
|
||||||
|
@ -229,22 +236,22 @@ const APP: () = {
|
||||||
unsafe { stm32h7xx_hal::ethernet::interrupt_handler() }
|
unsafe { stm32h7xx_hal::ethernet::interrupt_handler() }
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(binds = SPI2, priority = 3)]
|
#[task(binds = SPI2, priority = 4)]
|
||||||
fn spi2(_: spi2::Context) {
|
fn spi2(_: spi2::Context) {
|
||||||
panic!("ADC0 input overrun");
|
panic!("ADC0 input overrun");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(binds = SPI3, priority = 3)]
|
#[task(binds = SPI3, priority = 4)]
|
||||||
fn spi3(_: spi3::Context) {
|
fn spi3(_: spi3::Context) {
|
||||||
panic!("ADC1 input overrun");
|
panic!("ADC1 input overrun");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(binds = SPI4, priority = 3)]
|
#[task(binds = SPI4, priority = 4)]
|
||||||
fn spi4(_: spi4::Context) {
|
fn spi4(_: spi4::Context) {
|
||||||
panic!("DAC0 output error");
|
panic!("DAC0 output error");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(binds = SPI5, priority = 3)]
|
#[task(binds = SPI5, priority = 4)]
|
||||||
fn spi5(_: spi5::Context) {
|
fn spi5(_: spi5::Context) {
|
||||||
panic!("DAC1 output error");
|
panic!("DAC1 output error");
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,14 +3,13 @@ use heapless::{
|
||||||
spsc::{Consumer, Producer, Queue},
|
spsc::{Consumer, Producer, Queue},
|
||||||
Vec,
|
Vec,
|
||||||
};
|
};
|
||||||
use serde::Serialize;
|
use smoltcp_nal::{smoltcp, embedded_nal::{Mode, SocketAddr, TcpStack}};
|
||||||
use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack};
|
|
||||||
|
|
||||||
use super::NetworkReference;
|
use super::NetworkReference;
|
||||||
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
|
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
|
||||||
|
|
||||||
// The number of data blocks that we will buffer in the queue.
|
// The number of data blocks that we will buffer in the queue.
|
||||||
type BlockBufferSize = heapless::consts::U10;
|
type BlockBufferSize = heapless::consts::U30;
|
||||||
|
|
||||||
pub fn setup_streaming(
|
pub fn setup_streaming(
|
||||||
stack: NetworkReference,
|
stack: NetworkReference,
|
||||||
|
@ -132,7 +131,7 @@ impl DataStream {
|
||||||
// TODO: How should we handle a connection failure?
|
// TODO: How should we handle a connection failure?
|
||||||
let socket = self.stack.connect(socket, remote).unwrap();
|
let socket = self.stack.connect(socket, remote).unwrap();
|
||||||
|
|
||||||
log::info!("Stream connecting to {:?}", remote);
|
//log::info!("Stream connecting to {:?}", remote);
|
||||||
|
|
||||||
// Note(unwrap): The socket will be empty before we replace it.
|
// Note(unwrap): The socket will be empty before we replace it.
|
||||||
self.socket.replace(socket);
|
self.socket.replace(socket);
|
||||||
|
@ -169,17 +168,31 @@ impl DataStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn process(&mut self) {
|
pub fn process(&mut self) -> bool {
|
||||||
if let Some(data) = self.queue.dequeue() {
|
|
||||||
|
|
||||||
// If there's no socket available, try to connect to our remote.
|
// If there's no socket available, try to connect to our remote.
|
||||||
if self.socket.is_none() && self.remote.is_some() {
|
if self.socket.is_none() && self.remote.is_some() {
|
||||||
// If we still can't open the remote, continue.
|
// If we still can't open the remote, continue.
|
||||||
if self.open(self.remote.unwrap()).is_err() {
|
if self.open(self.remote.unwrap()).is_err() {
|
||||||
return;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut handle = self.socket.borrow_mut().unwrap();
|
||||||
|
|
||||||
|
let capacity = self.stack.lock(|stack| {
|
||||||
|
let mut all_sockets = stack.sockets.borrow_mut();
|
||||||
|
let socket: &mut smoltcp::socket::TcpSocket = &mut *all_sockets.get(handle);
|
||||||
|
socket.send_capacity() - socket.send_queue()
|
||||||
|
});
|
||||||
|
|
||||||
|
// TODO: Clean up magic numbers.
|
||||||
|
if capacity < 72 {
|
||||||
|
// We cannot send a full data block. Abort now.
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(data) = self.queue.dequeue() {
|
||||||
|
|
||||||
// Reconnect the socket if we're no longer connected.
|
// Reconnect the socket if we're no longer connected.
|
||||||
self.manage_reconnection();
|
self.manage_reconnection();
|
||||||
|
|
||||||
|
@ -191,23 +204,20 @@ impl DataStream {
|
||||||
};
|
};
|
||||||
|
|
||||||
// Serialize the datablock.
|
// Serialize the datablock.
|
||||||
// TODO: Do we want to packetize the data block as well?
|
|
||||||
let data: Vec<u8, heapless::consts::U256> = block.serialize();
|
let data: Vec<u8, heapless::consts::U256> = block.serialize();
|
||||||
|
|
||||||
let mut socket = self.socket.borrow_mut().unwrap();
|
|
||||||
|
|
||||||
// Transmit the data block.
|
// Transmit the data block.
|
||||||
// TODO: How should we handle partial packet transmission?
|
|
||||||
// TODO: Should we measure how many packets get dropped as telemetry?
|
// TODO: Should we measure how many packets get dropped as telemetry?
|
||||||
match self.stack.write(&mut socket, &data) {
|
match self.stack.write(&mut handle, &data) {
|
||||||
Ok(len) => {
|
Ok(len) => {
|
||||||
if len != data.len() {
|
if len != data.len() {
|
||||||
log::warn!("Short message: {} {}", len, data.len());
|
log::error!("Short message: {} {}", len, data.len());
|
||||||
//self.close();
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ => {},
|
_ => {},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.queue.ready()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -156,7 +156,7 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the current settings from miniconf.
|
/// Get the current settings from miniconf.
|
||||||
pub fn settings(&self) -> &S {
|
pub fn settings(&self) -> S {
|
||||||
&self.settings
|
self.settings.clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,20 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn update_stream(&mut self) {
|
||||||
|
// Update the data stream.
|
||||||
|
if self.generator.is_none() {
|
||||||
|
loop {
|
||||||
|
// Process egress of the stack.
|
||||||
|
self.processor.egress();
|
||||||
|
|
||||||
|
if !self.stream.process() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Update and process all of the network users state.
|
/// Update and process all of the network users state.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Returns
|
||||||
|
@ -130,9 +144,7 @@ where
|
||||||
self.telemetry.update();
|
self.telemetry.update();
|
||||||
|
|
||||||
// Update the data stream.
|
// Update the data stream.
|
||||||
if self.generator.is_none() {
|
self.update_stream();
|
||||||
self.stream.process();
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.miniconf.update() {
|
match self.miniconf.update() {
|
||||||
UpdateState::Updated => UpdateState::Updated,
|
UpdateState::Updated => UpdateState::Updated,
|
||||||
|
|
|
@ -37,6 +37,11 @@ impl NetworkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn egress(&mut self) {
|
||||||
|
let now = self.clock.current_ms();
|
||||||
|
self.stack.lock(|stack| stack.poll(now)).ok();
|
||||||
|
}
|
||||||
|
|
||||||
/// Process and update the state of the network.
|
/// Process and update the state of the network.
|
||||||
///
|
///
|
||||||
/// # Note
|
/// # Note
|
||||||
|
|
Loading…
Reference in New Issue