noptica2-sdr/noptica.py

223 rader
6.6 KiB
Python

import serial
import queue
import threading
import SoapySDR
import numpy as np
from scipy import signal
class BufferedSDR:
    """Read fixed-size sample blocks from an SDR receive stream on a worker thread.

    A pool of ``nbufs`` buffer sets (one complex64 array of ``bufsize``
    samples per channel) circulates between two queues: the worker takes an
    empty set from ``available_buffers``, fills it with ``readStream`` and
    publishes it on ``queue``; the consumer fetches it with :meth:`get` and
    must hand it back with :meth:`dispose` once it is done with the data.
    """

    # Seconds the worker waits for a free buffer set before re-checking
    # ``terminate``. Bounds how long stop() takes when the pool is exhausted.
    _POLL_INTERVAL = 0.1

    def __init__(self, sdr, channels, bufsize, nbufs):
        """Allocate the buffer pool; no stream is opened until :meth:`start`.

        Args:
            sdr: device object providing setupStream/activateStream/
                readStream/deactivateStream/closeStream.
            channels: channel list passed to ``setupStream``; one buffer per
                channel is allocated in every buffer set.
            bufsize: number of samples per buffer (and per read).
            nbufs: number of buffer sets in the pool.
        """
        self.sdr = sdr
        self.channels = channels
        self.bufsize = bufsize
        self.stream = None
        self.terminate = False
        self.thread = None
        self.queue = queue.Queue(nbufs)
        self.available_buffers = queue.Queue(nbufs)
        for _ in range(nbufs):
            self.available_buffers.put(
                [np.zeros(bufsize, np.complex64) for _ in self.channels])

    def start(self):
        """Open the receive stream and launch the worker thread.

        The stream is closed again if the thread cannot be started.
        """
        self.stream = self.sdr.setupStream(
            SoapySDR.SOAPY_SDR_RX, SoapySDR.SOAPY_SDR_CF32, self.channels)
        try:
            self.thread = threading.Thread(target=self.thread_target)
            self.thread.start()
        except BaseException:
            self.sdr.closeStream(self.stream)
            self.stream = None
            self.thread = None
            raise

    def stop(self):
        """Stop the worker thread and close the stream.

        Safe to call when :meth:`start` was never called (does nothing then).
        """
        self.terminate = True
        if self.thread is not None:
            self.thread.join()
            self.thread = None
        if self.stream is not None:
            self.sdr.closeStream(self.stream)
            self.stream = None

    def get(self):
        """Block until a filled buffer set is available and return it."""
        return self.queue.get()

    def dispose(self, buffers):
        """Return a buffer set obtained from :meth:`get` to the free pool."""
        self.available_buffers.put(buffers)

    def thread_target(self):
        """Worker loop: fill free buffer sets from the stream until told to stop.

        Exits early (after printing a message) when a read does not return
        exactly ``bufsize`` samples. The stream is always deactivated on exit.
        """
        self.sdr.activateStream(self.stream)
        try:
            while not self.terminate:
                try:
                    # Timed wait: when the consumer holds every buffer set an
                    # untimed get() would never notice ``terminate`` and
                    # stop() would hang in join().
                    buffers = self.available_buffers.get(
                        timeout=self._POLL_INTERVAL)
                except queue.Empty:
                    continue
                sr = self.sdr.readStream(self.stream, buffers, self.bufsize)
                if sr.ret != self.bufsize:
                    print("SDR sampling error")
                    return
                self.queue.put(buffers)
        finally:
            self.sdr.deactivateStream(self.stream)
class DummyInductionHeater:
    """Hardware-free stand-in exposing the same methods as InductionHeater.

    Nothing is opened or driven; the only observable effect is that
    :meth:`set` prints the requested amount.
    """

    def __init__(self, port, induction_min, induction_max):
        """Accept the heater constructor arguments and ignore them."""

    def start(self):
        """Do nothing; there is no device to open."""

    def set(self, amount):
        """Print the requested tuning amount instead of sending it anywhere."""
        print("induction", amount)

    def stop(self):
        """Do nothing; there is no device to close."""
