#!/usr/bin/python3 """ Author: Vertigo Designs, Ryan Summers Description: Implements HITL testing of Stabilizer data livestream capabilities. """ import asyncio import sys import time import argparse import socket import struct import logging from miniconf import Miniconf def _get_ip(broker): """ Get the IP of the local device. Args: broker: The broker IP of the test. Used to select an interface to get the IP of. Returns: The IP as an array of integers. """ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: sock.connect((broker, 1883)) address = sock.getsockname()[0] finally: sock.close() return list(map(int, address.split('.'))) def sequence_delta(previous_sequence, next_sequence): """ Check the number of items between two sequence numbers. """ if previous_sequence is None: return 0 delta = next_sequence - (previous_sequence + 1) return delta & 0xFFFFFFFF class StabilizerStream: """ Provides access to Stabilizer's livestreamed data. """ # The magic header half-word at the start of each packet. MAGIC_HEADER = 0x057B # The struct format of the header. HEADER_FORMAT = '