import queue
import threading

import numpy as np
import serial
import SoapySDR
from scipy import signal

# Interval (seconds) at which blocked worker/stop loops re-check whether
# they should give up, so that shutdown cannot hang forever.
_POLL_INTERVAL = 0.1


class BufferedSDR:
    """Read fixed-size blocks from an SDR stream on a background thread.

    A pool of ``nbufs`` buffer sets (one ``complex64`` array of ``bufsize``
    samples per channel) circulates between two queues: the worker takes a
    set from ``available_buffers``, fills it with ``readStream`` and puts it
    on ``queue``; the consumer obtains it with :meth:`get` and must hand it
    back with :meth:`dispose` once done.
    """

    def __init__(self, sdr, channels, bufsize, nbufs):
        self.sdr = sdr
        self.channels = channels
        self.bufsize = bufsize
        self.stream = None
        self.terminate = False
        self.thread = None
        self.queue = queue.Queue(nbufs)
        self.available_buffers = queue.Queue(nbufs)
        for _ in range(nbufs):
            self.available_buffers.put(
                [np.zeros(bufsize, np.complex64) for _ in self.channels])

    def start(self):
        """Set up the RX stream and launch the reader thread.

        The stream is closed again if the thread cannot be started.
        """
        # Reset the flag so that start() works again after a previous stop().
        self.terminate = False
        self.stream = self.sdr.setupStream(
            SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, self.channels)
        try:
            self.thread = threading.Thread(target=self.thread_target)
            self.thread.start()
        except BaseException:
            self.sdr.closeStream(self.stream)
            raise

    def stop(self):
        """Ask the reader thread to exit, wait for it and close the stream."""
        self.terminate = True
        self.thread.join()
        self.sdr.closeStream(self.stream)

    def get(self):
        """Block until a filled buffer set is available and return it."""
        return self.queue.get()

    def dispose(self, buffers):
        """Return a buffer set obtained from :meth:`get` to the free pool."""
        self.available_buffers.put(buffers)

    def thread_target(self):
        """Reader loop: fill free buffer sets and queue them for the consumer.

        The loop ends when ``terminate`` is set, or after printing a message
        when ``readStream`` does not deliver exactly ``bufsize`` samples.
        The stream is deactivated on every exit path.
        """
        self.sdr.activateStream(self.stream)
        try:
            while not self.terminate:
                # Use a timed wait: if the consumer has stopped recycling
                # buffers, an untimed get() would never notice `terminate`
                # and stop() would hang in join().
                try:
                    buffers = self.available_buffers.get(
                        timeout=_POLL_INTERVAL)
                except queue.Empty:
                    continue
                sr = self.sdr.readStream(self.stream, buffers, self.bufsize)
                if sr.ret != self.bufsize:
                    print("SDR sampling error")
                    return
                self.queue.put(buffers)
        finally:
            self.sdr.deactivateStream(self.stream)


class DummyInductionHeater:
    """Stand-in with the same methods as InductionHeater; only prints."""

    def __init__(self, port, induction_min, induction_max):
        pass

    def start(self):
        pass

    def set(self, amount):
        print("induction", amount)

    def stop(self):
        pass