class InductionHeater:
    """Interface to the MHS5200A function generator driving the LC tank

    Tuning requests are handed to a worker thread through a one-slot queue;
    the worker converts each request to a frequency between
    ``induction_min`` and ``induction_max`` and sends it over the serial port.
    """

    # Seconds stop() waits for a queue slot before re-checking that the
    # worker is still alive.
    _POLL_INTERVAL = 0.1

    def __init__(self, port, induction_min, induction_max):
        """Store the configuration; the port is not opened until :meth:`start`.

        Args:
            port: serial port name passed to ``serial.Serial``.
            induction_min: frequency sent for ``amount == -0.5``.
            induction_max: frequency sent for ``amount == +0.5``.
        """
        self.port = port
        self.induction_min = induction_min
        self.induction_max = induction_max
        self.queue = queue.Queue(1)
        self.serial = None
        self.thread = None

    def start(self):
        """Open the serial port at 57600 baud and launch the worker thread.

        The port is closed again if the thread cannot be started.
        """
        self.serial = serial.Serial(self.port, 57600)
        try:
            self.thread = threading.Thread(target=self.thread_target)
            self.thread.start()
        except BaseException:
            self.serial.close()
            self.serial = None
            self.thread = None
            raise

    def thread_target(self):
        """Worker loop: send one frequency command per queued amount.

        Terminates when ``None`` is received from the queue.
        """
        while True:
            amount = self.queue.get()
            if amount is None:
                break
            # amount in [-0.5, 0.5] maps linearly onto
            # [induction_min, induction_max], centred on their mean.
            freq = ((self.induction_min + self.induction_max)/2
                    + amount*(self.induction_max - self.induction_min))
            # The command carries the frequency multiplied by 100 as a
            # zero-padded 10-digit integer.
            command = ":s1f{:010d}\n".format(int(freq*1e2))
            self.serial.write(command.encode())
            # Wait for the generator's reply line before the next command.
            self.serial.readline()

    def set(self, amount):
        """Queue a new tuning amount without blocking.

        Args:
            amount: value in ``[-0.5, 0.5]``.

        Raises:
            ValueError: if ``amount`` is outside ``[-0.5, 0.5]`` (or NaN).
                Checked here rather than in the worker so that a bad value
                cannot kill the worker thread.
            queue.Full: if the previous request has not been picked up yet.
        """
        if not -0.5 <= amount <= 0.5:
            raise ValueError(
                "amount must be within [-0.5, 0.5], got {!r}".format(amount))
        self.queue.put(amount, block=False)

    def stop(self):
        """Send the centre frequency, shut the worker down and close the port.

        Does nothing when :meth:`start` was never called. If the worker has
        already died, the pending requests are skipped instead of blocking
        forever on the full queue.
        """
        if self.thread is None:
            return
        if self._put_while_alive(0.0):
            self._put_while_alive(None)
        self.thread.join()
        self.thread = None
        if self.serial is not None:
            self.serial.close()
            self.serial = None

    def _put_while_alive(self, item):
        """Enqueue ``item``, giving up if the worker thread is no longer running.

        Returns True when the item was queued, False when the worker is dead.
        """
        while self.thread.is_alive():
            try:
                self.queue.put(item, timeout=self._POLL_INTERVAL)
            except queue.Full:
                continue
            return True
        return False
