import SoapySDR import numpy as np from scipy.signal import blackmanharris import serial fs = 5e6 base_freq = 1086e6 threshold = 0.4 target_freq = 1088.3e6 k = 200e-6 induction_min = 430e3 induction_max = 445e3 generator = serial.Serial("/dev/ttyUSB0", 57600, writeTimeout=0) def set_induction_freq(freq): command = ":s1f{:010d}\n".format(int(freq*1e2)) generator.write(command.encode()) generator.readline() def set_induction(amount): amount = max(min(amount, 0.5), -0.5) freq = (induction_min + induction_max)/2 + amount*(induction_max - induction_min) set_induction_freq(freq) def parabolic(f, x): """Quadratic interpolation for estimating the true position of an inter-sample maximum when nearby samples are known. f is a vector and x is an index for that vector. Returns (vx, vy), the coordinates of the vertex of a parabola that goes through point x and its two neighbors. Example: Defining a vector f with a local maximum at index 3 (= 6), find local maximum if points 2, 3, and 4 actually defined a parabola. In [3]: f = [2, 3, 1, 6, 4, 2, 3, 1] In [4]: parabolic(f, argmax(f)) Out[4]: (3.2142857142857144, 6.1607142857142856) """ xv = 1/2. * (f[x-1] - f[x+1]) / (f[x-1] - 2 * f[x] + f[x+1]) + x yv = f[x] - 1/4. * (f[x-1] - f[x+1]) * (xv - x) return (xv, yv) def fmt_range(x, m): pos = int(round(x*80/m)) return " "*pos + "*" sdr = SoapySDR.Device() samples = np.array([0]*4096, np.complex64) sdr.setSampleRate(SoapySDR.SOAPY_SDR_RX, 0, fs) sdr.setFrequency(SoapySDR.SOAPY_SDR_RX, 0, base_freq) rxStream = sdr.setupStream(SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32) sdr.activateStream(rxStream) decimation = 0 try: while True: sr = sdr.readStream(rxStream, [samples], len(samples)) if sr.ret != len(samples): print("!") continue freqs = np.abs(np.fft.fft(samples*blackmanharris(len(samples)))[0:len(samples)//2]) for i in range(len(freqs)//100): freqs[i] = 0 freqs[-i] = 0 # https://gist.github.com/endolith/255291 i = np.argmax(freqs) true_i = parabolic(freqs, i)[0] freq = 0.5 * fs * true_i / len(freqs) amplitude = freqs[i] if amplitude > threshold: actual_freq = base_freq + freq print("{:8.3f} {:6.2f} {}".format(actual_freq/1e6, amplitude, fmt_range(freq, fs/2))) induction = (actual_freq - target_freq)*k else: print("", amplitude) induction = 0.0 decimation += 1 if decimation == 15: set_induction(induction) decimation = 0 finally: sdr.deactivateStream(rxStream) sdr.closeStream(rxStream)