Updating comments

master
Ryan Summers 2021-07-23 14:12:59 +02:00
parent 9b3bb62811
commit 6c87db3778
4 changed files with 52 additions and 48 deletions

View File

@ -13,10 +13,10 @@ import logging
# Representation of a single UDP packet transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'adc', 'dac'])
Format = collections.namedtuple('Format', ['batch_size', 'batches_per_frame'])
Format = collections.namedtuple('Format', ['batch_size', 'length'])
FORMAT = {
0: Format(8, 255)
0: Format(8, 964)
}
class Timer:
@ -94,30 +94,31 @@ class PacketParser:
if len(self.buf) < 4:
return None
start_id, format_id = struct.unpack_from('!HH', self.buf)
start_id, format_id = struct.unpack_from('<HH', self.buf)
frame_format = FORMAT[format_id]
packet_size = 4 + frame_format.batch_size * frame_format.batches_per_frame * 8
if len(self.buf) < packet_size:
if len(self.buf) < frame_format.length:
return None
self.buf = self.buf[4:]
batches_per_frame = int((frame_format.length - 4) / (frame_format.batch_size * 8))
packets = []
for offset in range(num_blocks):
adcs_dacs = struct.unpack_from(f'!{4 * data_size}H', self.buf)
for offset in range(batches_per_frame):
adcs_dacs = struct.unpack_from(f'<{4 * frame_format.batch_size}H', self.buf)
adc = [
adcs_dacs[0:data_size],
adcs_dacs[data_size:2*data_size],
adcs_dacs[0:frame_format.batch_size],
adcs_dacs[frame_format.batch_size:2*frame_format.batch_size],
]
dac = [
adcs_dacs[2*data_size: 3*data_size],
adcs_dacs[3*data_size:],
adcs_dacs[2*frame_format.batch_size: 3*frame_format.batch_size],
adcs_dacs[3*frame_format.batch_size:],
]
self.buf = self.buf[8*data_size:]
self.buf = self.buf[8*frame_format.batch_size:]
packets.append(Packet(start_id + offset, adc, dac))
return packets
@ -155,7 +156,7 @@ def main():
while True:
# Receive any data over UDP and parse it.
data = connection.recv(4096)
data = connection.recv(4096 * 4)
if data and not timer.is_started():
timer.start()
@ -166,7 +167,7 @@ def main():
# Handle any dropped packets.
if not check_index(last_index, packet.index):
print(hex(last_index), hex(packet.index))
print('Drop from ', hex(last_index), hex(packet.index))
if packet.index < (last_index + 1):
dropped = packet.index + 65536 - (last_index + 1)
else:

View File

@ -308,17 +308,23 @@ const APP: () = {
}
// Stream the data.
generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| {
let mut offset = 0;
for device in [adc_samples.iter(), dac_samples.iter()] {
for channel in device {
for sample in channel.iter() {
buf[offset..offset + 2]
.copy_from_slice(&sample.to_ne_bytes());
offset += 2;
}
}
}
generator.add::<_, { SAMPLE_BUFFER_SIZE * 8 }>(0, |buf| unsafe {
let dst = buf.as_ptr() as usize as *mut u16;
let adc0 = &adc_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(adc0, dst, SAMPLE_BUFFER_SIZE);
let dst = dst.add(SAMPLE_BUFFER_SIZE);
let adc1 = &adc_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(adc1, dst, SAMPLE_BUFFER_SIZE);
let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac0 = &dac_samples[0][0] as *const u16;
core::ptr::copy_nonoverlapping(dac0, dst, SAMPLE_BUFFER_SIZE);
let dst = dst.add(SAMPLE_BUFFER_SIZE);
let dac1 = &dac_samples[1][0] as *const u16;
core::ptr::copy_nonoverlapping(dac1, dst, SAMPLE_BUFFER_SIZE);
});
// Update telemetry measurements.

View File

@ -36,11 +36,6 @@ pub struct NetStorage {
[Option<(smoltcp::wire::IpAddress, smoltcp::iface::Neighbor)>; 8],
pub routes_cache:
[Option<(smoltcp::wire::IpCidr, smoltcp::iface::Route)>; 8],
pub dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata; 1],
pub dhcp_tx_storage: [u8; 600],
pub dhcp_rx_storage: [u8; 600],
}
pub struct UdpSocketStorage {
@ -94,10 +89,6 @@ impl Default for NetStorage {
sockets: [None, None, None, None, None, None],
tcp_socket_storage: [TcpSocketStorage::new(); NUM_TCP_SOCKETS],
udp_socket_storage: [UdpSocketStorage::new(); NUM_UDP_SOCKETS],
dhcp_tx_storage: [0; 600],
dhcp_rx_storage: [0; 600],
dhcp_rx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
dhcp_tx_metadata: [smoltcp::socket::RawPacketMetadata::EMPTY; 1],
}
}
}

