#!/usr/bin/python3
"""
Author: Ryan Summers

Description: Provides a mechanism for measuring Stabilizer stream data throughput.
"""
import argparse
import socket
import collections
import struct
import time
import logging

# Representation of a single data batch transmitted by Stabilizer.
Packet = collections.namedtuple('Packet', ['index', 'data'])

# The magic header half-word at the start of each packet.
MAGIC_HEADER = 0x057B

# NOTE(review): the text between the opening of the HEADER_FORMAT literal and
# the comparison in Timer.is_triggered() was missing from the reviewed copy of
# this file. HEADER_FORMAT, FORMAT, parse_packet(), Timer.__init__() and
# Timer.is_triggered() below are reconstructions - confirm the wire format and
# the default timer period against the Stabilizer firmware before relying on
# them.

# The struct format of the header: little-endian magic half-word, format id
# byte, batch size byte and a 32-bit sequence number.
HEADER_FORMAT = '<HBBI'

# All supported formats by this reception script.
#
# The items in this dict are functions that are given the sample batch size and
# return the struct format string used to unpack a single batch.
FORMAT = {
    1: lambda batch_size: f'<{batch_size}H{batch_size}H{batch_size}H{batch_size}H'
}


def parse_packet(buf):
    """ Parse the batches contained in a single received datagram.

    Args:
        buf: The raw bytes of one received datagram.

    Yields:
        One `Packet` per complete batch found after the header. The index of
        each packet is the header sequence number plus the position of the
        batch within the datagram.

    Nothing is yielded when the buffer is shorter than a header, when the magic
    word does not match, or when the format id is not supported.
    """
    header_size = struct.calcsize(HEADER_FORMAT)
    if len(buf) < header_size:
        return

    magic, format_id, batch_size, sequence_number = struct.unpack_from(HEADER_FORMAT, buf)

    if magic != MAGIC_HEADER:
        logging.warning('Encountered bad magic header: %s', hex(magic))
        return

    make_format = FORMAT.get(format_id)
    if make_format is None:
        logging.warning('Encountered unsupported format: %s', format_id)
        return

    frame_format = make_format(batch_size)
    frame_size = struct.calcsize(frame_format)

    # A batch size of zero describes empty frames; there is nothing to count.
    if frame_size == 0:
        return

    # Only complete batches are parsed; any trailing partial batch is ignored.
    batch_count = (len(buf) - header_size) // frame_size
    for offset in range(batch_count):
        data = struct.unpack_from(frame_format, buf, header_size + offset * frame_size)
        yield Packet(sequence_number + offset, data)


class Timer:
    """ A basic timer for measuring elapsed time periods. """

    def __init__(self, period=1.0):
        """ Create the timer with the provided trigger period in seconds. """
        self.start_time = time.time()
        self.trigger_time = self.start_time + period
        self.period = period
        self.started = False

    def is_triggered(self):
        """ Check if the timer period has elapsed. """
        now = time.time()
        return now >= self.trigger_time

    def start(self):
        """ Start the timer. """
        self.start_time = time.time()
        self.started = True

    def is_started(self):
        """ Check if the timer has started. """
        return self.started

    def arm(self):
        """ Arm the timer trigger. """
        self.trigger_time = time.time() + self.period

    def elapsed(self):
        """ Get the elapsed time since the timer was started. """
        now = time.time()
        return now - self.start_time


def sequence_delta(previous_sequence, next_sequence):
    """ Check the number of items between two sequence numbers.

    Args:
        previous_sequence: The last sequence number seen, or None if no packet
            has been received yet.
        next_sequence: The sequence number that was just received.

    Returns:
        The count of sequence numbers skipped between the two, computed modulo
        2**32 so that a wrap of the counter is not reported as a drop.
    """
    if previous_sequence is None:
        return 0

    delta = next_sequence - (previous_sequence + 1)
    return delta & 0xFFFFFFFF


def main():
    """ Main program.

    Binds a UDP socket on the requested port and loops forever, printing the
    received data rate and the received/dropped block counts once per timer
    period.
    """
    parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
    # type=int: without it a port given on the command line is a string, which
    # socket.bind() rejects.
    parser.add_argument('--port', type=int, default=1111,
                        help='The port that stabilizer is streaming to')

    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')

    last_index = None

    drop_count = 0
    good_blocks = 0
    total_bytes = 0

    timer = Timer()

    # The context manager closes the socket when the loop is interrupted.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as connection:
        connection.bind(("", args.port))

        while True:
            # Receive any data over UDP and parse it.
            data = connection.recv(1024)
            if data and not timer.is_started():
                timer.start()
                # Measure the first reporting period from the first datagram,
                # so the first report never divides by a near-zero elapsed time.
                timer.arm()

            # Handle any received packets.
            total_bytes += len(data)
            for packet in parse_packet(data):

                # Handle any dropped packets.
                drop_count += sequence_delta(last_index, packet.index)
                last_index = packet.index
                good_blocks += 1

            # Report the throughput periodically.
            if timer.is_triggered():
                elapsed = timer.elapsed()
                drate = total_bytes * 8 / 1e6 / elapsed

                print(f'''
Data Rate:       {drate:.3f} Mbps
Received Blocks: {good_blocks}
Dropped blocks:  {drop_count}

Metadata: {total_bytes / 1e6:.3f} MB in {elapsed:.2f} s
----
''')
                timer.arm()


if __name__ == '__main__':
    main()