Adding WIP livestreaming implementation

This commit is contained in:
Ryan Summers 2021-05-26 17:56:44 +02:00
parent 24dd749da9
commit 2dd1bb9ebf
5 changed files with 51 additions and 55 deletions

View File

@ -128,7 +128,7 @@ const APP: () = {
/// ///
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement /// Because the ADC and DAC operate at the same rate, these two constraints actually implement
/// the same time bounds, meeting one also means the other is also met. /// the same time bounds, meeting one also means the other is also met.
#[task(binds=DMA1_STR4, spawn=[stream], resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=3)] #[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)]
fn process(c: process::Context) { fn process(c: process::Context) {
let adc_samples = [ let adc_samples = [
c.resources.adcs.0.acquire_buffer(), c.resources.adcs.0.acquire_buffer(),
@ -177,15 +177,6 @@ const APP: () = {
[DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])]; [DacCode(dac_samples[0][0]), DacCode(dac_samples[1][0])];
c.resources.telemetry.digital_inputs = digital_inputs; c.resources.telemetry.digital_inputs = digital_inputs;
// Make a best effort to start data stream processing. It may be blocked by someone else
// using the network stack.
c.spawn.stream().ok();
}
#[task(priority = 2, resources=[network])]
fn stream(c: stream::Context) {
c.resources.network.update_stream()
} }
#[idle(resources=[network], spawn=[settings_update])] #[idle(resources=[network], spawn=[settings_update])]
@ -196,7 +187,7 @@ const APP: () = {
c.spawn.settings_update().unwrap() c.spawn.settings_update().unwrap()
} }
NetworkState::Updated => {} NetworkState::Updated => {}
NetworkState::NoChange => cortex_m::asm::wfi(), NetworkState::NoChange => {},
} }
} }
} }
@ -204,8 +195,8 @@ const APP: () = {
#[task(priority = 1, resources=[network, afes, settings])] #[task(priority = 1, resources=[network, afes, settings])]
fn settings_update(mut c: settings_update::Context) { fn settings_update(mut c: settings_update::Context) {
// Update the IIR channels. // Update the IIR channels.
let settings = c.resources.network.lock(|net| net.miniconf.settings()); let settings = c.resources.network.miniconf.settings();
c.resources.settings.lock(|current| *current = settings); c.resources.settings.lock(|current| *current = *settings);
// Update AFEs // Update AFEs
c.resources.afes.0.set_gain(settings.afe[0]); c.resources.afes.0.set_gain(settings.afe[0]);
@ -222,8 +213,8 @@ const APP: () = {
.settings .settings
.lock(|settings| (settings.afe, settings.telemetry_period)); .lock(|settings| (settings.afe, settings.telemetry_period));
c.resources.network.lock(|net| net.telemetry c.resources.network.telemetry
.publish(&telemetry.finalize(gains[0], gains[1]))); .publish(&telemetry.finalize(gains[0], gains[1]));
// Schedule the telemetry task in the future. // Schedule the telemetry task in the future.
c.schedule c.schedule
@ -239,22 +230,22 @@ const APP: () = {
unsafe { stm32h7xx_hal::ethernet::interrupt_handler() } unsafe { stm32h7xx_hal::ethernet::interrupt_handler() }
} }
#[task(binds = SPI2, priority = 4)] #[task(binds = SPI2, priority = 3)]
fn spi2(_: spi2::Context) { fn spi2(_: spi2::Context) {
panic!("ADC0 input overrun"); panic!("ADC0 input overrun");
} }
#[task(binds = SPI3, priority = 4)] #[task(binds = SPI3, priority = 3)]
fn spi3(_: spi3::Context) { fn spi3(_: spi3::Context) {
panic!("ADC1 input overrun"); panic!("ADC1 input overrun");
} }
#[task(binds = SPI4, priority = 4)] #[task(binds = SPI4, priority = 3)]
fn spi4(_: spi4::Context) { fn spi4(_: spi4::Context) {
panic!("DAC0 output error"); panic!("DAC0 output error");
} }
#[task(binds = SPI5, priority = 4)] #[task(binds = SPI5, priority = 3)]
fn spi5(_: spi5::Context) { fn spi5(_: spi5::Context) {
panic!("DAC1 output error"); panic!("DAC1 output error");
} }

View File

@ -25,6 +25,7 @@ pub fn setup_streaming(
(generator, stream) (generator, stream)
} }
#[derive(Debug)]
pub struct AdcDacData { pub struct AdcDacData {
block_id: u32, block_id: u32,
adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2], adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
@ -57,8 +58,9 @@ impl BlockGenerator {
self.current_id = self.current_id.wrapping_add(1); self.current_id = self.current_id.wrapping_add(1);
// We perform best-effort enqueueing of the data block. // Note(unwrap): The buffering of the queue and processing of blocks must be fast enough
self.queue.enqueue(block).ok(); // such that blocks will never be silently dropped.
self.queue.enqueue(block).unwrap();
} }
} }
@ -173,6 +175,11 @@ impl DataStream {
if self.socket.is_none() && self.remote.is_some() { if self.socket.is_none() && self.remote.is_some() {
// If we still can't open the remote, continue. // If we still can't open the remote, continue.
if self.open(self.remote.unwrap()).is_err() { if self.open(self.remote.unwrap()).is_err() {
// Clear the queue out.
while self.queue.ready() {
self.queue.dequeue();
}
return false; return false;
} }
} }
@ -188,6 +195,9 @@ impl DataStream {
// TODO: Clean up magic numbers. // TODO: Clean up magic numbers.
if capacity < 72 { if capacity < 72 {
// We cannot send a full data block. Abort now. // We cannot send a full data block. Abort now.
while self.queue.ready() {
self.queue.dequeue();
}
return false; return false;
} }

View File

@ -156,7 +156,7 @@ where
} }
/// Get the current settings from miniconf. /// Get the current settings from miniconf.
pub fn settings(&self) -> S { pub fn settings(&self) -> &S {
self.settings.clone() &self.settings
} }
} }