View File

@ -19,9 +19,11 @@ use heapless::pool::{Box, Init, Pool, Uninit};
use super::NetworkReference;
const FRAME_COUNT: usize = 4;
const FRAME_COUNT: usize = 6;
const FRAME_SIZE: usize = 1024;
static mut FRAME_DATA: [u8; 5200] = [0; 5200];
static mut FRAME_DATA: [u8; FRAME_SIZE * FRAME_COUNT] =
[0; FRAME_SIZE * FRAME_COUNT];
/// Represents the destination for the UDP stream to send data to.
///
@ -70,7 +72,7 @@ pub fn setup_streaming(
let (producer, consumer) = queue.split();
let frame_pool =
cortex_m::singleton!(: Pool<[u8; 1024]>= Pool::new()).unwrap();
cortex_m::singleton!(: Pool<[u8; FRAME_SIZE]>= Pool::new()).unwrap();
// Note(unsafe): We guarantee that FRAME_DATA is only accessed once in this function.
let memory = unsafe { &mut FRAME_DATA };
@ -86,13 +88,13 @@ pub fn setup_streaming(
struct StreamFrame {
format: u16,
sequence_number: u16,
buffer: Box<[u8; 1024], Init>,
buffer: Box<[u8; FRAME_SIZE], Init>,
offset: usize,
}
impl StreamFrame {
pub fn new(
buffer: Box<[u8; 1024], Uninit>,
buffer: Box<[u8; FRAME_SIZE], Uninit>,
format: u16,
sequence_number: u16,
) -> Self {
@ -132,7 +134,7 @@ impl StreamFrame {
/// The data generator for a stream.
pub struct FrameGenerator {
queue: Producer<'static, StreamFrame, FRAME_COUNT>,
pool: &'static Pool<[u8; 1024]>,
pool: &'static Pool<[u8; FRAME_SIZE]>,
current_frame: Option<StreamFrame>,
sequence_number: u16,
}
@ -140,7 +142,7 @@ pub struct FrameGenerator {
impl FrameGenerator {
fn new(
queue: Producer<'static, StreamFrame, FRAME_COUNT>,
pool: &'static Pool<[u8; 1024]>,
pool: &'static Pool<[u8; FRAME_SIZE]>,
) -> Self {
Self {
queue,
@ -172,11 +174,15 @@ impl FrameGenerator {
self.current_frame.as_mut().unwrap().add_batch::<_, T>(f);
if self.current_frame.as_ref().unwrap().is_full::<T>() {
// If we fail to enqueue the frame, free the underlying buffer.
match self.queue.enqueue(self.current_frame.take().unwrap()) {
Err(frame) => self.pool.free(frame.buffer),
_ => {}
};
if self
.queue
.enqueue(self.current_frame.take().unwrap())
.is_err()
{
// Given that the queue is the same size as the number of frames available, this
// should never occur.
panic!("Frame enqueue failure")
}
}
}
}
@ -189,7 +195,7 @@ pub struct DataStream {
stack: NetworkReference,
socket: Option<<NetworkReference as UdpClientStack>::UdpSocket>,
queue: Consumer<'static, StreamFrame, FRAME_COUNT>,
frame_pool: &'static Pool<[u8; 1024]>,
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
remote: SocketAddr,
}
@ -203,7 +209,7 @@ impl DataStream {
fn new(
stack: NetworkReference,
consumer: Consumer<'static, StreamFrame, FRAME_COUNT>,
frame_pool: &'static Pool<[u8; 1024]>,
frame_pool: &'static Pool<[u8; FRAME_SIZE]>,
) -> Self {
Self {
stack,