#!/usr/bin/python3 """ Author: Ryan Summers Description: quick test pounder_mixer_output => stabilizer ADC ADC range is +- 4 V and 16 bit resolution adc_volts_per_lsb is 0.00031250002 ADC gain can be 1,2,5,10 """ import argparse import logging import sys import time import numpy as np import matplotlib.pyplot as plt #from stabilizer.stream import StabilizerStream import socket import struct class StabilizerStream: """ Provides access to Stabilizer's livestreamed data. """ # The magic header half-word at the start of each packet. MAGIC_HEADER = 0x057B # The struct format of the header. HEADER_FORMAT = '= self.trigger_time def start(self): """ Start the timer. """ self.start_time = time.time() self.started = True def is_started(self): """ Check if the timer has started. """ return self.started def arm(self): """ Arm the timer trigger. """ self.trigger_time = time.time() + self.period def elapsed(self): """ Get the elapsed time since the timer was started. """ now = time.time() return now - self.start_time def sequence_delta(previous_sequence, next_sequence): """ Check the number of items between two sequence numbers. 
def sequence_delta(previous_sequence, next_sequence):
    """ Check the number of items between two sequence numbers.

    Args:
        previous_sequence: The previously observed sequence number, or None if
            nothing has been observed yet.
        next_sequence: The sequence number that was just observed.

    Returns:
        The count of sequence numbers skipped between the two values, taken
        modulo 2**32. Returns 0 when `previous_sequence` is None.
    """
    if previous_sequence is None:
        return 0

    delta = next_sequence - (previous_sequence + 1)

    # Mask to 32 bits so that a wrapped counter still gives a non-negative gap.
    return delta & 0xFFFFFFFF


def adc_code_to_volt(raw_adc_code_data):
    """ Convert raw 16-bit ADC codes to volts.

    The codes are reinterpreted as two's-complement 16-bit values, so inputs
    may be given either as unsigned (0..65535) or as signed (-32768..32767)
    integers.

    Args:
        raw_adc_code_data: A sequence (or array) of integer ADC codes.

    Returns:
        A float32 NumPy array of voltages, one per input code.
    """
    # Author's note: +- 4 V with 16 bit resolution is equivalent to 4 V with
    # 15 bit resolution; the op-amp has gain 1/5 and the signal is then divided
    # into two differential inputs.
    adc_volts_per_lsb = np.float32((5.0 / 2.0) * 4.096 / (2 ** 15))

    # Go through a wide integer array first and let the array cast wrap modulo
    # 2**16. Calling np.int16() directly on Python ints above 32767 is
    # deprecated in NumPy 1.24 and raises OverflowError in NumPy 2.
    codes = np.asarray(raw_adc_code_data, dtype=np.int64).astype(np.int16)

    return codes.astype(np.float32) * adc_volts_per_lsb


def flatten(t):
    """ Flatten one level of nesting: an iterable of iterables becomes a list. """
    return [item for sublist in t for item in sublist]


def _amplitude_spectrum(samples, timestep):
    """ Return (frequencies in kHz, |rfft(samples)| / N) for a real signal. """
    count = samples.shape[0]
    freq_khz = np.fft.rfftfreq(count, timestep) / 1000
    amplitude = np.abs((1. / count) * np.fft.rfft(samples))
    return freq_khz, amplitude


def my_plot(data, gain=10):
    """ Plot the time trace and the FFT (linear and dB) of an ADC capture.

    Args:
        data: 1-D sequence of ADC voltages in volts.
        gain: ADC gain value shown in the plot title only; it does not rescale
            the data.

    Raises:
        ValueError: If `data` is empty.
    """
    # Convert up front: multiplying a plain list by 1000 would repeat it
    # instead of scaling it.
    samples = np.asarray(data)
    if samples.size == 0:
        raise ValueError('my_plot() requires at least one sample')

    millivolts = 1000 * samples
    max_v = millivolts.max()
    min_v = millivolts.min()

    fig, (ax1, ax2, ax3) = plt.subplots(3)

    # Time-domain trace with zero (red) and mean (green) reference lines.
    ax1.plot(millivolts)
    ax1.axhline(y=0.0, color='r', linestyle='-', linewidth=0.3)
    ax1.axhline(y=millivolts.mean(), color='g', linestyle='-', linewidth=0.3)
    ax1.set_ylabel('stabilizer_adc_0 / mV')
    ax1.set_xlabel('time / 1.28 us')
    ax1.set_title(f' Stabilizer ADC gain={gain} with max V = {max_v:.3f} mV and min V = {min_v:.3f} mV')

    # Sample spacing used for the frequency axis (matches the x label above).
    timestep = 1.28e-6
    N = samples.shape[0]
    f, Vxx = _amplitude_spectrum(samples, timestep)
    V_max = Vxx.max()
    max_freq = f[np.argmax(Vxx)]

    # Linear-amplitude spectrum.
    ax2.plot(f, Vxx)
    ax2.set_ylabel('FFT Amplitude by (1./N)*np.fft.rfft(stabilizer_adc_0)')
    ax2.set_xlabel('Frequency / kHz')
    ax2.set_title(f'FFT of {N} datapoint and max is {V_max:.5f} at {max_freq:.3f} kHz')
    ax2.set_xlim([-10, 400])

    # Spectrum in dB. Bins that are exactly zero become -inf; silence the
    # divide-by-zero warning rather than altering the plotted values.
    with np.errstate(divide='ignore'):
        Pxx = 20 * np.log10(Vxx)
    ax3.plot(f, Pxx)
    ax3.set_ylabel('20*np.log10(Amplitude)')
    ax3.set_xlabel('Frequency / kHz')
    ax3.set_title(f'FFT of {N} datapoint')
    ax3.set_xlim([-10, 400])

    plt.subplots_adjust(hspace=0.5)
    plt.show()


def main():
    """ Main program.

    Receives batches from the Stabilizer livestream, skips the first batches,
    collects the first 8 values of each subsequent batch, then prints the drop
    count, plots the capture and exits.
    """
    parser = argparse.ArgumentParser(description='Measure Stabilizer livestream quality')
    parser.add_argument('--port', type=int, default=1883,
                        help='The port that stabilizer is streaming to')
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s')

    # Batches with 1000 < index < 15050 are recorded; batch 15051 triggers the plot.
    first_skipped = 1000
    capture_end = 15050
    plot_at = 15051

    last_index = None
    drop_count = 0
    good_blocks = 0

    timer = Timer()
    stream = StabilizerStream(args.port)
    adc_0_data = []

    while True:
        # Receive any data over UDP and parse it.
        for (seqnum, _data) in stream.read_frame():
            if not timer.is_started():
                timer.start()

            # Handle any dropped packets.
            drop_count += sequence_delta(last_index, seqnum)
            last_index = seqnum
            good_blocks += 1

            if first_skipped < good_blocks < capture_end:
                # First 8 values are ADC channel 0 per the author's note
                # (values 8..15 were noted as ADC channel 1).
                adc_0_data.append(_data[:8])

            if good_blocks == plot_at:
                print(_data)
                print(f"drop_count is {drop_count}")
                my_plot(adc_code_to_volt(flatten(adc_0_data)))
                # Return rather than break: break would only leave this inner
                # loop and the outer loop would keep receiving forever.
                return


if __name__ == '__main__':
    main()