add LinearPhaseFilter and Delay

master
Sebastien Bourdeauducq 2020-09-26 20:34:20 +08:00
parent aded9ddfbc
commit be600231e5
1 changed files with 33 additions and 2 deletions

View File

@ -4,7 +4,7 @@ import threading
import SoapySDR
import numpy as np
from scipy.signal import blackmanharris
from scipy import signal
class BufferedSDR:
@ -127,7 +127,7 @@ class Stabilizer:
self.wiggle_amplitude = 0.15
def input(self, samples):
spectrum = np.abs(np.fft.fft(samples*blackmanharris(len(samples))))
spectrum = np.abs(np.fft.fft(samples*signal.blackmanharris(len(samples))))
i = np.argmax(spectrum)
amplitude = spectrum[i]
@ -189,3 +189,34 @@ class PositionTracker:
self.last_phase = phase[-1]
self.last_position = position[-1]
return position
class LinearPhaseFilter:
def __init__(self, numtaps, *args, **kwargs):
self.numtaps = numtaps
self.coef = signal.firwin(numtaps, *args, **kwargs)
self.state = np.zeros(self.numtaps - 1)
def input(self, block):
output, self.state = signal.lfilter(self.coef, [1.0], block, zi=self.state)
return output
def delay(self):
assert(self.numptaps % 2 == 1)
return (self.numptaps - 1)//2
class Delay:
def __init__(self, block_size, delay_amount):
self.block_size = block_size
self.delay_amount = delay_amount
self.output = np.zeros(block_size)
self.next = np.zeros(delay_amount)
def input(self, block):
assert(len(block) == self.block_size)
split = self.block_size - self.delay_amount
self.output[:self.delay_amount] = self.next
self.output[self.delay_amount:] = block[:split]
self.next = block[split:]
return self.output