import queue
import threading

import numpy as np
import serial
import SoapySDR
# Import from the ``windows`` submodule: recent SciPy releases no longer
# expose the window functions directly in the ``scipy.signal`` namespace.
from scipy.signal.windows import blackmanharris


class BufferedSDR:
    """Read an SDR receive stream on a background thread into reusable buffers.

    A fixed pool of ``nbufs`` buffer sets (one ``complex64`` array of
    ``bufsize`` samples per channel) circulates between two queues:
    ``available_buffers`` holds free sets, ``queue`` holds filled ones.
    The consumer calls :meth:`get` to obtain a filled set and must hand it
    back with :meth:`dispose` once it is done with it.
    """

    # Seconds the reader thread waits for a free buffer set before it
    # re-checks the ``terminate`` flag.
    _POLL_INTERVAL = 0.1

    def __init__(self, sdr, channels, bufsize, nbufs):
        """Allocate the buffer pool; no stream is opened until :meth:`start`.

        Args:
            sdr: Device object providing ``setupStream``, ``activateStream``,
                ``readStream``, ``deactivateStream`` and ``closeStream``.
            channels: Channel indices to receive; one array is allocated
                per entry in every buffer set.
            bufsize: Number of samples per buffer.
            nbufs: Number of buffer sets in the pool.
        """
        self.sdr = sdr
        self.channels = channels
        self.bufsize = bufsize
        self.stream = None
        self.terminate = False
        self.thread = None
        self.queue = queue.Queue(nbufs)
        self.available_buffers = queue.Queue(nbufs)
        for _ in range(nbufs):
            self.available_buffers.put(
                [np.zeros(bufsize, np.complex64) for _ in self.channels]
            )

    def start(self):
        """Open the RX stream and launch the reader thread.

        The stream is closed again if the thread cannot be started, and the
        original exception is re-raised.
        """
        # Clear a stop request left over from a previous start/stop cycle,
        # otherwise the new reader thread would exit immediately.
        self.terminate = False
        self.stream = self.sdr.setupStream(
            SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, self.channels
        )
        try:
            self.thread = threading.Thread(target=self.thread_target)
            self.thread.start()
        except BaseException:
            self.sdr.closeStream(self.stream)
            raise

    def stop(self):
        """Ask the reader thread to finish, wait for it, and close the stream."""
        self.terminate = True
        self.thread.join()
        self.sdr.closeStream(self.stream)

    def get(self):
        """Block until a filled buffer set is available and return it."""
        return self.queue.get()

    def dispose(self, buffers):
        """Return a buffer set obtained from :meth:`get` to the free pool."""
        self.available_buffers.put(buffers)

    def thread_target(self):
        """Reader loop: fill free buffer sets from the stream until stopped.

        A read that does not return exactly ``bufsize`` samples is reported
        on stdout and ends the thread. NOTE(review): consumers are not
        notified in that case, so a later :meth:`get` will block.
        """
        self.sdr.activateStream(self.stream)
        try:
            while not self.terminate:
                # Wait with a timeout instead of blocking indefinitely: if
                # the consumer has stopped returning buffers, an untimed
                # get() would never notice ``terminate`` and stop() would
                # hang in join().
                try:
                    buffers = self.available_buffers.get(
                        timeout=self._POLL_INTERVAL
                    )
                except queue.Empty:
                    continue
                sr = self.sdr.readStream(self.stream, buffers, self.bufsize)
                if sr.ret != self.bufsize:
                    print("SDR sampling error")
                    return
                self.queue.put(buffers)
        finally:
            self.sdr.deactivateStream(self.stream)


class DummyInductionHeater:
    """Stand-in with the same methods as InductionHeater that only prints."""

    def __init__(self, port, induction_min, induction_max):
        """Accept and ignore the InductionHeater constructor arguments."""
        pass

    def start(self):
        """Do nothing."""
        pass

    def set(self, amount):
        """Print the requested amount instead of sending it anywhere."""
        print("induction", amount)

    def stop(self):
        """Do nothing."""
        pass


class InductionHeater:
    """Interface to the MHS5200A function generator driving the LC tank"""

    def __init__(self, port, induction_min, induction_max):
        """Store the configuration; the serial port is opened in :meth:`start`.

        Args:
            port: Serial port name passed to ``serial.Serial``.
            induction_min: Frequency produced for ``amount == -0.5``.
            induction_max: Frequency produced for ``amount == +0.5``.
        """
        self.port = port
        self.induction_min = induction_min
        self.induction_max = induction_max
        # Single-slot hand-off to the writer thread.
        self.queue = queue.Queue(1)

    def start(self):
        """Open the serial port (57600 baud) and launch the writer thread."""
        self.serial = serial.Serial(self.port, 57600)
        self.thread = threading.Thread(target=self.thread_target)
        self.thread.start()

    def thread_target(self):
        """Writer loop: turn queued amounts into frequency commands.

        Runs until ``None`` is received from the queue.
        """
        while True:
            amount = self.queue.get()
            if amount is None:
                break
            # Clamp to [-0.5, 0.5] so the result stays within
            # [induction_min, induction_max].
            amount = max(min(amount, 0.5), -0.5)
            freq = (
                (self.induction_min + self.induction_max) / 2
                + amount * (self.induction_max - self.induction_min)
            )
            # The value is sent as a 10-digit integer in units of 1/100 of
            # the configured frequency unit (presumably Hz -- confirm
            # against the generator's protocol).
            command = ":s1f{:010d}\n".format(int(freq * 1e2))
            self.serial.write(command.encode())
            # Read one line back before taking the next value
            # (presumably the device's acknowledgement).
            self.serial.readline()

    def set(self, amount):
        """Queue a new tuning amount without blocking.

        Args:
            amount: Offset from the band centre as a fraction of the
                configured span; clamped to [-0.5, 0.5] by the writer thread.

        Raises:
            queue.Full: If the previously queued value has not yet been
                taken by the writer thread.
        """
        self.queue.put(amount, block=False)

    def stop(self):
        """Tell the writer thread to exit, wait for it, and close the port."""
        self.queue.put(None, block=True)
        self.thread.join()
        self.serial.close()


