StreamFormat -> Into<u8>
This commit is contained in:
parent
c0157b7095
commit
11d279a1b6
|
@ -14,19 +14,50 @@ import logging
|
||||||
# Representation of a single data batch transmitted by Stabilizer.
|
# Representation of a single data batch transmitted by Stabilizer.
|
||||||
Packet = collections.namedtuple('Packet', ['index', 'data'])
|
Packet = collections.namedtuple('Packet', ['index', 'data'])
|
||||||
|
|
||||||
# Specifies a known format for incoming packets.
|
# The magic header half-word at the start of each packet.
|
||||||
#
|
MAGIC_HEADER = 0x057B
|
||||||
# * `sample_size_bytes` is the number of bytes required for each sample in the batch.
|
|
||||||
# * `batch_format` is a `struct` format string that will be provided the `batch_size` as an named
|
# The struct format of the header.
|
||||||
# argument. This format string will be used to deserialize each batch of data from the frame.
|
HEADER_FORMAT = '<HBBI'
|
||||||
Format = collections.namedtuple('Format', ['sample_size_bytes', 'batch_format'])
|
|
||||||
|
|
||||||
# All supported formats by this reception script.
|
# All supported formats by this reception script.
|
||||||
|
#
|
||||||
|
# The items in this dict are functions that will be provided the sampel batch size and will return
|
||||||
|
# the struct deserialization code to unpack a single batch.
|
||||||
FORMAT = {
|
FORMAT = {
|
||||||
0: Format(sample_size_bytes=8,
|
1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
|
||||||
batch_format='<{batch_size}H{batch_size}H{batch_size}H{batch_size}H')
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def parse_packet(buf):
|
||||||
|
""" Attempt to parse packets from the received buffer. """
|
||||||
|
# Attempt to parse a block from the buffer.
|
||||||
|
if len(buf) < struct.calcsize(HEADER_FORMAT):
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Parse out the packet header
|
||||||
|
magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf)
|
||||||
|
buf = buf[struct.calcsize(HEADER_FORMAT):]
|
||||||
|
|
||||||
|
if magic != MAGIC_HEADER:
|
||||||
|
logging.warning('Encountered bad magic header: %s', hex(magic))
|
||||||
|
return None
|
||||||
|
|
||||||
|
if format_id not in FORMAT:
|
||||||
|
raise Exception(f'Unknown format specifier: {format_id}')
|
||||||
|
|
||||||
|
frame_format = FORMAT[format_id](batch_size)
|
||||||
|
|
||||||
|
batch_count = len(buf) / struct.calcsize(frame_format)
|
||||||
|
|
||||||
|
packets = []
|
||||||
|
for offset in range(batch_count):
|
||||||
|
data = struct.unpack_from(frame_format, buf)
|
||||||
|
buf = buf[struct.calcsize(frame_format):]
|
||||||
|
packets.append(Packet(sequence_number + offset, data))
|
||||||
|
|
||||||
|
return packets
|
||||||
|
|
||||||
|
|
||||||
class Timer:
|
class Timer:
|
||||||
""" A basic timer for measuring elapsed time periods. """
|
""" A basic timer for measuring elapsed time periods. """
|
||||||
|
|
||||||
|
@ -66,78 +97,13 @@ class Timer:
|
||||||
return now - self.start_time
|
return now - self.start_time
|
||||||
|
|
||||||
|
|
||||||
class PacketParser:
|
def sequence_delta(previous_sequence, next_sequence):
|
||||||
""" Utilize class used for parsing received UDP data. """
|
""" Check the number of items between two sequence numbers. """
|
||||||
|
if previous_sequence is None:
|
||||||
|
return 0
|
||||||
|
|
||||||
def __init__(self):
|
delta = next_sequence - (previous_sequence + 1)
|
||||||
""" Initialize the parser. """
|
return delta & 0xFFFFFFFF
|
||||||
self.buf = b''
|
|
||||||
self.total_bytes = 0
|
|
||||||
|
|
||||||
|
|
||||||
def ingress(self, data):
|
|
||||||
""" Ingress received UDP data. """
|
|
||||||
self.total_bytes += len(data)
|
|
||||||
self.buf += data
|
|
||||||
|
|
||||||
|
|
||||||
def parse_all_packets(self):
|
|
||||||
""" Parse all received packets from the receive buffer.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
A list of received Packets.
|
|
||||||
"""
|
|
||||||
packets = []
|
|
||||||
while True:
|
|
||||||
new_packets = self._parse()
|
|
||||||
if new_packets:
|
|
||||||
packets += new_packets
|
|
||||||
else:
|
|
||||||
return packets
|
|
||||||
|
|
||||||
|
|
||||||
def _parse(self):
|
|
||||||
""" Attempt to parse packets from the received buffer. """
|
|
||||||
# Attempt to parse a block from the buffer.
|
|
||||||
if len(self.buf) < 7:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Parse out the packet header
|
|
||||||
start_id, format_id, batch_count, batch_size = struct.unpack_from('<HHHB', self.buf)
|
|
||||||
|
|
||||||
if format_id not in FORMAT:
|
|
||||||
raise Exception(f'Unknown format specifier: {format_id}')
|
|
||||||
|
|
||||||
frame_format = FORMAT[format_id]
|
|
||||||
required_length = 7 + batch_count * frame_format.sample_size_bytes * batch_size
|
|
||||||
|
|
||||||
if len(self.buf) < required_length:
|
|
||||||
return None
|
|
||||||
|
|
||||||
self.buf = self.buf[7:]
|
|
||||||
|
|
||||||
packets = []
|
|
||||||
for offset in range(batch_count):
|
|
||||||
format_string = frame_format.batch_format.format(batch_size=batch_size)
|
|
||||||
data = struct.unpack_from(format_string, self.buf)
|
|
||||||
self.buf = self.buf[struct.calcsize(format_string):]
|
|
||||||
packets.append(Packet(start_id + offset, data))
|
|
||||||
|
|
||||||
return packets
|
|
||||||
|
|
||||||
|
|
||||||
def check_index(previous_index, next_index):
|
|
||||||
""" Check if two indices are sequential. """
|
|
||||||
if previous_index == -1:
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Handle index roll-over. Indices are only stored in 16-bit numbers.
|
|
||||||
if next_index < previous_index:
|
|
||||||
next_index += 65536
|
|
||||||
|
|
||||||
expected_index = previous_index + 1
|
|
||||||
|
|
||||||
return next_index == expected_index
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
@ -153,48 +119,40 @@ def main():
|
||||||
logging.basicConfig(level=logging.INFO,
|
logging.basicConfig(level=logging.INFO,
|
||||||
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
|
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
|
||||||
|
|
||||||
last_index = -1
|
last_index = None
|
||||||
|
|
||||||
drop_count = 0
|
drop_count = 0
|
||||||
good_blocks = 0
|
good_blocks = 0
|
||||||
|
total_bytes = 0
|
||||||
|
|
||||||
timer = Timer()
|
timer = Timer()
|
||||||
parser = PacketParser()
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
# Receive any data over UDP and parse it.
|
# Receive any data over UDP and parse it.
|
||||||
data = connection.recv(4096)
|
data = connection.recv(1024)
|
||||||
if data and not timer.is_started():
|
if data and not timer.is_started():
|
||||||
timer.start()
|
timer.start()
|
||||||
|
|
||||||
parser.ingress(data)
|
|
||||||
|
|
||||||
# Handle any received packets.
|
# Handle any received packets.
|
||||||
for packet in parser.parse_all_packets():
|
total_bytes += len(data)
|
||||||
|
packet = parse_packet(data)
|
||||||
|
|
||||||
|
if packet:
|
||||||
# Handle any dropped packets.
|
# Handle any dropped packets.
|
||||||
if not check_index(last_index, packet.index):
|
drop_count += sequence_delta(last_index, packet.index)
|
||||||
print(f'Drop from {hex(last_index)} to {hex(packet.index)}')
|
|
||||||
if packet.index < (last_index + 1):
|
|
||||||
dropped = packet.index + 65536 - (last_index + 1)
|
|
||||||
else:
|
|
||||||
dropped = packet.index - (last_index + 1)
|
|
||||||
|
|
||||||
drop_count += dropped
|
|
||||||
|
|
||||||
last_index = packet.index
|
last_index = packet.index
|
||||||
good_blocks += 1
|
good_blocks += 1
|
||||||
|
|
||||||
# Report the throughput periodically.
|
# Report the throughput periodically.
|
||||||
if timer.is_triggered():
|
if timer.is_triggered():
|
||||||
drate = parser.total_bytes * 8 / 1e6 / timer.elapsed()
|
drate = total_bytes * 8 / 1e6 / timer.elapsed()
|
||||||
|
|
||||||
print(f'''
|
print(f'''
|
||||||
Data Rate: {drate:.3f} Mbps
|
Data Rate: {drate:.3f} Mbps
|
||||||
Received Blocks: {good_blocks}
|
Received Blocks: {good_blocks}
|
||||||
Dropped blocks: {drop_count}
|
Dropped blocks: {drop_count}
|
||||||
|
|
||||||
Metadata: {parser.total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
|
Metadata: {total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
|
||||||
----
|
----
|
||||||
''')
|
''')
|
||||||
timer.arm()
|
timer.arm()
|
||||||
|
|
|
@ -89,6 +89,12 @@ pub enum StreamFormat {
|
||||||
AdcDacData = 1,
|
AdcDacData = 1,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<StreamFormat> for u8 {
|
||||||
|
fn from(format: StreamFormat) -> u8 {
|
||||||
|
format as u8
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<StreamTarget> for SocketAddr {
|
impl From<StreamTarget> for SocketAddr {
|
||||||
fn from(target: StreamTarget) -> SocketAddr {
|
fn from(target: StreamTarget) -> SocketAddr {
|
||||||
SocketAddr::new(
|
SocketAddr::new(
|
||||||
|
@ -184,7 +190,7 @@ pub struct FrameGenerator {
|
||||||
pool: &'static Pool<[u8; FRAME_SIZE]>,
|
pool: &'static Pool<[u8; FRAME_SIZE]>,
|
||||||
current_frame: Option<StreamFrame>,
|
current_frame: Option<StreamFrame>,
|
||||||
sequence_number: u32,
|
sequence_number: u32,
|
||||||
format: StreamFormat,
|
format: u8,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FrameGenerator {
|
impl FrameGenerator {
|
||||||
|
@ -195,7 +201,7 @@ impl FrameGenerator {
|
||||||
Self {
|
Self {
|
||||||
queue,
|
queue,
|
||||||
pool,
|
pool,
|
||||||
format: StreamFormat::Unknown,
|
format: StreamFormat::Unknown.into(),
|
||||||
current_frame: None,
|
current_frame: None,
|
||||||
sequence_number: 0,
|
sequence_number: 0,
|
||||||
}
|
}
|
||||||
|
@ -209,10 +215,8 @@ impl FrameGenerator {
|
||||||
/// # Args
|
/// # Args
|
||||||
/// * `format` - The desired format of the stream.
|
/// * `format` - The desired format of the stream.
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
pub(crate) fn set_format(&mut self, format: StreamFormat) {
|
pub(crate) fn set_format(&mut self, format: impl Into<u8>) {
|
||||||
assert!(self.format == StreamFormat::Unknown);
|
self.format = format.into();
|
||||||
assert!(format != StreamFormat::Unknown);
|
|
||||||
self.format = format;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a batch to the current stream frame.
|
/// Add a batch to the current stream frame.
|
||||||
|
|
|
@ -113,9 +113,12 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enable live data streaming.
|
/// Enable live data streaming.
|
||||||
|
///
|
||||||
|
/// # Args
|
||||||
|
/// * `format` - A unique u8 code indicating the format of the data.
|
||||||
pub fn enable_streaming(
|
pub fn enable_streaming(
|
||||||
&mut self,
|
&mut self,
|
||||||
format: data_stream::StreamFormat,
|
format: impl Into<u8>,
|
||||||
) -> FrameGenerator {
|
) -> FrameGenerator {
|
||||||
let mut generator = self.generator.take().unwrap();
|
let mut generator = self.generator.take().unwrap();
|
||||||
generator.set_format(format);
|
generator.set_format(format);
|
||||||
|
|
Loading…
Reference in New Issue