Updating clear method

master
Ryan Summers 2021-08-09 16:23:18 +02:00
parent e478dfc70f
commit 5d7cac2293
8 changed files with 132 additions and 134 deletions

View File

@ -11,10 +11,11 @@
set -eux set -eux
# Set up python for testing # Set up python for testing
python3 -m venv --system-site-packages py python3 -m venv --system-site-packages py-venv
. py/bin/activate . py/bin/activate
# Install Miniconf utilities for configuring stabilizer. # Install Miniconf utilities for configuring stabilizer.
python3 -m pip install -e py/
python3 -m pip install git+https://github.com/quartiq/miniconf#subdirectory=py/miniconf-mqtt python3 -m pip install git+https://github.com/quartiq/miniconf#subdirectory=py/miniconf-mqtt
python3 -m pip install gmqtt python3 -m pip install gmqtt

View File

@ -6,13 +6,11 @@ Description: Implements HITL testing of Stabilizer data livestream capabilities.
""" """
import asyncio import asyncio
import sys import sys
import time
import argparse import argparse
import socket import socket
import struct
import logging
from miniconf import Miniconf from miniconf import Miniconf
from stabilizer.stream import StabilizerStream
def _get_ip(broker): def _get_ip(broker):
""" Get the IP of the local device. """ Get the IP of the local device.
@ -42,74 +40,6 @@ def sequence_delta(previous_sequence, next_sequence):
return delta & 0xFFFFFFFF return delta & 0xFFFFFFFF
class StabilizerStream:
""" Provides access to Stabilizer's livestreamed data. """
# The magic header half-word at the start of each packet.
MAGIC_HEADER = 0x057B
# The struct format of the header.
HEADER_FORMAT = '<HBBI'
# All supported formats by this reception script.
#
# The items in this dict are functions that will be provided the sample batch size and will
# return the struct deserialization code to unpack a single batch.
FORMAT = {
1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
}
def __init__(self, port):
""" Initialize the stream. """
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(("", port))
self.socket.settimeout(0.5)
def clear(self, duration=5):
""" Clear the socket RX buffer by reading all available data.
Args:
duration: The maximum duration in seconds to read data for.
"""
start = time.time()
while (time.time() - start) < duration:
try:
self.socket.recv(4096)
except socket.timeout:
return
def read_frame(self):
""" Read a single frame from the stream.
Returns:
Yields the (seqnum, data) of the batches available in the frame.
"""
buf = self.socket.recv(4096)
# Attempt to parse a block from the buffer.
if len(buf) < struct.calcsize(self.HEADER_FORMAT):
return
# Parse out the packet header
magic, format_id, batch_size, sequence_number = struct.unpack_from(self.HEADER_FORMAT, buf)
buf = buf[struct.calcsize(self.HEADER_FORMAT):]
if magic != self.MAGIC_HEADER:
logging.warning('Encountered bad magic header: %s', hex(magic))
return
frame_format = self.FORMAT[format_id](batch_size)
batch_count = int(len(buf) / struct.calcsize(frame_format))
for offset in range(batch_count):
data = struct.unpack_from(frame_format, buf)
buf = buf[struct.calcsize(frame_format):]
yield (sequence_number + offset, data)
def main(): def main():
""" Main program entry point. """ """ Main program entry point. """
parser = argparse.ArgumentParser(description='Loopback tests for Stabilizer HITL testing',) parser = argparse.ArgumentParser(description='Loopback tests for Stabilizer HITL testing',)
@ -126,12 +56,13 @@ def main():
""" The actual testing being completed. """ """ The actual testing being completed. """
local_ip = _get_ip(args.broker) local_ip = _get_ip(args.broker)
interface = await Miniconf.create(args.prefix, args.broker) interface = await Miniconf.create(args.prefix, args.broker)
stream = StabilizerStream(args.port) stream = StabilizerStream(args.port, timeout=0.5)
# Configure the stream # Configure the stream
print(f'Configuring stream to target {".".join(map(str, local_ip))}:{args.port}') print(f'Configuring stream to target {".".join(map(str, local_ip))}:{args.port}')
print('') print('')
await interface.command('stream_target', {'ip': local_ip, 'port': args.port}) await interface.command('stream_target', {'ip': local_ip, 'port': args.port}, retain=False)
await interface.command('telemetry_period', 10, retain=False)
# Verify frame reception # Verify frame reception
print('Testing stream reception') print('Testing stream reception')
@ -139,13 +70,14 @@ def main():
last_sequence = None last_sequence = None
for _ in range(5000): for _ in range(5000):
for (seqnum, _data) in stream.read_frame(): for (seqnum, _data) in stream.read_frame():
assert sequence_delta(last_sequence, seqnum) == 0 assert sequence_delta(last_sequence, seqnum) == 0, \
f'Frame drop detected: 0x{last_sequence:08X} -> 0x{seqnum:08X}'
last_sequence = seqnum last_sequence = seqnum
# Disable the stream. # Disable the stream.
print('Closing stream') print('Closing stream')
print('') print('')
await interface.command('stream_target', {'ip': [0, 0, 0, 0], 'port': 0}) await interface.command('stream_target', {'ip': [0, 0, 0, 0], 'port': 0}, retain=False)
stream.clear() stream.clear()
print('Verifying no further data is received') print('Verifying no further data is received')