class InductionHeater:
    """Interface to the MHS5200A function generator driving the LC tank"""

    def __init__(self, port, induction_min, induction_max):
        self.port = port
        self.induction_min = induction_min
        self.induction_max = induction_max
        # Single-slot queue: at most one pending setting at a time.
        self.queue = queue.Queue(1)

    def start(self):
        """Open the serial port (57600 baud) and start the worker thread."""
        self.serial = serial.Serial(self.port, 57600)
        self.thread = threading.Thread(target=self.thread_target)
        self.thread.start()

    def thread_target(self):
        """Worker loop: turn queued amounts into frequency commands.

        An amount in [-0.5, 0.5] is mapped linearly onto
        [induction_min, induction_max] (0 is the midpoint). ``None`` ends
        the loop.
        """
        while True:
            amount = self.queue.get()
            if amount is None:
                break
            midpoint = (self.induction_min + self.induction_max)/2
            span = self.induction_max - self.induction_min
            freq = midpoint + amount*span
            # The command carries the frequency multiplied by 100 as a
            # zero-padded 10-digit integer.
            command = ":s1f{:010d}\n".format(int(freq*1e2))
            self.serial.write(command.encode())
            # Read one line back from the device before the next command.
            self.serial.readline()

    def set(self, amount):
        """Queue a new setting without blocking.

        Raises:
            ValueError: if ``amount`` is a number outside [-0.5, 0.5].
            queue.Full: if the previous setting has not been consumed yet.
        """
        # Validate here rather than in the worker: a failure inside the
        # worker thread cannot be caught by the caller and would silently
        # kill the thread.
        if amount is not None and not -0.5 <= amount <= 0.5:
            raise ValueError(
                "amount must be within [-0.5, 0.5], got {!r}".format(amount))
        self.queue.put(amount, block=False)

    def stop(self):
        """Send the midpoint setting, stop the worker and close the port."""
        self._put_while_alive(0.0)
        self._put_while_alive(None)
        self.thread.join()
        self.serial.close()

    def _put_while_alive(self, item):
        """Blocking put that gives up once the worker thread has exited.

        Without the liveness check, stop() would block forever on the full
        single-slot queue if the worker died (e.g. on a serial error).
        """
        while self.thread.is_alive():
            try:
                self.queue.put(item, timeout=_POLL_INTERVAL)
                return True
            except queue.Full:
                continue
        return False


class Stabilizer:
    """Track the strongest spectral peak and derive a tuning value from it.

    For every block passed to :meth:`input`, the callback ``cb`` is invoked
    as ``cb(spectrum, freq, locked, tuning + wiggle)``.
    """

    def __init__(self, freq_sample, block_size, freq_target, cb):
        self.freqs = np.fft.fftfreq(block_size, d=1/freq_sample)
        self.freq_target = freq_target
        self.cb = cb

        # Window cached for the expected block length (see input()).
        self._window = signal.windows.blackmanharris(block_size)

        self.amp_counter = 0
        self.lock_counter = 0
        self.unlock_counter = 0
        self.wiggle = 0.0
        self.tuning = 0.0

        self.amp_threshold = 80.0
        self.k = 30.0e-6
        self.tolerance = 10e3
        self.amp_counter_threshold = 60
        self.lock_counter_threshold = 60
        self.unlock_counter_threshold = 500
        self.wiggle_amplitude = 0.15

    def input(self, samples):
        """Process one block of samples and report the result through ``cb``.

        ``len(samples)`` is expected to equal the ``block_size`` given to the
        constructor, since the peak bin is looked up in ``self.freqs``.
        """
        n = len(samples)
        if len(self._window) != n:
            # Unexpected block length: rebuild the cached window.
            self._window = signal.windows.blackmanharris(n)
        spectrum = np.abs(np.fft.fft(samples*self._window))
        i = np.argmax(spectrum)
        amplitude = spectrum[i]

        success = False
        if amplitude > self.amp_threshold:
            freq = self.freqs[i]
            delta = freq - self.freq_target
            self.amp_counter += 1
            # Only act once the peak has been present for enough
            # consecutive blocks.
            if self.amp_counter > self.amp_counter_threshold:
                self.tuning = delta*self.k
                # NOTE(review): nesting of this check was reconstructed
                # from whitespace-mangled source - confirm.
                if abs(delta) < self.tolerance:
                    success = True
        else:
            freq = None
            self.amp_counter = 0

        # Keep tuning + wiggle strictly inside (-0.5, 0.5).
        max_tuning_abs = 0.5 - self.wiggle_amplitude - 1e-9
        self.tuning = max(min(self.tuning, max_tuning_abs), -max_tuning_abs)

        if success:
            self.lock_counter += 1
        else:
            self.lock_counter = 0

        if self.locked():
            self.unlock_counter = 0
        else:
            self.unlock_counter += 1

        # After too many blocks without lock, pick a new random offset and
        # restart the amplitude and unlock counters.
        if not success and (
                self.unlock_counter > self.unlock_counter_threshold):
            self.wiggle = self.wiggle_amplitude*np.random.uniform(-1.0, 1.0)
            print("wiggle", self.wiggle)
            self.unlock_counter = 0
            self.amp_counter = 0

        self.cb(spectrum, freq, self.locked(), self.tuning + self.wiggle)

    def locked(self):
        """Return True once lock_counter exceeds lock_counter_threshold."""
        return self.lock_counter > self.lock_counter_threshold


