Merge #380
380: Adding data livestream capability r=ryan-summers a=ryan-summers This PR implements livestream capability for ADC and DAC samples. The current implementation provides a static configuration. **TODO** - [x] Clean up documentation - [x] Implement streaming for `lockin` app - [x] Test new block-packetization scheme - [x] Add reference python stream receiver - [x] Merge #381 first Co-authored-by: Ryan Summers <ryan.summers@vertigo-designs.com>
This commit is contained in:
commit
8c9842f3cf
|
@ -38,7 +38,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "b9a69a963b70ddacfcd382524f72a4576f359af9334b3bf48a79566590bb8bfa"
|
checksum = "b9a69a963b70ddacfcd382524f72a4576f359af9334b3bf48a79566590bb8bfa"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitrate",
|
"bitrate",
|
||||||
"cortex-m 0.7.2",
|
"cortex-m 0.6.7",
|
||||||
"embedded-hal",
|
"embedded-hal",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -389,9 +389,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "managed"
|
name = "managed"
|
||||||
version = "0.7.2"
|
version = "0.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c75de51135344a4f8ed3cfe2720dc27736f7711989703a0b43aadf3753c55577"
|
checksum = "0ca88d725a0a943b096803bd34e73a4437208b6077654cc4ecb2947a5f91618d"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "matrixmultiply"
|
name = "matrixmultiply"
|
||||||
|
@ -455,9 +455,9 @@ checksum = "546c37ac5d9e56f55e73b677106873d9d9f5190605e41a856503623648488cae"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ndarray"
|
name = "ndarray"
|
||||||
version = "0.15.2"
|
version = "0.15.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "02b2e4807aaa21dc6dcc3417e5902dc199c3648043bf27b7af4b202332fe4760"
|
checksum = "cc1372704f14bb132a49a6701c2238970a359ee0829fed481b522a63bf25456a"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"matrixmultiply",
|
"matrixmultiply",
|
||||||
"num-complex",
|
"num-complex",
|
||||||
|
@ -743,9 +743,8 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "smoltcp"
|
name = "smoltcp"
|
||||||
version = "0.7.3"
|
version = "0.8.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/smoltcp-rs/smoltcp?branch=master#027f255f904b9b7c4226cfd8b2d31f272ffa5105"
|
||||||
checksum = "11b5647cc4676e9358e6b15b6536b34e5b413e5ae946a06b3f85e713132bcdfa"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bitflags",
|
"bitflags",
|
||||||
"byteorder",
|
"byteorder",
|
||||||
|
@ -755,7 +754,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "smoltcp-nal"
|
name = "smoltcp-nal"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=5e56576#5e56576fbbc594f0a0e1df9222a20eb35c8e7511"
|
source = "git+https://github.com/quartiq/smoltcp-nal.git?rev=5baf55f#5baf55fafbfe2c08d9fe56c836171e9d2fb468e8"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"embedded-nal",
|
"embedded-nal",
|
||||||
"heapless 0.7.1",
|
"heapless 0.7.1",
|
||||||
|
@ -804,7 +803,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "8b672c837e0ee8158ecc7fce0f9a948dd0693a9c588338e728d14b73307a0b7d"
|
checksum = "8b672c837e0ee8158ecc7fce0f9a948dd0693a9c588338e728d14b73307a0b7d"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bare-metal 0.2.5",
|
"bare-metal 0.2.5",
|
||||||
"cortex-m 0.7.2",
|
"cortex-m 0.6.7",
|
||||||
"cortex-m-rt",
|
"cortex-m-rt",
|
||||||
"vcell",
|
"vcell",
|
||||||
]
|
]
|
||||||
|
@ -812,7 +811,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "stm32h7xx-hal"
|
name = "stm32h7xx-hal"
|
||||||
version = "0.9.0"
|
version = "0.9.0"
|
||||||
source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=acd47be#acd47beb4b84b4dc46da3a8b68688bc8c5984604"
|
source = "git+https://github.com/quartiq/stm32h7xx-hal.git?rev=33aa67d#33aa67d74790cb9f680a4f281b72df0664bcf03c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bare-metal 1.0.0",
|
"bare-metal 1.0.0",
|
||||||
"cast",
|
"cast",
|
||||||
|
|
|
@ -60,7 +60,7 @@ rev = "70b0eb5"
|
||||||
features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"]
|
features = ["stm32h743v", "rt", "unproven", "ethernet", "quadspi"]
|
||||||
# version = "0.9.0"
|
# version = "0.9.0"
|
||||||
git = "https://github.com/quartiq/stm32h7xx-hal.git"
|
git = "https://github.com/quartiq/stm32h7xx-hal.git"
|
||||||
rev = "acd47be"
|
rev = "33aa67d"
|
||||||
|
|
||||||
# link.x section start/end
|
# link.x section start/end
|
||||||
[patch.crates-io.cortex-m-rt]
|
[patch.crates-io.cortex-m-rt]
|
||||||
|
@ -73,7 +73,7 @@ rev = "2750533"
|
||||||
|
|
||||||
[dependencies.smoltcp-nal]
|
[dependencies.smoltcp-nal]
|
||||||
git = "https://github.com/quartiq/smoltcp-nal.git"
|
git = "https://github.com/quartiq/smoltcp-nal.git"
|
||||||
rev = "5e56576"
|
rev = "5baf55f"
|
||||||
|
|
||||||
[dependencies.minimq]
|
[dependencies.minimq]
|
||||||
git = "https://github.com/quartiq/minimq.git"
|
git = "https://github.com/quartiq/minimq.git"
|
||||||
|
|
|
@ -0,0 +1,189 @@
|
||||||
|
#!/usr/bin/python3
|
||||||
|
"""
|
||||||
|
Author: Ryan Summers
|
||||||
|
|
||||||
|
Description: Provides a mechanism for measuring Stabilizer stream data throughput.
|
||||||
|
"""
|
||||||
|
import socket
|
||||||
|
import collections
|
||||||
|
import struct
|
||||||
|
import time
|
||||||
|
import logging
|
||||||
|
|
||||||
|
# Representation of a single UDP packet transmitted by Stabilizer.
|
||||||
|
Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac'])
|
||||||
|
|
||||||
|
class Timer:
|
||||||
|
""" A basic timer for measuring elapsed time periods. """
|
||||||
|
|
||||||
|
def __init__(self, period=1.0):
|
||||||
|
""" Create the timer with the provided period. """
|
||||||
|
self.start_time = time.time()
|
||||||
|
self.trigger_time = self.start_time + period
|
||||||
|
self.period = period
|
||||||
|
self.started = False
|
||||||
|
|
||||||
|
|
||||||
|
def is_triggered(self):
|
||||||
|
""" Check if the timer period has elapsed. """
|
||||||
|
now = time.time()
|
||||||
|
return now >= self.trigger_time
|
||||||
|
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
""" Start the timer. """
|
||||||
|
self.start_time = time.time()
|
||||||
|
self.started = True
|
||||||
|
|
||||||
|
|
||||||
|
def is_started(self):
|
||||||
|
""" Check if the timer has started. """
|
||||||
|
return self.started
|
||||||
|
|
||||||
|
|
||||||
|
def arm(self):
|
||||||
|
""" Arm the timer trigger. """
|
||||||
|
self.trigger_time = time.time() + self.period
|
||||||
|
|
||||||
|
|
||||||
|
def elapsed(self):
|
||||||
|
""" Get the elapsed time since the timer was started. """
|
||||||
|
now = time.time()
|
||||||
|
return now - self.start_time
|
||||||
|
|
||||||
|
|
||||||
|
class PacketParser:
|
||||||
|
""" Utilize class used for parsing received UDP data. """
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
""" Initialize the parser. """
|
||||||
|
self.buf = b''
|
||||||
|
self.total_bytes = 0
|
||||||
|
|
||||||
|
|
||||||
|
def ingress(self, data):
|
||||||
|
""" Ingress received UDP data. """
|
||||||
|
self.total_bytes += len(data)
|
||||||
|
self.buf += data
|
||||||
|
|
||||||
|
|
||||||
|
def parse_all_packets(self):
|
||||||
|
""" Parse all received packets from the receive buffer.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A list of received Packets.
|
||||||
|
"""
|
||||||
|
packets = []
|
||||||
|
while True:
|
||||||
|
new_packets = self._parse()
|
||||||
|
if new_packets:
|
||||||
|
packets += new_packets
|
||||||
|
else:
|
||||||
|
return packets
|
||||||
|
|
||||||
|
|
||||||
|
def _parse(self):
|
||||||
|
""" Attempt to parse packets from the received buffer. """
|
||||||
|
# Attempt to parse a block from the buffer.
|
||||||
|
if len(self.buf) < 4:
|
||||||
|
return None
|
||||||
|
|
||||||
|
start_id, num_blocks, data_size = struct.unpack_from('!HBB', self.buf)
|
||||||
|
|
||||||
|
packet_size = 4 + data_size * num_blocks * 8
|
||||||
|
|
||||||
|
if len(self.buf) < packet_size:
|
||||||
|
return None
|
||||||
|
|
||||||
|
self.buf = self.buf[4:]
|
||||||
|
|
||||||
|
packets = []
|
||||||
|
for offset in range(num_blocks):
|
||||||
|
adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf)
|
||||||
|
adc = [
|
||||||
|
adcs_dacs[0:data_size],
|
||||||
|
adcs_dacs[data_size:2*data_size],
|
||||||
|
]
|
||||||
|
|
||||||
|
dac = [
|
||||||
|
adcs_dacs[2*data_size: 3*data_size],
|
||||||
|
adcs_dacs[3*data_size:],
|
||||||
|
]
|
||||||
|
|
||||||
|
self.buf = self.buf[8*data_size:]
|
||||||
|
packets.append(Packet(start_id + offset, adc, dac))
|
||||||
|
|
||||||
|
return packets
|
||||||
|
|
||||||
|
|
||||||
|
def check_index(previous_index, next_index):
|
||||||
|
""" Check if two indices are sequential. """
|
||||||
|
if previous_index == -1:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Handle index roll-over. Indices are only stored in 16-bit numbers.
|
||||||
|
if next_index < previous_index:
|
||||||
|
next_index += 65536
|
||||||
|
|
||||||
|
expected_index = previous_index + 1
|
||||||
|
|
||||||
|
return next_index == expected_index
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
""" Main program. """
|
||||||
|
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
connection.bind(("", 1111))
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO,
|
||||||
|
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
|
||||||
|
|
||||||
|
last_index = -1
|
||||||
|
|
||||||
|
drop_count = 0
|
||||||
|
good_blocks = 0
|
||||||
|
|
||||||
|
timer = Timer()
|
||||||
|
parser = PacketParser()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Receive any data over UDP and parse it.
|
||||||
|
data = connection.recv(4096)
|
||||||
|
if data and not timer.is_started():
|
||||||
|
timer.start()
|
||||||
|
|
||||||
|
parser.ingress(data)
|
||||||
|
|
||||||
|
# Handle any received packets.
|
||||||
|
for packet in parser.parse_all_packets():
|
||||||
|
|
||||||
|
# Handle any dropped packets.
|
||||||
|
if not check_index(last_index, packet.index):
|
||||||
|
print(hex(last_index), hex(packet.index))
|
||||||
|
if packet.index < (last_index + 1):
|
||||||
|
dropped = packet.index + 65536 - (last_index + 1)
|
||||||
|
else:
|
||||||
|
dropped = packet.index - (last_index + 1)
|
||||||
|
|
||||||
|
drop_count += dropped
|
||||||
|
|
||||||
|
last_index = packet.index
|
||||||
|
good_blocks += 1
|
||||||
|
|
||||||
|
# Report the throughput periodically.
|
||||||
|
if timer.is_triggered():
|
||||||
|
drate = parser.total_bytes * 8 / 1e6 / timer.elapsed()
|
||||||
|
|
||||||
|
print(f'''
|
||||||
|
Data Rate: {drate:.3f} Mbps
|
||||||
|
Received Blocks: {good_blocks}
|
||||||
|
Dropped blocks: {drop_count}
|
||||||
|
|
||||||
|
Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
|
||||||
|
----
|
||||||
|
''')
|
||||||
|
timer.arm()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
|
@ -18,6 +18,7 @@ use stabilizer::{
|
||||||
DigitalInput0, DigitalInput1, AFE0, AFE1,
|
DigitalInput0, DigitalInput1, AFE0, AFE1,
|
||||||
},
|
},
|
||||||
net::{
|
net::{
|
||||||
|
data_stream::{BlockGenerator, StreamTarget},
|
||||||
miniconf::Miniconf,
|
miniconf::Miniconf,
|
||||||
serde::Deserialize,
|
serde::Deserialize,
|
||||||
telemetry::{Telemetry, TelemetryBuffer},
|
telemetry::{Telemetry, TelemetryBuffer},
|
||||||
|
@ -37,6 +38,7 @@ pub struct Settings {
|
||||||
allow_hold: bool,
|
allow_hold: bool,
|
||||||
force_hold: bool,
|
force_hold: bool,
|
||||||
telemetry_period: u16,
|
telemetry_period: u16,
|
||||||
|
stream_target: StreamTarget,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Settings {
|
impl Default for Settings {
|
||||||
|
@ -56,6 +58,8 @@ impl Default for Settings {
|
||||||
force_hold: false,
|
force_hold: false,
|
||||||
// The default telemetry period in seconds.
|
// The default telemetry period in seconds.
|
||||||
telemetry_period: 10,
|
telemetry_period: 10,
|
||||||
|
|
||||||
|
stream_target: StreamTarget::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -68,6 +72,7 @@ const APP: () = {
|
||||||
adcs: (Adc0Input, Adc1Input),
|
adcs: (Adc0Input, Adc1Input),
|
||||||
dacs: (Dac0Output, Dac1Output),
|
dacs: (Dac0Output, Dac1Output),
|
||||||
network: NetworkUsers<Settings, Telemetry>,
|
network: NetworkUsers<Settings, Telemetry>,
|
||||||
|
generator: BlockGenerator,
|
||||||
|
|
||||||
settings: Settings,
|
settings: Settings,
|
||||||
telemetry: TelemetryBuffer,
|
telemetry: TelemetryBuffer,
|
||||||
|
@ -76,13 +81,13 @@ const APP: () = {
|
||||||
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
|
iir_state: [[iir::Vec5; IIR_CASCADE_LENGTH]; 2],
|
||||||
}
|
}
|
||||||
|
|
||||||
#[init(spawn=[telemetry, settings_update])]
|
#[init(spawn=[telemetry, settings_update, ethernet_link])]
|
||||||
fn init(c: init::Context) -> init::LateResources {
|
fn init(c: init::Context) -> init::LateResources {
|
||||||
// Configure the microcontroller
|
// Configure the microcontroller
|
||||||
let (mut stabilizer, _pounder) =
|
let (mut stabilizer, _pounder) =
|
||||||
hardware::setup::setup(c.core, c.device);
|
hardware::setup::setup(c.core, c.device);
|
||||||
|
|
||||||
let network = NetworkUsers::new(
|
let mut network = NetworkUsers::new(
|
||||||
stabilizer.net.stack,
|
stabilizer.net.stack,
|
||||||
stabilizer.net.phy,
|
stabilizer.net.phy,
|
||||||
stabilizer.cycle_counter,
|
stabilizer.cycle_counter,
|
||||||
|
@ -90,10 +95,15 @@ const APP: () = {
|
||||||
stabilizer.net.mac_address,
|
stabilizer.net.mac_address,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let generator = network.enable_streaming();
|
||||||
|
|
||||||
// Spawn a settings update for default settings.
|
// Spawn a settings update for default settings.
|
||||||
c.spawn.settings_update().unwrap();
|
c.spawn.settings_update().unwrap();
|
||||||
c.spawn.telemetry().unwrap();
|
c.spawn.telemetry().unwrap();
|
||||||
|
|
||||||
|
// Spawn the ethernet link period check task.
|
||||||
|
c.spawn.ethernet_link().unwrap();
|
||||||
|
|
||||||
// Enable ADC/DAC events
|
// Enable ADC/DAC events
|
||||||
stabilizer.adcs.0.start();
|
stabilizer.adcs.0.start();
|
||||||
stabilizer.adcs.1.start();
|
stabilizer.adcs.1.start();
|
||||||
|
@ -107,6 +117,7 @@ const APP: () = {
|
||||||
afes: stabilizer.afes,
|
afes: stabilizer.afes,
|
||||||
adcs: stabilizer.adcs,
|
adcs: stabilizer.adcs,
|
||||||
dacs: stabilizer.dacs,
|
dacs: stabilizer.dacs,
|
||||||
|
generator,
|
||||||
network,
|
network,
|
||||||
digital_inputs: stabilizer.digital_inputs,
|
digital_inputs: stabilizer.digital_inputs,
|
||||||
telemetry: TelemetryBuffer::default(),
|
telemetry: TelemetryBuffer::default(),
|
||||||
|
@ -130,7 +141,7 @@ const APP: () = {
|
||||||
///
|
///
|
||||||
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement
|
/// Because the ADC and DAC operate at the same rate, these two constraints actually implement
|
||||||
/// the same time bounds, meeting one also means the other is also met.
|
/// the same time bounds, meeting one also means the other is also met.
|
||||||
#[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry], priority=2)]
|
#[task(binds=DMA1_STR4, resources=[adcs, digital_inputs, dacs, iir_state, settings, telemetry, generator], priority=2)]
|
||||||
#[inline(never)]
|
#[inline(never)]
|
||||||
#[link_section = ".itcm.process"]
|
#[link_section = ".itcm.process"]
|
||||||
fn process(mut c: process::Context) {
|
fn process(mut c: process::Context) {
|
||||||
|
@ -141,6 +152,7 @@ const APP: () = {
|
||||||
ref settings,
|
ref settings,
|
||||||
ref mut iir_state,
|
ref mut iir_state,
|
||||||
ref mut telemetry,
|
ref mut telemetry,
|
||||||
|
ref mut generator,
|
||||||
} = c.resources;
|
} = c.resources;
|
||||||
|
|
||||||
let digital_inputs = [
|
let digital_inputs = [
|
||||||
|
@ -180,6 +192,9 @@ const APP: () = {
|
||||||
.last();
|
.last();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stream the data.
|
||||||
|
generator.send(&adc_samples, &dac_samples);
|
||||||
|
|
||||||
// Update telemetry measurements.
|
// Update telemetry measurements.
|
||||||
telemetry.adcs =
|
telemetry.adcs =
|
||||||
[AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])];
|
[AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])];
|
||||||
|
@ -214,6 +229,9 @@ const APP: () = {
|
||||||
// Update AFEs
|
// Update AFEs
|
||||||
c.resources.afes.0.set_gain(settings.afe[0]);
|
c.resources.afes.0.set_gain(settings.afe[0]);
|
||||||
c.resources.afes.1.set_gain(settings.afe[1]);
|
c.resources.afes.1.set_gain(settings.afe[1]);
|
||||||
|
|
||||||
|
let target = settings.stream_target.into();
|
||||||
|
c.resources.network.direct_stream(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])]
|
#[task(priority = 1, resources=[network, settings, telemetry], schedule=[telemetry])]
|
||||||
|
@ -240,6 +258,14 @@ const APP: () = {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[task(priority = 1, resources=[network], schedule=[ethernet_link])]
|
||||||
|
fn ethernet_link(c: ethernet_link::Context) {
|
||||||
|
c.resources.network.processor.handle_link();
|
||||||
|
c.schedule
|
||||||
|
.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[task(binds = ETH, priority = 1)]
|
#[task(binds = ETH, priority = 1)]
|
||||||
fn eth(_: eth::Context) {
|
fn eth(_: eth::Context) {
|
||||||
unsafe { hal::ethernet::interrupt_handler() }
|
unsafe { hal::ethernet::interrupt_handler() }
|
||||||
|
|
|
@ -20,6 +20,7 @@ use stabilizer::{
|
||||||
DigitalInput0, DigitalInput1, AFE0, AFE1,
|
DigitalInput0, DigitalInput1, AFE0, AFE1,
|
||||||
},
|
},
|
||||||
net::{
|
net::{
|
||||||
|
data_stream::{BlockGenerator, StreamTarget},
|
||||||
miniconf::Miniconf,
|
miniconf::Miniconf,
|
||||||
serde::Deserialize,
|
serde::Deserialize,
|
||||||
telemetry::{Telemetry, TelemetryBuffer},
|
telemetry::{Telemetry, TelemetryBuffer},
|
||||||
|
@ -64,6 +65,8 @@ pub struct Settings {
|
||||||
|
|
||||||
output_conf: [Conf; 2],
|
output_conf: [Conf; 2],
|
||||||
telemetry_period: u16,
|
telemetry_period: u16,
|
||||||
|
|
||||||
|
stream_target: StreamTarget,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for Settings {
|
impl Default for Settings {
|
||||||
|
@ -82,6 +85,8 @@ impl Default for Settings {
|
||||||
output_conf: [Conf::InPhase, Conf::Quadrature],
|
output_conf: [Conf::InPhase, Conf::Quadrature],
|
||||||
// The default telemetry period in seconds.
|
// The default telemetry period in seconds.
|
||||||
telemetry_period: 10,
|
telemetry_period: 10,
|
||||||
|
|
||||||
|
stream_target: StreamTarget::default(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -96,19 +101,20 @@ const APP: () = {
|
||||||
settings: Settings,
|
settings: Settings,
|
||||||
telemetry: TelemetryBuffer,
|
telemetry: TelemetryBuffer,
|
||||||
digital_inputs: (DigitalInput0, DigitalInput1),
|
digital_inputs: (DigitalInput0, DigitalInput1),
|
||||||
|
generator: BlockGenerator,
|
||||||
|
|
||||||
timestamper: InputStamper,
|
timestamper: InputStamper,
|
||||||
pll: RPLL,
|
pll: RPLL,
|
||||||
lockin: Lockin<4>,
|
lockin: Lockin<4>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[init(spawn=[settings_update, telemetry])]
|
#[init(spawn=[settings_update, telemetry, ethernet_link])]
|
||||||
fn init(c: init::Context) -> init::LateResources {
|
fn init(c: init::Context) -> init::LateResources {
|
||||||
// Configure the microcontroller
|
// Configure the microcontroller
|
||||||
let (mut stabilizer, _pounder) =
|
let (mut stabilizer, _pounder) =
|
||||||
hardware::setup::setup(c.core, c.device);
|
hardware::setup::setup(c.core, c.device);
|
||||||
|
|
||||||
let network = NetworkUsers::new(
|
let mut network = NetworkUsers::new(
|
||||||
stabilizer.net.stack,
|
stabilizer.net.stack,
|
||||||
stabilizer.net.phy,
|
stabilizer.net.phy,
|
||||||
stabilizer.cycle_counter,
|
stabilizer.cycle_counter,
|
||||||
|
@ -116,6 +122,8 @@ const APP: () = {
|
||||||
stabilizer.net.mac_address,
|
stabilizer.net.mac_address,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let generator = network.enable_streaming();
|
||||||
|
|
||||||
let settings = Settings::default();
|
let settings = Settings::default();
|
||||||
|
|
||||||
let pll = RPLL::new(
|
let pll = RPLL::new(
|
||||||
|
@ -127,6 +135,9 @@ const APP: () = {
|
||||||
c.spawn.settings_update().unwrap();
|
c.spawn.settings_update().unwrap();
|
||||||
c.spawn.telemetry().unwrap();
|
c.spawn.telemetry().unwrap();
|
||||||
|
|
||||||
|
// Spawn the ethernet link servicing task.
|
||||||
|
c.spawn.ethernet_link().unwrap();
|
||||||
|
|
||||||
// Enable ADC/DAC events
|
// Enable ADC/DAC events
|
||||||
stabilizer.adcs.0.start();
|
stabilizer.adcs.0.start();
|
||||||
stabilizer.adcs.1.start();
|
stabilizer.adcs.1.start();
|
||||||
|
@ -152,6 +163,7 @@ const APP: () = {
|
||||||
telemetry: TelemetryBuffer::default(),
|
telemetry: TelemetryBuffer::default(),
|
||||||
|
|
||||||
settings,
|
settings,
|
||||||
|
generator,
|
||||||
|
|
||||||
pll,
|
pll,
|
||||||
lockin: Lockin::default(),
|
lockin: Lockin::default(),
|
||||||
|
@ -165,7 +177,7 @@ const APP: () = {
|
||||||
/// This is an implementation of a externally (DI0) referenced PLL lockin on the ADC0 signal.
|
/// This is an implementation of a externally (DI0) referenced PLL lockin on the ADC0 signal.
|
||||||
/// It outputs either I/Q or power/phase on DAC0/DAC1. Data is normalized to full scale.
|
/// It outputs either I/Q or power/phase on DAC0/DAC1. Data is normalized to full scale.
|
||||||
/// PLL bandwidth, filter bandwidth, slope, and x/y or power/phase post-filters are available.
|
/// PLL bandwidth, filter bandwidth, slope, and x/y or power/phase post-filters are available.
|
||||||
#[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry], priority=2)]
|
#[task(binds=DMA1_STR4, resources=[adcs, dacs, lockin, timestamper, pll, settings, telemetry, generator], priority=2)]
|
||||||
#[inline(never)]
|
#[inline(never)]
|
||||||
#[link_section = ".itcm.process"]
|
#[link_section = ".itcm.process"]
|
||||||
fn process(mut c: process::Context) {
|
fn process(mut c: process::Context) {
|
||||||
|
@ -177,6 +189,7 @@ const APP: () = {
|
||||||
ref mut lockin,
|
ref mut lockin,
|
||||||
ref mut pll,
|
ref mut pll,
|
||||||
ref mut timestamper,
|
ref mut timestamper,
|
||||||
|
ref mut generator,
|
||||||
} = c.resources;
|
} = c.resources;
|
||||||
|
|
||||||
let (reference_phase, reference_frequency) = match settings.lockin_mode
|
let (reference_phase, reference_frequency) = match settings.lockin_mode
|
||||||
|
@ -249,6 +262,10 @@ const APP: () = {
|
||||||
*sample = DacCode::from(value as i16).0;
|
*sample = DacCode::from(value as i16).0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stream data
|
||||||
|
generator.send(&adc_samples, &dac_samples);
|
||||||
|
|
||||||
// Update telemetry measurements.
|
// Update telemetry measurements.
|
||||||
telemetry.adcs =
|
telemetry.adcs =
|
||||||
[AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])];
|
[AdcCode(adc_samples[0][0]), AdcCode(adc_samples[1][0])];
|
||||||
|
@ -282,6 +299,9 @@ const APP: () = {
|
||||||
c.resources.afes.1.set_gain(settings.afe[1]);
|
c.resources.afes.1.set_gain(settings.afe[1]);
|
||||||
|
|
||||||
c.resources.settings.lock(|current| *current = *settings);
|
c.resources.settings.lock(|current| *current = *settings);
|
||||||
|
|
||||||
|
let target = settings.stream_target.into();
|
||||||
|
c.resources.network.direct_stream(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[task(priority = 1, resources=[network, digital_inputs, settings, telemetry], schedule=[telemetry])]
|
#[task(priority = 1, resources=[network, digital_inputs, settings, telemetry], schedule=[telemetry])]
|
||||||
|
@ -313,6 +333,14 @@ const APP: () = {
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[task(priority = 1, resources=[network], schedule=[ethernet_link])]
|
||||||
|
fn ethernet_link(c: ethernet_link::Context) {
|
||||||
|
c.resources.network.processor.handle_link();
|
||||||
|
c.schedule
|
||||||
|
.ethernet_link(c.scheduled + SystemTimer::ticks_from_secs(1))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
#[task(binds = ETH, priority = 1)]
|
#[task(binds = ETH, priority = 1)]
|
||||||
fn eth(_: eth::Context) {
|
fn eth(_: eth::Context) {
|
||||||
unsafe { hal::ethernet::interrupt_handler() }
|
unsafe { hal::ethernet::interrupt_handler() }
|
||||||
|
|
|
@ -7,9 +7,6 @@ use stm32h7xx_hal::{
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
|
|
||||||
const NUM_SOCKETS: usize = 4;
|
|
||||||
|
|
||||||
use heapless::Vec;
|
|
||||||
use smoltcp_nal::smoltcp;
|
use smoltcp_nal::smoltcp;
|
||||||
|
|
||||||
use embedded_hal::digital::v2::{InputPin, OutputPin};
|
use embedded_hal::digital::v2::{InputPin, OutputPin};
|
||||||
|
@ -21,13 +18,18 @@ use super::{
|
||||||
NetworkStack, AFE0, AFE1,
|
NetworkStack, AFE0, AFE1,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const NUM_TCP_SOCKETS: usize = 4;
|
||||||
|
const NUM_UDP_SOCKETS: usize = 1;
|
||||||
|
const NUM_SOCKETS: usize = NUM_UDP_SOCKETS + NUM_TCP_SOCKETS;
|
||||||
|
|
||||||
pub struct NetStorage {
|
pub struct NetStorage {
|
||||||
pub ip_addrs: [smoltcp::wire::IpCidr; 1],
|
pub ip_addrs: [smoltcp::wire::IpCidr; 1],
|
||||||
|
|
||||||
// Note: There is an additional socket set item required for the DHCP socket.
|
// Note: There is an additional socket set item required for the DHCP socket.
|
||||||
pub sockets:
|
pub sockets:
|
||||||
[Option<smoltcp::socket::SocketSetItem<'static>>; NUM_SOCKETS + 1],
|
[Option<smoltcp::socket::SocketSetItem<'static>>; NUM_SOCKETS + 1],
|
||||||
pub socket_storage: [SocketStorage; NUM_SOCKETS],
|
pub tcp_socket_storage: [TcpSocketStorage; NUM_TCP_SOCKETS],
|
||||||
|
pub udp_socket_storage: [UdpSocketStorage; NUM_UDP_SOCKETS],
|
||||||
pub neighbor_cache:
|
pub neighbor_cache:
|
||||||
[Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
|
[Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
|
||||||
pub routes_cache:
|
pub routes_cache:
|
||||||
|
@ -39,13 +41,37 @@ pub struct NetStorage {
|
||||||
pub dhcp_rx_storage: [u8; 600],
|
pub dhcp_rx_storage: [u8; 600],
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct UdpSocketStorage {
|
||||||
|
rx_storage: [u8; 1024],
|
||||||
|
tx_storage: [u8; 2048],
|
||||||
|
tx_metadata:
|
||||||
|
[smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
|
||||||
|
rx_metadata:
|
||||||
|
[smoltcp::storage::PacketMetadata<smoltcp::wire::IpEndpoint>; 10],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UdpSocketStorage {
|
||||||
|
const fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
rx_storage: [0; 1024],
|
||||||
|
tx_storage: [0; 2048],
|
||||||
|
tx_metadata: [smoltcp::storage::PacketMetadata::<
|
||||||
|
smoltcp::wire::IpEndpoint,
|
||||||
|
>::EMPTY; 10],
|
||||||
|
rx_metadata: [smoltcp::storage::PacketMetadata::<
|
||||||
|
smoltcp::wire::IpEndpoint,
|
||||||
|
>::EMPTY; 10],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
pub struct SocketStorage {
|
pub struct TcpSocketStorage {
|
||||||
rx_storage: [u8; 1024],
|
rx_storage: [u8; 1024],
|
||||||
tx_storage: [u8; 1024],
|
tx_storage: [u8; 1024],
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SocketStorage {
|
impl TcpSocketStorage {
|
||||||
const fn new() -> Self {
|
const fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
rx_storage: [0; 1024],
|
rx_storage: [0; 1024],
|
||||||
|
@ -63,8 +89,9 @@ impl NetStorage {
|
||||||
)],
|
)],
|
||||||
neighbor_cache: [None; 8],
|
neighbor_cache: [None; 8],
|
||||||
routes_cache: [None; 8],
|
routes_cache: [None; 8],
|
||||||
sockets: [None, None, None, None, None],
|
sockets: [None, None, None, None, None, None],
|
||||||
socket_storage: [SocketStorage::new(); NUM_SOCKETS],
|
tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS],
|
||||||
|
udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS],
|
||||||
dhcp_tx_storage: [0; 600],
|
dhcp_tx_storage: [0; 600],
|
||||||
dhcp_rx_storage: [0; 600],
|
dhcp_rx_storage: [0; 600],
|
||||||
dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
|
dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
|
||||||
|
@ -634,20 +661,18 @@ pub fn setup(
|
||||||
let neighbor_cache =
|
let neighbor_cache =
|
||||||
smoltcp::iface::NeighborCache::new(&mut store.neighbor_cache[..]);
|
smoltcp::iface::NeighborCache::new(&mut store.neighbor_cache[..]);
|
||||||
|
|
||||||
let interface = smoltcp::iface::EthernetInterfaceBuilder::new(eth_dma)
|
let interface = smoltcp::iface::InterfaceBuilder::new(eth_dma)
|
||||||
.ethernet_addr(mac_addr)
|
.ethernet_addr(mac_addr)
|
||||||
.neighbor_cache(neighbor_cache)
|
.neighbor_cache(neighbor_cache)
|
||||||
.ip_addrs(&mut store.ip_addrs[..])
|
.ip_addrs(&mut store.ip_addrs[..])
|
||||||
.routes(routes)
|
.routes(routes)
|
||||||
.finalize();
|
.finalize();
|
||||||
|
|
||||||
let (mut sockets, handles) = {
|
let sockets = {
|
||||||
let mut sockets =
|
let mut sockets =
|
||||||
smoltcp::socket::SocketSet::new(&mut store.sockets[..]);
|
smoltcp::socket::SocketSet::new(&mut store.sockets[..]);
|
||||||
|
|
||||||
let mut handles: Vec<smoltcp::socket::SocketHandle, 64> =
|
for storage in store.tcp_socket_storage[..].iter_mut() {
|
||||||
Vec::new();
|
|
||||||
for storage in store.socket_storage.iter_mut() {
|
|
||||||
let tcp_socket = {
|
let tcp_socket = {
|
||||||
let rx_buffer = smoltcp::socket::TcpSocketBuffer::new(
|
let rx_buffer = smoltcp::socket::TcpSocketBuffer::new(
|
||||||
&mut storage.rx_storage[..],
|
&mut storage.rx_storage[..],
|
||||||
|
@ -658,34 +683,28 @@ pub fn setup(
|
||||||
|
|
||||||
smoltcp::socket::TcpSocket::new(rx_buffer, tx_buffer)
|
smoltcp::socket::TcpSocket::new(rx_buffer, tx_buffer)
|
||||||
};
|
};
|
||||||
let handle = sockets.add(tcp_socket);
|
sockets.add(tcp_socket);
|
||||||
|
|
||||||
handles.push(handle).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
(sockets, handles)
|
for storage in store.udp_socket_storage[..].iter_mut() {
|
||||||
};
|
let udp_socket = {
|
||||||
|
let rx_buffer = smoltcp::socket::UdpSocketBuffer::new(
|
||||||
|
&mut storage.rx_metadata[..],
|
||||||
|
&mut storage.rx_storage[..],
|
||||||
|
);
|
||||||
|
let tx_buffer = smoltcp::socket::UdpSocketBuffer::new(
|
||||||
|
&mut storage.tx_metadata[..],
|
||||||
|
&mut storage.tx_storage[..],
|
||||||
|
);
|
||||||
|
|
||||||
let dhcp_client = {
|
smoltcp::socket::UdpSocket::new(rx_buffer, tx_buffer)
|
||||||
let dhcp_rx_buffer = smoltcp::socket::RawSocketBuffer::new(
|
};
|
||||||
&mut store.dhcp_rx_metadata[..],
|
sockets.add(udp_socket);
|
||||||
&mut store.dhcp_rx_storage[..],
|
}
|
||||||
);
|
|
||||||
|
|
||||||
let dhcp_tx_buffer = smoltcp::socket::RawSocketBuffer::new(
|
sockets.add(smoltcp::socket::Dhcpv4Socket::new());
|
||||||
&mut store.dhcp_tx_metadata[..],
|
|
||||||
&mut store.dhcp_tx_storage[..],
|
|
||||||
);
|
|
||||||
|
|
||||||
smoltcp::dhcp::Dhcpv4Client::new(
|
sockets
|
||||||
&mut sockets,
|
|
||||||
dhcp_rx_buffer,
|
|
||||||
dhcp_tx_buffer,
|
|
||||||
// Smoltcp indicates that an instant with a negative time is indicative that time is
|
|
||||||
// not yet available. We can't get the current instant yet, so indicate an invalid
|
|
||||||
// time value.
|
|
||||||
smoltcp::time::Instant::from_millis(-1),
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let random_seed = {
|
let random_seed = {
|
||||||
|
@ -696,12 +715,7 @@ pub fn setup(
|
||||||
data
|
data
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut stack = smoltcp_nal::NetworkStack::new(
|
let mut stack = smoltcp_nal::NetworkStack::new(interface, sockets);
|
||||||
interface,
|
|
||||||
sockets,
|
|
||||||
&handles,
|
|
||||||
Some(dhcp_client),
|
|
||||||
);
|
|
||||||
|
|
||||||
stack.seed_random_port(&random_seed);
|
stack.seed_random_port(&random_seed);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,353 @@
|
||||||
|
///! Stabilizer data stream capabilities
|
||||||
|
///!
|
||||||
|
///! # Design
|
||||||
|
///! Stabilizer data streamining utilizes UDP packets to send live data streams at high throughput.
|
||||||
|
///! Packets are always sent in a best-effort fashion, and data may be dropped. Each packet contains
|
||||||
|
///! an identifier that can be used to detect any dropped data.
|
||||||
|
///!
|
||||||
|
///! The current implementation utilizes an single-producer, single-consumer queue to send data
|
||||||
|
///! between a high priority task and the UDP transmitter.
|
||||||
|
///!
|
||||||
|
///! A "batch" of data is defined to be a single item in the SPSC queue sent to the UDP transmitter
|
||||||
|
///! thread. The transmitter thread then serializes as many sequential "batches" into a single UDP
|
||||||
|
///! packet as possible. The UDP packet is also given a header indicating the starting batch
|
||||||
|
///! sequence number and the number of batches present. If the UDP transmitter encounters a
|
||||||
|
///! non-sequential batch, it does not enqueue it into the packet and instead transmits any staged
|
||||||
|
///! data. The non-sequential batch is then transmitted in a new UDP packet. This method allows a
|
||||||
|
///! receiver to detect dropped batches (e.g. due to processing overhead).
|
||||||
|
use heapless::spsc::{Consumer, Producer, Queue};
|
||||||
|
use miniconf::MiniconfAtomic;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use smoltcp_nal::embedded_nal::{IpAddr, Ipv4Addr, SocketAddr, UdpClientStack};
|
||||||
|
|
||||||
|
use super::NetworkReference;
|
||||||
|
use crate::hardware::design_parameters::SAMPLE_BUFFER_SIZE;
|
||||||
|
|
||||||
|
// The number of data blocks that we will buffer in the queue.
|
||||||
|
const BLOCK_BUFFER_SIZE: usize = 30;
|
||||||
|
|
||||||
|
// A factor that data may be subsampled at.
|
||||||
|
const SUBSAMPLE_RATE: usize = 1;
|
||||||
|
|
||||||
|
/// Represents the destination for the UDP stream to send data to.
|
||||||
|
#[derive(Copy, Clone, Debug, MiniconfAtomic, Deserialize)]
|
||||||
|
pub struct StreamTarget {
|
||||||
|
pub ip: [u8; 4],
|
||||||
|
pub port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for StreamTarget {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
ip: [0; 4],
|
||||||
|
port: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<StreamTarget> for SocketAddr {
|
||||||
|
fn from(target: StreamTarget) -> SocketAddr {
|
||||||
|
SocketAddr::new(
|
||||||
|
IpAddr::V4(Ipv4Addr::new(
|
||||||
|
target.ip[0],
|
||||||
|
target.ip[1],
|
||||||
|
target.ip[2],
|
||||||
|
target.ip[3],
|
||||||
|
)),
|
||||||
|
target.port,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A basic "batch" of data.
|
||||||
|
// Note: In the future, the stream may be generic over this type.
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub struct AdcDacData {
|
||||||
|
block_id: u16,
|
||||||
|
adcs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
|
||||||
|
dacs: [[u16; SAMPLE_BUFFER_SIZE]; 2],
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configure streaming on a device.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `stack` - A reference to the shared network stack.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// (generator, stream) where `generator` can be used to enqueue "batches" for transmission. The
|
||||||
|
/// `stream` is the logically consumer (UDP transmitter) of the enqueued data.
|
||||||
|
pub fn setup_streaming(
|
||||||
|
stack: NetworkReference,
|
||||||
|
) -> (BlockGenerator, DataStream) {
|
||||||
|
let queue = cortex_m::singleton!(: Queue<AdcDacData, BLOCK_BUFFER_SIZE> = Queue::new()).unwrap();
|
||||||
|
|
||||||
|
let (producer, consumer) = queue.split();
|
||||||
|
|
||||||
|
let generator = BlockGenerator::new(producer);
|
||||||
|
|
||||||
|
let stream = DataStream::new(stack, consumer);
|
||||||
|
|
||||||
|
(generator, stream)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The data generator for a stream.
|
||||||
|
pub struct BlockGenerator {
|
||||||
|
queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
|
||||||
|
current_id: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BlockGenerator {
|
||||||
|
/// Construct a new generator.
|
||||||
|
/// # Args
|
||||||
|
/// * `queue` - The producer portion of the SPSC queue to enqueue data into.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The generator to use.
|
||||||
|
fn new(queue: Producer<'static, AdcDacData, BLOCK_BUFFER_SIZE>) -> Self {
|
||||||
|
Self {
|
||||||
|
queue,
|
||||||
|
current_id: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Schedule data to be sent by the generator.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// If no space is available, the data batch may be silently dropped.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `adcs` - The ADC data to transmit.
|
||||||
|
/// * `dacs` - The DAC data to transmit.
|
||||||
|
pub fn send(
|
||||||
|
&mut self,
|
||||||
|
adcs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
|
||||||
|
dacs: &[&mut [u16; SAMPLE_BUFFER_SIZE]; 2],
|
||||||
|
) {
|
||||||
|
let block = AdcDacData {
|
||||||
|
block_id: self.current_id,
|
||||||
|
adcs: [*adcs[0], *adcs[1]],
|
||||||
|
dacs: [*dacs[0], *dacs[1]],
|
||||||
|
};
|
||||||
|
|
||||||
|
self.current_id = self.current_id.wrapping_add(1);
|
||||||
|
self.queue.enqueue(block).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Represents a single UDP packet sent by the stream.
|
||||||
|
///
|
||||||
|
/// # Packet Format
|
||||||
|
/// All data is sent in network-endian format. The format is as follows
|
||||||
|
///
|
||||||
|
/// Header:
|
||||||
|
/// [0..2]: Start block ID (u16)
|
||||||
|
/// [2..3]: Num Blocks present (u8) <N>
|
||||||
|
/// [3..4]: Batch Size (u8) <BS>
|
||||||
|
///
|
||||||
|
/// Following the header, batches are added sequentially. Each batch takes the form of:
|
||||||
|
/// [<BS>*0..<BS>*2]: ADC0
|
||||||
|
/// [<BS>*2..<BS>*4]: ADC1
|
||||||
|
/// [<BS>*4..<BS>*6]: DAC0
|
||||||
|
/// [<BS>*6..<BS>*8]: DAC1
|
||||||
|
struct DataPacket<'a> {
|
||||||
|
buf: &'a mut [u8],
|
||||||
|
subsample_rate: usize,
|
||||||
|
start_id: Option<u16>,
|
||||||
|
num_blocks: u8,
|
||||||
|
write_index: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> DataPacket<'a> {
|
||||||
|
/// Construct a new packet.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `buf` - The location to serialize the data packet into.
|
||||||
|
/// * `subsample_rate` - The factor at which to subsample data from batches.
|
||||||
|
pub fn new(buf: &'a mut [u8], subsample_rate: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
buf,
|
||||||
|
start_id: None,
|
||||||
|
num_blocks: 0,
|
||||||
|
subsample_rate,
|
||||||
|
write_index: 4,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a batch of data to the packet.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// Serialization occurs as the packet is added.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `batch` - The batch to add to the packet.
|
||||||
|
pub fn add_batch(&mut self, batch: &AdcDacData) -> Result<(), ()> {
|
||||||
|
// Check that the block is sequential.
|
||||||
|
if let Some(id) = &self.start_id {
|
||||||
|
if batch.block_id != id.wrapping_add(self.num_blocks.into()) {
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Otherwise, this is the first block. Record the strt ID.
|
||||||
|
self.start_id = Some(batch.block_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that there is space for the block.
|
||||||
|
let block_size_bytes = SAMPLE_BUFFER_SIZE / self.subsample_rate * 4 * 2;
|
||||||
|
if self.buf.len() - self.get_packet_size() < block_size_bytes {
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the samples into the buffer.
|
||||||
|
for device in &[batch.adcs, batch.dacs] {
|
||||||
|
for channel in device {
|
||||||
|
for sample in channel.iter().step_by(self.subsample_rate) {
|
||||||
|
self.buf[self.write_index..self.write_index + 2]
|
||||||
|
.copy_from_slice(&sample.to_be_bytes());
|
||||||
|
self.write_index += 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.num_blocks += 1;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_packet_size(&self) -> usize {
|
||||||
|
let header_length = 4;
|
||||||
|
let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
|
||||||
|
let block_size_bytes = block_sample_size * 2 * 4;
|
||||||
|
|
||||||
|
block_size_bytes * self.num_blocks as usize + header_length
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Complete the packet and prepare it for transmission.
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
/// The size of the packet. The user should utilize the original buffer provided for packet
|
||||||
|
/// construction to access the packet.
|
||||||
|
pub fn finish(self) -> usize {
|
||||||
|
let block_sample_size = SAMPLE_BUFFER_SIZE / self.subsample_rate;
|
||||||
|
|
||||||
|
// Write the header into the block.
|
||||||
|
self.buf[0..2].copy_from_slice(&self.start_id.unwrap().to_be_bytes());
|
||||||
|
self.buf[2] = self.num_blocks;
|
||||||
|
self.buf[3] = block_sample_size as u8;
|
||||||
|
|
||||||
|
// Return the length of the packet to transmit.
|
||||||
|
self.get_packet_size()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The "consumer" portion of the data stream.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// This is responsible for consuming data and sending it over UDP.
|
||||||
|
pub struct DataStream {
|
||||||
|
stack: NetworkReference,
|
||||||
|
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
|
||||||
|
queue: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
|
||||||
|
remote: SocketAddr,
|
||||||
|
buffer: [u8; 1024],
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataStream {
|
||||||
|
/// Construct a new data streamer.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `stack` - A reference to the shared network stack.
|
||||||
|
/// * `consumer` - The read side of the queue containing data to transmit.
|
||||||
|
fn new(
|
||||||
|
stack: NetworkReference,
|
||||||
|
consumer: Consumer<'static, AdcDacData, BLOCK_BUFFER_SIZE>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
stack,
|
||||||
|
socket: None,
|
||||||
|
remote: StreamTarget::default().into(),
|
||||||
|
queue: consumer,
|
||||||
|
buffer: [0; 1024],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&mut self) {
|
||||||
|
// Note(unwrap): We guarantee that the socket is available above.
|
||||||
|
let socket = self.socket.take().unwrap();
|
||||||
|
self.stack.close(socket).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open(&mut self, remote: SocketAddr) -> Result<(), ()> {
|
||||||
|
if self.socket.is_some() {
|
||||||
|
self.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the remote address is unspecified, just close the existing socket.
|
||||||
|
if remote.ip().is_unspecified() {
|
||||||
|
if self.socket.is_some() {
|
||||||
|
self.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Err(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut socket = self.stack.socket().map_err(|_| ())?;
|
||||||
|
|
||||||
|
// Note(unwrap): We only connect with a new socket, so it is guaranteed to not already be
|
||||||
|
// bound.
|
||||||
|
self.stack.connect(&mut socket, remote).unwrap();
|
||||||
|
|
||||||
|
self.socket.replace(socket);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Configure the remote endpoint of the stream.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `remote` - The destination to send stream data to.
|
||||||
|
pub fn set_remote(&mut self, remote: SocketAddr) {
|
||||||
|
// If the remote is identical to what we already have, do nothing.
|
||||||
|
if remote == self.remote {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open the new remote connection.
|
||||||
|
self.open(remote).ok();
|
||||||
|
self.remote = remote;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Process any data for transmission.
|
||||||
|
pub fn process(&mut self) {
|
||||||
|
// If there's no socket available, try to connect to our remote.
|
||||||
|
if self.socket.is_none() {
|
||||||
|
// If we can't open the socket (e.g. we do not have an IP address yet), clear data from
|
||||||
|
// the queue.
|
||||||
|
if self.open(self.remote).is_err() {
|
||||||
|
while self.queue.ready() {
|
||||||
|
self.queue.dequeue();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.queue.ready() {
|
||||||
|
// Dequeue data from the queue into a larger block structure.
|
||||||
|
let mut packet = DataPacket::new(&mut self.buffer, SUBSAMPLE_RATE);
|
||||||
|
while self.queue.ready() {
|
||||||
|
// Note(unwrap): We check above that the queue is ready before calling this.
|
||||||
|
if packet.add_batch(self.queue.peek().unwrap()).is_err() {
|
||||||
|
// If we cannot add another batch, break out of the loop and send the packet.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove the batch that we just added.
|
||||||
|
self.queue.dequeue();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Transmit the data block.
|
||||||
|
let mut handle = self.socket.as_mut().unwrap();
|
||||||
|
let size = packet.finish();
|
||||||
|
self.stack.send(&mut handle, &self.buffer[..size]).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -9,6 +9,7 @@ pub use heapless;
|
||||||
pub use miniconf;
|
pub use miniconf;
|
||||||
pub use serde;
|
pub use serde;
|
||||||
|
|
||||||
|
pub mod data_stream;
|
||||||
pub mod messages;
|
pub mod messages;
|
||||||
pub mod miniconf_client;
|
pub mod miniconf_client;
|
||||||
pub mod network_processor;
|
pub mod network_processor;
|
||||||
|
@ -16,6 +17,7 @@ pub mod shared;
|
||||||
pub mod telemetry;
|
pub mod telemetry;
|
||||||
|
|
||||||
use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
|
use crate::hardware::{cycle_counter::CycleCounter, EthernetPhy, NetworkStack};
|
||||||
|
use data_stream::{BlockGenerator, DataStream};
|
||||||
use messages::{MqttMessage, SettingsResponse};
|
use messages::{MqttMessage, SettingsResponse};
|
||||||
use miniconf_client::MiniconfClient;
|
use miniconf_client::MiniconfClient;
|
||||||
use network_processor::NetworkProcessor;
|
use network_processor::NetworkProcessor;
|
||||||
|
@ -26,6 +28,7 @@ use core::fmt::Write;
|
||||||
use heapless::String;
|
use heapless::String;
|
||||||
use miniconf::Miniconf;
|
use miniconf::Miniconf;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
use smoltcp_nal::embedded_nal::SocketAddr;
|
||||||
|
|
||||||
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>;
|
pub type NetworkReference = shared::NetworkStackProxy<'static, NetworkStack>;
|
||||||
|
|
||||||
|
@ -45,6 +48,8 @@ pub enum NetworkState {
|
||||||
pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
|
pub struct NetworkUsers<S: Default + Clone + Miniconf, T: Serialize> {
|
||||||
pub miniconf: MiniconfClient<S>,
|
pub miniconf: MiniconfClient<S>,
|
||||||
pub processor: NetworkProcessor,
|
pub processor: NetworkProcessor,
|
||||||
|
stream: DataStream,
|
||||||
|
generator: Option<BlockGenerator>,
|
||||||
pub telemetry: TelemetryClient<T>,
|
pub telemetry: TelemetryClient<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -95,10 +100,30 @@ where
|
||||||
&prefix,
|
&prefix,
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let (generator, stream) =
|
||||||
|
data_stream::setup_streaming(stack_manager.acquire_stack());
|
||||||
|
|
||||||
NetworkUsers {
|
NetworkUsers {
|
||||||
miniconf: settings,
|
miniconf: settings,
|
||||||
processor,
|
processor,
|
||||||
telemetry,
|
telemetry,
|
||||||
|
stream,
|
||||||
|
generator: Some(generator),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enable live data streaming.
|
||||||
|
pub fn enable_streaming(&mut self) -> BlockGenerator {
|
||||||
|
self.generator.take().unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Direct the stream to the provided remote target.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `remote` - The destination for the streamed data.
|
||||||
|
pub fn direct_stream(&mut self, remote: SocketAddr) {
|
||||||
|
if self.generator.is_none() {
|
||||||
|
self.stream.set_remote(remote);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,15 +132,20 @@ where
|
||||||
/// # Returns
|
/// # Returns
|
||||||
/// An indication if any of the network users indicated a state change.
|
/// An indication if any of the network users indicated a state change.
|
||||||
pub fn update(&mut self) -> NetworkState {
|
pub fn update(&mut self) -> NetworkState {
|
||||||
|
// Update the MQTT clients.
|
||||||
|
self.telemetry.update();
|
||||||
|
|
||||||
|
// Update the data stream.
|
||||||
|
if self.generator.is_none() {
|
||||||
|
self.stream.process();
|
||||||
|
}
|
||||||
|
|
||||||
// Poll for incoming data.
|
// Poll for incoming data.
|
||||||
let poll_result = match self.processor.update() {
|
let poll_result = match self.processor.update() {
|
||||||
UpdateState::NoChange => NetworkState::NoChange,
|
UpdateState::NoChange => NetworkState::NoChange,
|
||||||
UpdateState::Updated => NetworkState::Updated,
|
UpdateState::Updated => NetworkState::Updated,
|
||||||
};
|
};
|
||||||
|
|
||||||
// Update the MQTT clients.
|
|
||||||
self.telemetry.update();
|
|
||||||
|
|
||||||
match self.miniconf.update() {
|
match self.miniconf.update() {
|
||||||
UpdateState::Updated => NetworkState::SettingsChanged,
|
UpdateState::Updated => NetworkState::SettingsChanged,
|
||||||
UpdateState::NoChange => poll_result,
|
UpdateState::NoChange => poll_result,
|
||||||
|
|
|
@ -37,6 +37,27 @@ impl NetworkProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle ethernet link connection status.
|
||||||
|
///
|
||||||
|
/// # Note
|
||||||
|
/// This may take non-trivial amounts of time to communicate with the PHY. As such, this should
|
||||||
|
/// only be called as often as necessary (e.g. once per second or so).
|
||||||
|
pub fn handle_link(&mut self) {
|
||||||
|
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
|
||||||
|
// stack.
|
||||||
|
match self.phy.poll_link() {
|
||||||
|
true => self.network_was_reset = false,
|
||||||
|
|
||||||
|
// Only reset the network stack once per link reconnection. This prevents us from
|
||||||
|
// sending an excessive number of DHCP requests.
|
||||||
|
false if !self.network_was_reset => {
|
||||||
|
self.network_was_reset = true;
|
||||||
|
self.stack.lock(|stack| stack.handle_link_reset());
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/// Process and update the state of the network.
|
/// Process and update the state of the network.
|
||||||
///
|
///
|
||||||
/// # Note
|
/// # Note
|
||||||
|
@ -52,24 +73,7 @@ impl NetworkProcessor {
|
||||||
let result = match self.stack.lock(|stack| stack.poll(now)) {
|
let result = match self.stack.lock(|stack| stack.poll(now)) {
|
||||||
Ok(true) => UpdateState::Updated,
|
Ok(true) => UpdateState::Updated,
|
||||||
Ok(false) => UpdateState::NoChange,
|
Ok(false) => UpdateState::NoChange,
|
||||||
Err(err) => {
|
Err(_) => UpdateState::Updated,
|
||||||
log::info!("Network error: {:?}", err);
|
|
||||||
UpdateState::Updated
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// If the PHY indicates there's no more ethernet link, reset the DHCP server in the network
|
|
||||||
// stack.
|
|
||||||
match self.phy.poll_link() {
|
|
||||||
true => self.network_was_reset = false,
|
|
||||||
|
|
||||||
// Only reset the network stack once per link reconnection. This prevents us from
|
|
||||||
// sending an excessive number of DHCP requests.
|
|
||||||
false if !self.network_was_reset => {
|
|
||||||
self.network_was_reset = true;
|
|
||||||
self.stack.lock(|stack| stack.handle_link_reset());
|
|
||||||
}
|
|
||||||
_ => {}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
result
|
result
|
||||||
|
|
|
@ -69,6 +69,21 @@ where
|
||||||
forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>}
|
forward! {close(socket: S::TcpSocket) -> Result<(), S::Error>}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<'a, S> embedded_nal::UdpClientStack for NetworkStackProxy<'a, S>
|
||||||
|
where
|
||||||
|
S: embedded_nal::UdpClientStack,
|
||||||
|
{
|
||||||
|
type UdpSocket = S::UdpSocket;
|
||||||
|
type Error = S::Error;
|
||||||
|
|
||||||
|
forward! {socket() -> Result<S::UdpSocket, S::Error>}
|
||||||
|
forward! {connect(socket: &mut S::UdpSocket, remote: embedded_nal::SocketAddr) -> Result<(), S::Error>}
|
||||||
|
|
||||||
|
forward! {send(socket: &mut S::UdpSocket, buffer: &[u8]) -> embedded_nal::nb::Result<(), S::Error>}
|
||||||
|
forward! {receive(socket: &mut S::UdpSocket, buffer: &mut [u8]) -> embedded_nal::nb::Result<(usize, embedded_nal::SocketAddr), S::Error>}
|
||||||
|
forward! {close(socket: S::UdpSocket) -> Result<(), S::Error>}
|
||||||
|
}
|
||||||
|
|
||||||
impl NetworkManager {
|
impl NetworkManager {
|
||||||
/// Construct a new manager for a shared network stack
|
/// Construct a new manager for a shared network stack
|
||||||
///
|
///
|
||||||
|
|
Loading…
Reference in New Issue