2
py/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
/*.egg-info/
__pycache__/

8
py/README.md Normal file
View File

@ -0,0 +1,8 @@
# Stabilizer Python Utilities
This directory contains common Python utilities for Stabilizer, such as livestream data receivers.
To install this module locally (in editable mode):
```
python -m pip install -e .
```

7
py/setup.py Normal file
View File

@ -0,0 +1,7 @@
from setuptools import setup
setup(name='stabilizer',
version='0.1',
description='Stabilizer Utilities',
author='QUARTIQ GmbH',
license='MIT')

View File

92
py/stabilizer/stream.py Normal file
View File

@ -0,0 +1,92 @@
#!/usr/bin/python3
"""
Author: Vertigo Designs, Ryan Summers
Description: Provides a means of accessing Stabilizer livestream data.
"""
import socket
import time
import logging
import struct
class StabilizerStream:
""" Provides access to Stabilizer's livestreamed data. """
# The magic header half-word at the start of each packet.
MAGIC_HEADER = 0x057B
# The struct format of the header.
HEADER_FORMAT = '<HBBI'
# All supported formats by this reception script.
#
# The items in this dict are functions that will be provided the sample batch size and will
# return the struct deserialization code to unpack a single batch.
FORMAT = {
1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
}
def __init__(self, port, timeout=None):
""" Initialize the stream.
Args:
port: The UDP port to receive the stream from.
timeout: The timeout to set on the UDP socket.
"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.socket.bind(("", port))
self.total_bytes = 0
if timeout is not None:
self.socket.settimeout(timeout)
def clear(self, duration=1):
""" Clear the socket RX buffer by reading all available data.
Args:
duration: The maximum duration in seconds to read data for.
"""
time.sleep(duration)
try:
while self.socket.recv(4096):
pass
except socket.timeout:
pass
def get_rx_bytes(self):
""" Get the number of bytes read from the stream. """
return self.total_bytes
def read_frame(self):
""" Read a single frame from the stream.
Returns:
Yields the (seqnum, data) of the batches available in the frame.
"""
buf = self.socket.recv(4096)
self.total_bytes += len(buf)
# Attempt to parse a block from the buffer.
if len(buf) < struct.calcsize(self.HEADER_FORMAT):
return
# Parse out the packet header
magic, format_id, batch_size, sequence_number = struct.unpack_from(self.HEADER_FORMAT, buf)
buf = buf[struct.calcsize(self.HEADER_FORMAT):]
if magic != self.MAGIC_HEADER:
logging.warning('Encountered bad magic header: %s', hex(magic))
return
frame_format = self.FORMAT[format_id](batch_size)
batch_count = int(len(buf) / struct.calcsize(frame_format))
for offset in range(batch_count):
data = struct.unpack_from(frame_format, buf)
buf = buf[struct.calcsize(frame_format):]
yield (sequence_number + offset, data)

View File

@ -5,53 +5,11 @@ Author: Ryan Summers
Description: Provides a mechanism for measuring Stabilizer stream data throughput. Description: Provides a mechanism for measuring Stabilizer stream data throughput.
""" """
import argparse import argparse
import socket
import collections
import struct
import time
import logging import logging
import sys
import time
# Representation of a single data batch transmitted by Stabilizer. from stabilizer.stream import StabilizerStream
Packet = collections.namedtuple('Packet', ['index', 'data'])
# The magic header half-word at the start of each packet.
MAGIC_HEADER = 0x057B
# The struct format of the header.
HEADER_FORMAT = '<HBBI'
# All supported formats by this reception script.
#
# The items in this dict are functions that will be provided the sample batch size and will return
# the struct deserialization code to unpack a single batch.
FORMAT = {
1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
}
def parse_packet(buf):
""" Attempt to parse packets from the received buffer. """
# Attempt to parse a block from the buffer.
if len(buf) < struct.calcsize(HEADER_FORMAT):
return
# Parse out the packet header
magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf)
buf = buf[struct.calcsize(HEADER_FORMAT):]
if magic != MAGIC_HEADER:
logging.warning('Encountered bad magic header: %s', hex(magic))
return
frame_format = FORMAT[format_id](batch_size)
batch_count = int(len(buf) / struct.calcsize(frame_format))
for offset in range(batch_count):
data = struct.unpack_from(frame_format, buf)
buf = buf[struct.calcsize(frame_format):]
yield Packet(sequence_number + offset, data)
class Timer: class Timer:
""" A basic timer for measuring elapsed time periods. """ """ A basic timer for measuring elapsed time periods. """
@ -104,13 +62,11 @@ def sequence_delta(previous_sequence, next_sequence):
def main(): def main():
""" Main program. """ """ Main program. """
parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality') parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
parser.add_argument('--port', type=int, default=1111, help='The port that stabilizer is streaming to') parser.add_argument('--port', type=int, default=2000,
help='The port that stabilizer is streaming to')
args = parser.parse_args() args = parser.parse_args()
connection = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
connection.bind(("", args.port))
logging.basicConfig(level=logging.INFO, logging.basicConfig(level=logging.INFO,
format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s') format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')
@ -122,23 +78,22 @@ def main():
timer = Timer() timer = Timer()
stream = StabilizerStream(args.port)
while True: while True:
# Receive any data over UDP and parse it. # Receive any data over UDP and parse it.
data = connection.recv(4096) for (seqnum, _) in stream.read_frame():
if data and not timer.is_started(): if not timer.is_started():
timer.start() timer.start()
# Handle any received packets.
total_bytes += len(data)
for packet in parse_packet(data):
# Handle any dropped packets. # Handle any dropped packets.
drop_count += sequence_delta(last_index, packet.index) drop_count += sequence_delta(last_index, seqnum)
last_index = packet.index last_index = seqnum
good_blocks += 1 good_blocks += 1
# Report the throughput periodically. # Report the throughput periodically.
if timer.is_triggered(): if timer.is_triggered():
drate = total_bytes * 8 / 1e6 / timer.elapsed() drate = stream.get_rx_bytes() * 8 / 1e6 / timer.elapsed()
print(f''' print(f'''
Data Rate: {drate:.3f} Mbps Data Rate: {drate:.3f} Mbps
@ -148,6 +103,7 @@ Dropped blocks: {drop_count}
Metadata: {total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s Metadata: {total_bytes / 1e6:.3f} MB in {timer.elapsed():.2f} s
---- ----
''') ''')
sys.stdout.flush()
timer.arm() timer.arm()