View File

@ -124,41 +124,34 @@ where
} }
} }
pub fn update_stream(&mut self) {
// Update the data stream.
if self.generator.is_none() {
loop {
// Process egress of the stack.
self.processor.egress();
if !self.stream.process() {
return
}
}
}
}
/// Update and process all of the network users state. /// Update and process all of the network users state.
/// ///
/// # Returns /// # Returns
/// An indication if any of the network users indicated a state change. /// An indication if any of the network users indicated a state change.
pub fn update(&mut self) -> NetworkState { pub fn update(&mut self) -> NetworkState {
super::debug::high();
// Update the MQTT clients.
self.telemetry.update();
// Update the data stream.
if self.generator.is_none() {
while self.stream.process() {}
}
// Poll for incoming data. // Poll for incoming data.
let poll_result = match self.processor.update() { let poll_result = match self.processor.update() {
UpdateState::NoChange => NetworkState::NoChange, UpdateState::NoChange => NetworkState::NoChange,
UpdateState::Updated => NetworkState::Updated, UpdateState::Updated => NetworkState::Updated,
}; };
// Update the MQTT clients. let result = match self.miniconf.update() {
self.telemetry.update();
// Update the data stream.
self.update_stream();
match self.miniconf.update() {
UpdateState::Updated => NetworkState::SettingsChanged, UpdateState::Updated => NetworkState::SettingsChanged,
UpdateState::NoChange => poll_result, UpdateState::NoChange => poll_result,
} };
super::debug::low();
result
} }
} }

View File

@ -65,17 +65,19 @@ impl NetworkProcessor {
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network // If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
// stack. // stack.
match self.phy.poll_link() { // TODO: Poll the link state in a task and handle resets. Polling this often is slow and
true => self.network_was_reset = false, // uses necessary CPU time.
//match self.phy.poll_link() {
// true => self.network_was_reset = false,
// Only reset the network stack once per link reconnection. This prevents us from // // Only reset the network stack once per link reconnection. This prevents us from
// sending an excessive number of DHCP requests. // // sending an excessive number of DHCP requests.
false if !self.network_was_reset => { // false if !self.network_was_reset => {
self.network_was_reset = true; // self.network_was_reset = true;
self.stack.lock(|stack| stack.handle_link_reset()); // self.stack.lock(|stack| stack.handle_link_reset());
} // }
_ => {} // _ => {}
}; //};
result result
} }