def continuous_unwrap(last_phase, last_phase_unwrapped, p):
    """Unwrap phase block ``p`` so that it continues a previous block.

    ``last_phase`` is the final wrapped phase of the previous block and
    ``last_phase_unwrapped`` its unwrapped value. ``p`` must be non-empty.
    Returns the unwrapped block shifted to join the previous one.
    """
    # note: np.unwrap always preserves first element of array
    p = np.unwrap(p)
    # Unwrap the step from the previous sample to p[0] on its own to find
    # where the new block has to start.
    glue = np.array([last_phase_unwrapped,
                     last_phase_unwrapped + (p[0] - last_phase)])
    new_p0 = np.unwrap(glue)[1]
    return new_p0 + p - p[0]


class PositionTracker:
    """Accumulate the unwrapped phase of ``meas`` relative to ``ref``."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Restart tracking from phase 0 and position 0."""
        self.last_phase = 0.0
        self.last_position = 0.0

    def input(self, ref, meas):
        """Return the continuous unwrapped phase for one block.

        The phase is ``angle(meas * conj(ref))``; it is unwrapped so that it
        continues from the end of the previous block.
        """
        phase = np.angle(meas*ref.conj())
        position = continuous_unwrap(
            self.last_phase, self.last_position, phase)
        self.last_phase = phase[-1]
        self.last_position = position[-1]
        return position


class LinearPhaseFilter:
    """Block-wise FIR filter designed with ``scipy.signal.firwin``.

    Extra positional and keyword arguments are forwarded to ``firwin``.
    Filter state is carried between calls to :meth:`input`.
    """

    def __init__(self, numtaps, *args, **kwargs):
        self.numtaps = numtaps
        self.coef = signal.firwin(numtaps, *args, **kwargs)
        self.state = np.zeros(self.numtaps - 1, dtype="complex64")

    def input(self, block):
        """Filter one block, continuing from the saved state."""
        output, self.state = signal.lfilter(
            self.coef, [1.0], block, zi=self.state)
        return output

    def delay(self):
        """Return ``(numtaps - 1) // 2``; requires an odd ``numtaps``."""
        assert self.numtaps % 2 == 1
        return (self.numtaps - 1)//2


class Delay:
    """Delay a stream of fixed-size blocks by ``delay_amount`` samples.

    ``delay_amount`` must not exceed ``block_size``.
    """

    def __init__(self, block_size, delay_amount):
        self.block_size = block_size
        self.delay_amount = delay_amount
        self.output = np.zeros(block_size, dtype="complex64")
        self.next = np.zeros(delay_amount, dtype="complex64")

    def input(self, block):
        """Return the delayed version of ``block``.

        The returned array is an internal buffer that is overwritten by the
        next call; copy it if it has to be kept.
        """
        assert len(block) == self.block_size
        split = self.block_size - self.delay_amount
        self.output[:self.delay_amount] = self.next
        self.output[self.delay_amount:] = block[:split]
        # Copy the tail instead of keeping a view into the caller's array:
        # the caller may reuse `block` (e.g. recycled SDR buffers) before
        # the next call, which would corrupt the saved samples.
        self.next[:] = block[split:]
        return self.output