class Stabilizer:
    """Track the strongest spectral peak and derive a tuning value from it.

    Each call to :meth:`input` takes one block of samples, finds the
    largest-magnitude FFT bin and, when the peak is strong enough for long
    enough, sets the tuning proportionally to the peak's offset from
    ``freq_target``. After every block the callback is invoked as
    ``cb(spectrum, freq, locked, tuning)`` where ``freq`` is the peak
    frequency (``None`` when the peak is below the amplitude threshold) and
    ``tuning`` is the clamped tuning plus the current random wiggle.
    """

    def __init__(self, freq_sample, block_size, freq_target, cb):
        """
        Args:
            freq_sample: sample rate, used to label the FFT bins.
            block_size: number of samples per block passed to :meth:`input`.
            freq_target: frequency the peak should be steered to.
            cb: callable ``cb(spectrum, freq, locked, tuning)``.
        """
        self.freqs = np.fft.fftfreq(block_size, d=1/freq_sample)
        # The window only depends on the block length, so build it once.
        # ``signal.windows`` is used because the window functions are no
        # longer exported from the top-level ``scipy.signal`` namespace.
        self.window = signal.windows.blackmanharris(block_size)
        self.freq_target = freq_target
        self.cb = cb

        self.amp_counter = 0     # consecutive blocks with a strong peak
        self.lock_counter = 0    # consecutive blocks within tolerance
        self.unlock_counter = 0  # consecutive blocks while not locked
        self.wiggle = 0.0        # random offset added to the tuning output
        self.tuning = 0.0

        self.amp_threshold = 80.0            # minimum peak magnitude
        self.k = 30.0e-6                     # tuning per unit of frequency error
        self.tolerance = 10e3                # max |error| counted as success
        self.amp_counter_threshold = 60      # strong blocks before tuning starts
        self.lock_counter_threshold = 60     # successes before reporting lock
        self.unlock_counter_threshold = 500  # unlocked blocks before a new wiggle
        self.wiggle_amplitude = 0.15         # wiggle is drawn from +/- this

    def input(self, samples):
        """Process one block of samples and invoke the callback once."""
        window = self.window
        if len(window) != len(samples):
            # Block of unexpected length: fall back to a matching window.
            window = signal.windows.blackmanharris(len(samples))
        spectrum = np.abs(np.fft.fft(samples*window))
        i = np.argmax(spectrum)
        amplitude = spectrum[i]

        success = False
        if amplitude > self.amp_threshold:
            freq = self.freqs[i]
            delta = freq - self.freq_target
            self.amp_counter += 1
            # Only act once the peak has been present for enough blocks.
            if self.amp_counter > self.amp_counter_threshold:
                self.tuning = delta*self.k
                if abs(delta) < self.tolerance:
                    success = True
        else:
            freq = None
            self.amp_counter = 0

        # Leave headroom for the wiggle so tuning + wiggle stays inside
        # (-0.5, 0.5).
        max_tuning_abs = 0.5 - self.wiggle_amplitude - 1e-9
        self.tuning = max(min(self.tuning, max_tuning_abs), -max_tuning_abs)

        if success:
            self.lock_counter += 1
        else:
            self.lock_counter = 0

        if self.locked():
            self.unlock_counter = 0
        else:
            self.unlock_counter += 1
            if not success and (self.unlock_counter > self.unlock_counter_threshold):
                # Unlocked for too long: pick a new random offset and
                # restart the counters.
                self.wiggle = self.wiggle_amplitude*np.random.uniform(-1.0, 1.0)
                print("wiggle", self.wiggle)
                self.unlock_counter = 0
                self.amp_counter = 0

        self.cb(spectrum, freq, self.locked(), self.tuning + self.wiggle)

    def locked(self):
        """Return True once enough consecutive blocks were within tolerance."""
        return self.lock_counter > self.lock_counter_threshold
def continuous_unwrap(last_phase, last_phase_unwrapped, p):
    """Unwrap a block of phases so that it continues a previous block.

    Args:
        last_phase: last wrapped phase of the previous block.
        last_phase_unwrapped: the unwrapped value that ``last_phase`` was
            mapped to.
        p: wrapped phases of the new block.

    Returns:
        Array of the same length as ``p`` holding the unwrapped phases,
        offset so that the step from ``last_phase_unwrapped`` to the first
        element is the unwrapped step from ``last_phase`` to ``p[0]``.
        An empty ``p`` yields an empty array.
    """
    p = np.asarray(p)
    if p.size == 0:
        # Nothing to continue; avoid indexing p[0] below.
        return np.zeros(0)
    # note: np.unwrap always preserves first element of array
    p = np.unwrap(p)
    # Unwrap the two-point sequence (previous sample, first new sample) to
    # find where the new block has to start.
    glue = np.array([last_phase_unwrapped, last_phase_unwrapped + (p[0] - last_phase)])
    new_p0 = np.unwrap(glue)[1]
    # Shift the block so that its first element lands on new_p0.
    return new_p0 + p - p[0]
