Adding initial streaming implementation

This commit is contained in:
Ryan Summers 2021-05-17 12:43:04 +02:00
parent d8cc3c74d9
commit 21ca8e1c8f
5 changed files with 235 additions and 4 deletions

18
Cargo.lock generated
View File

@ -548,6 +548,23 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf547ad0c65e31259204bd90935776d1c693cec2f4ff7abb7a1bbbd40dfe58"
[[package]]
name = "postcard"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "66acf3cf8ab62785852a0f67ae3bfcd38324da6561e52e7f4a049ca555c6d55e"
dependencies = [
"heapless 0.6.1",
"postcard-cobs",
"serde",
]
[[package]]
name = "postcard-cobs"
version = "0.1.5-pre"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c68cb38ed13fd7bc9dd5db8f165b7c8d9c1a315104083a2b10f11354c2af97f"
[[package]]
name = "ppv-lite86"
version = "0.2.10"
@ -772,6 +789,7 @@ dependencies = [
"nb 1.0.0",
"num_enum",
"paste",
"postcard",
"rtt-logger",
"rtt-target",
"serde",

View File

@ -45,6 +45,7 @@ ad9959 = { path = "ad9959" }
miniconf = "0.1.0"
shared-bus = {version = "0.2.2", features = ["cortex-m"] }
serde-json-core = "0.3"
postcard = "0.6"
[dependencies.rtt-logger]
git = "https://github.com/quartiq/rtt-logger.git"

View File

@ -13,7 +13,9 @@ use hardware::{
DigitalInput0, DigitalInput1, InputPin, SystemTimer, AFE0, AFE1,
};
use net::{NetworkUsers, Telemetry, TelemetryBuffer, UpdateState};
use net::{
BlockGenerator, NetworkUsers, Telemetry, TelemetryBuffer, UpdateState,
};
const SCALE: f32 = i16::MAX as _;
@ -58,6 +60,7 @@ const APP: () = {
adcs: (Adc0Input, Adc1Input),
dacs: (Dac0Output, Dac1Output),
network: NetworkUsers<Settings, Telemetry>,
generator: BlockGenerator,
settings: Settings,
telemetry: TelemetryBuffer,
@ -71,7 +74,7 @@ const APP: () = {
// Configure the microcontroller
let (mut stabilizer, _pounder) = hardware::setup(c.core, c.device);
let network = NetworkUsers::new(
let mut network = NetworkUsers::new(
stabilizer.net.stack,
stabilizer.net.phy,
stabilizer.cycle_counter,
@ -79,6 +82,11 @@ const APP: () = {
stabilizer.net.mac_address,
);
// TODO: Remove unwrap.
let remote: smoltcp_nal::embedded_nal::SocketAddr =
"10.35.16.10:1111".parse().unwrap();
let generator = network.enable_streaming(remote.into());
// Spawn a settings update for default settings.
c.spawn.settings_update().unwrap();
c.spawn.telemetry().unwrap();
@ -96,6 +104,7 @@ const APP: () = {
afes: stabilizer.afes,
adcs: stabilizer.adcs,
dacs: stabilizer.dacs,
generator,
network,
digital_inputs: stabilizer.digital_inputs,
telemetry: net::TelemetryBuffer::default(),
@ -119,7 +128,7 @@ const APP: () = {
///
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement
/// the same time bounds, meeting one also means the other is also met.
#[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry], priority=2)]
#[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)]
fn process(c: process::Context) {
let adc_samples = [
c.resources.adcs.0.acquire_buffer(),
@ -157,6 +166,9 @@ const APP: () = {
}
}
// Stream the data.
c.resources.generator.send(&adc_samples, &dac_samples);
// Update telemetry measurements.
c.resources.telemetry.adcs =
[AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])];

171
src/net/data_stream.rs Normal file
View File

@ -0,0 +1,171 @@
use core::borrow::BorrowMut;
use heapless::{
spsc::{Consumer, Producer, Queue},
Vec,
};
use serde::Serialize;
use smoltcp_nal::embedded_nal::{Mode, SocketAddr, TcpStack};
use super::NetworkReference;
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
// The number of data blocks that we will buffer in the queue.
type BlockBufferSize = heapless::consts::U10;
pub fn setup_streaming(
stack: NetworkReference,
) -> (BlockGenerator, DataStream) {
let queue = cortex_m::singleton!(: Queue<AdcDacData, BlockBufferSize> = Queue::new()).unwrap();
let (producer, consumer) = queue.split();
let generator = BlockGenerator::new(producer);
let stream = DataStream::new(stack, consumer);
(generator, stream)
}
pub struct AdcDacData {
block_id: u32,
adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
}
pub struct BlockGenerator {
queue: Producer<'static, AdcDacData, BlockBufferSize>,
current_id: u32,
}
impl BlockGenerator {
pub fn new(queue: Producer<'static, AdcDacData, BlockBufferSize>) -> Self {
Self {
queue,
current_id: 0,
}
}
pub fn send(
&mut self,
adcs: &[&[u16; SAMPLE_BUFFER_SIZE]; 2],
dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
) {
let block = AdcDacData {
block_id: self.current_id,
adcs: [*adcs[0], *adcs[1]],
dacs: [*dacs[0], *dacs[1]],
};
self.current_id = self.current_id.wrapping_add(1);
// We perform best-effort enqueueing of the data block.
self.queue.enqueue(block).ok();
}
}
pub struct DataStream {
stack: NetworkReference,
socket: Option<<NetworkReference as TcpStack>::TcpSocket>,
queue: Consumer<'static, AdcDacData, BlockBufferSize>,
current_index: u32,
remote: Option<SocketAddr>,
}
#[derive(Serialize)]
struct DataBlock {
block_id: u32,
block_size: usize,
adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
}
impl DataStream {
pub fn new(
stack: NetworkReference,
consumer: Consumer<'static, AdcDacData, BlockBufferSize>,
) -> Self {
Self {
stack,
socket: None,
current_index: 0,
remote: None,
queue: consumer,
}
}
fn open(&mut self, remote: SocketAddr) -> Result<(), ()> {
if self.socket.is_some() {
// Note(unwrap): We guarantee that the socket is available above.
let socket = self.socket.take().unwrap();
self.stack.close(socket).unwrap();
}
let socket =
self.stack
.open(Mode::NonBlocking)
.map_err(|err| match err {
<NetworkReference as TcpStack>::Error::NoIpAddress => (),
other => {
log::info!("Network Error: {:?}", other);
()
}
})?;
// TODO: How should we handle a connection failure?
let socket = self.stack.connect(socket, remote).unwrap();
// Note(unwrap): The socket will be empty before we replace it.
self.socket.replace(socket);
Ok(())
}
pub fn set_remote(&mut self, remote: SocketAddr) {
// If the remote is identical to what we already have, do nothing.
if let Some(current_remote) = self.remote {
if current_remote == remote {
return;
}
}
// Open the new remote connection.
self.open(remote).ok();
self.remote = Some(remote);
}
pub fn process(&mut self) {
while let Some(data) = self.queue.dequeue() {
// If there's no socket available, try to connect to our remote.
if self.socket.is_none() && self.remote.is_some() {
// If we still can't open the remote, continue.
if self.open(self.remote.unwrap()).is_err() {
continue;
}
}
let block = DataBlock {
adcs: data.adcs,
dacs: data.dacs,
block_id: data.block_id,
block_size: SAMPLE_BUFFER_SIZE,
};
// Increment the current block index.
self.current_index = self.current_index.wrapping_add(1);
// Serialize the datablock.
// TODO: Do we want to packetize the data block as well?
let data: Vec<u8, heapless::consts::U256> =
postcard::to_vec(&block).unwrap();
let mut socket = self.socket.borrow_mut().unwrap();
// Transmit the data block.
// TODO: How should we handle partial packet transmission?
match self.stack.write(&mut socket, &data) {
Ok(len) => assert!(len == data.len()),
_ => info!("Dropping packet"),
}
}
}
}

View File

@ -11,18 +11,23 @@ use serde::Serialize;
use core::fmt::Write;
mod data_stream;
mod messages;
mod miniconf_client;
mod network_processor;
mod shared;
mod telemetry;
pub use data_stream::BlockGenerator;
use crate::hardware::{CycleCounter, EthernetPhy, NetworkStack};
use data_stream::DataStream;
use messages::{MqttMessage, SettingsResponse};
pub use miniconf_client::MiniconfClient;
pub use network_processor::NetworkProcessor;
pub use shared::NetworkManager;
use smoltcp_nal::embedded_nal::SocketAddr;
pub use telemetry::{Telemetry, TelemetryBuffer, TelemetryClient};
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>;
@ -36,7 +41,9 @@ pub enum UpdateState {
/// A structure of Stabilizer's default network users.
pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
pub miniconf: MiniconfClient<S>,
pub processor: NetworkProcessor,
processor: NetworkProcessor,
stream: DataStream,
generator: Option<BlockGenerator>,
pub telemetry: TelemetryClient<T>,
}
@ -87,10 +94,27 @@ where
&prefix,
);
let (generator, stream) =
data_stream::setup_streaming(stack_manager.acquire_stack());
NetworkUsers {
miniconf: settings,
processor,
telemetry,
stream,
generator: Some(generator),
}
}
/// Enable live data streaming.
pub fn enable_streaming(&mut self, remote: SocketAddr) -> BlockGenerator {
self.stream.set_remote(remote);
self.generator.take().unwrap()
}
pub fn direct_stream(&mut self, remote: SocketAddr) {
if self.generator.is_none() {
self.stream.set_remote(remote);
}
}
@ -105,6 +129,11 @@ where
// Update the MQTT clients.
self.telemetry.update();
// Update the data stream.
if self.generator.is_none() {
self.stream.process();
}
match self.miniconf.update() {
UpdateState::Updated => UpdateState::Updated,
UpdateState::NoChange => poll_result,