class Stabilizer:
    """Proportional controller that steers the strongest spectral peak.

    Each block of samples is windowed and transformed; the frequency of the
    largest bin is compared with ``freq_target`` and the scaled difference
    is handed to ``tuner.set``.
    """

    def __init__(self, fft_size, amp_threshold, freq_target, k, tuner):
        """
        Args:
            fft_size: Expected number of samples per :meth:`input` call.
            amp_threshold: Minimum peak magnitude for a correction to be
                applied.
            freq_target: Desired peak frequency, in the units returned by
                ``np.fft.fftfreq`` with its default spacing (cycles per
                sample).
            k: Gain applied to the frequency error.
            tuner: Object with a ``set(amount)`` method.
        """
        self.freqs = np.fft.fftfreq(fft_size)
        # The window depends only on the block length, so build it once
        # here instead of on every input() call.
        self.window = blackmanharris(fft_size)
        self.amp_threshold = amp_threshold
        self.freq_target = freq_target
        self.k = k
        self.tuner = tuner

    def input(self, samples):
        """Process one block of samples and update the tuner.

        If the peak magnitude does not exceed ``amp_threshold`` the tuner is
        set to 0.0.
        """
        n = len(samples)
        # Fall back to a freshly built window if the block length differs
        # from the size given at construction.
        window = self.window if n == len(self.window) else blackmanharris(n)
        spectrum = np.abs(np.fft.fft(samples * window))
        i = np.argmax(spectrum)
        freq = self.freqs[i]
        amplitude = spectrum[i]
        if amplitude > self.amp_threshold:
            tuning = (freq - self.freq_target) * self.k
        else:
            tuning = 0.0
        self.tuner.set(tuning)


def continuous_unwrap(last_phase, last_phase_unwrapped, p):
    """Unwrap phase block ``p`` so that it continues a previous block.

    Args:
        last_phase: Final wrapped phase of the previous block.
        last_phase_unwrapped: Final unwrapped phase of the previous block.
        p: Array of wrapped phases for the current block.

    Returns:
        The unwrapped phases of ``p``, shifted so that the first element
        follows ``last_phase_unwrapped`` without a 2*pi discontinuity.
    """
    # note: np.unwrap always preserves first element of array
    p = np.unwrap(p)
    # Unwrap the two-point sequence (previous end, previous end + raw step)
    # to find where the first sample of this block belongs.
    glue = np.array(
        [last_phase_unwrapped, last_phase_unwrapped + (p[0] - last_phase)]
    )
    new_p0 = np.unwrap(glue)[1]
    return new_p0 + p - p[0]


class PositionTracker:
    """Track a continuously unwrapped phase between two signals.

    The position is expressed in cycles (unwrapped phase divided by 2*pi)
    and carries over from one :meth:`input` call to the next.
    """

    def __init__(self, leakage_avg):
        """
        Args:
            leakage_avg: Number of recent blocks over which the leakage
                estimate is averaged.
        """
        self.last_phase = 0.0
        self.last_position = 0.0
        # Ring buffer of per-block leakage estimates. It starts out as
        # zeros, so the average is scaled down until it has been filled.
        self.leakage = np.zeros(leakage_avg)
        self.leakage_ptr = 0

    def input(self, ref, meas):
        """Process one block of reference and measurement samples.

        Args:
            ref: Reference samples.
            meas: Measurement samples, same length as ``ref``.

        Returns:
            Tuple ``(position, leakage)``: the per-sample position in cycles
            and the current averaged leakage estimate.
        """
        demod = np.conjugate(ref) * meas
        # Per-block leakage estimate: real part of the mean product.
        self.leakage[self.leakage_ptr] = np.real(np.sum(demod) / len(demod))
        self.leakage_ptr = (self.leakage_ptr + 1) % len(self.leakage)
        leakage = np.sum(self.leakage) / len(self.leakage)
        # Remove the averaged leakage before taking the phase.
        phase = np.angle(demod - leakage)
        position = continuous_unwrap(
            self.last_phase, self.last_position, phase
        ) / (2.0 * np.pi)
        # Remember the block's end state so the next block continues from it.
        self.last_phase = phase[-1]
        self.last_position = position[-1]
        return position, leakage