class PositionTracker:
    """Accumulate the unwrapped phase difference between two signals.

    The last wrapped phase and last unwrapped position are remembered so
    that successive blocks join up without a discontinuity.
    """

    def __init__(self):
        """Start from zero phase and zero position."""
        self.reset()

    def reset(self):
        """Forget the history: the next block continues from zero."""
        self.last_position = 0.0
        self.last_phase = 0.0

    def input(self, ref, meas):
        """Return the unwrapped phase of ``meas`` relative to ``ref`` for one block.

        The wrapped phase is the angle of ``meas * conj(ref)``; it is then
        unwrapped as a continuation of the previous block and the final
        sample of each is stored for the next call.
        """
        wrapped = np.angle(meas*ref.conj())
        track = continuous_unwrap(self.last_phase, self.last_position, wrapped)
        self.last_phase, self.last_position = wrapped[-1], track[-1]
        return track
class LinearPhaseFilter:
    """Streaming FIR filter designed with ``scipy.signal.firwin``.

    The filter state is carried from one block to the next, so feeding a
    signal block by block gives the same result as filtering it in one go.
    """

    def __init__(self, numtaps, *args, **kwargs):
        """
        Args:
            numtaps: number of filter taps.
            *args, **kwargs: forwarded to ``signal.firwin`` after ``numtaps``.
        """
        self.numtaps = numtaps
        self.coef = signal.firwin(numtaps, *args, **kwargs)
        # lfilter needs numtaps - 1 state values; start from rest.
        self.state = np.zeros(self.numtaps - 1, dtype="complex64")

    def input(self, block):
        """Filter one block and return the output, updating the carried state."""
        output, self.state = signal.lfilter(self.coef, [1.0], block, zi=self.state)
        return output

    def delay(self):
        """Return the filter delay in samples, ``(numtaps - 1) // 2``.

        Raises:
            ValueError: if ``numtaps`` is even, since the delay would not be
                a whole number of samples. Raised explicitly (not asserted)
                so the check survives ``python -O``.
        """
        if self.numtaps % 2 != 1:
            raise ValueError(
                "delay() requires an odd number of taps, got {}".format(self.numtaps))
        return (self.numtaps - 1)//2
class Delay:
    """Delay a block-based sample stream by a fixed number of samples.

    Each output block starts with the last ``delay_amount`` samples of the
    previous input block (zeros for the first call), followed by the
    beginning of the current input block.
    """

    def __init__(self, block_size, delay_amount):
        """
        Args:
            block_size: length of every block passed to :meth:`input`.
            delay_amount: delay in samples, between 0 and ``block_size``.

        Raises:
            ValueError: if ``delay_amount`` is outside ``[0, block_size]``.
        """
        if not 0 <= delay_amount <= block_size:
            raise ValueError(
                "delay_amount must be within [0, block_size], got {}".format(delay_amount))
        self.block_size = block_size
        self.delay_amount = delay_amount
        self.output = np.zeros(block_size, dtype="complex64")
        self.next = np.zeros(delay_amount, dtype="complex64")

    def input(self, block):
        """Return the delayed version of ``block``.

        The returned array is an internal buffer that is overwritten by the
        next call; copy it if it must outlive that call.

        Raises:
            ValueError: if ``block`` does not have ``block_size`` samples.
        """
        if len(block) != self.block_size:
            raise ValueError(
                "expected a block of {} samples, got {}".format(self.block_size, len(block)))
        split = self.block_size - self.delay_amount
        self.output[:self.delay_amount] = self.next
        self.output[self.delay_amount:] = block[:split]
        # Copy the tail: a plain slice would alias the caller's buffer and be
        # corrupted if the caller reuses that buffer before the next call.
        self.next = np.array(block[split:])
        return self.output