forked from M-Labs/artiq
gateware/dsp: add FIR and test
This commit is contained in:
parent
d34084be0f
commit
7e0f3edca5
|
@ -0,0 +1,68 @@
|
|||
from operator import add
|
||||
from functools import reduce
|
||||
import numpy as np
|
||||
from migen import *
|
||||
|
||||
|
||||
def halfgen4(up, n):
|
||||
"""
|
||||
http://recycle.lbl.gov/~ldoolitt/halfband
|
||||
|
||||
params:
|
||||
* `up` is the stopband width, as a fraction of input sampling rate
|
||||
* `n is the order of half-band filter to generate
|
||||
returns:
|
||||
* `a` is the full set of FIR coefficients, `4*n-1` long.
|
||||
implement wisely.
|
||||
"""
|
||||
|
||||
npt = n*40
|
||||
wmax = 2*np.pi*up
|
||||
wfit = (1 - np.linspace(0, 1, npt)[:, None]**2)*wmax
|
||||
|
||||
target = .5*np.ones_like(wfit)
|
||||
basis = np.cos(wfit*np.arange(1, 2*n, 2))
|
||||
l = np.linalg.pinv(basis)@target
|
||||
|
||||
weight = np.ones_like(wfit)
|
||||
for i in range(40):
|
||||
err = np.fabs(basis@l - .5)
|
||||
weight[err > .99*np.max(err)] *= 1 + 1.5/(i + 11)
|
||||
l = np.linalg.pinv(basis*weight)@(target*weight)
|
||||
a = np.c_[l, np.zeros_like(l)].ravel()[:-1]
|
||||
a = np.r_[a[::-1], 1, a]/2
|
||||
return a
|
||||
|
||||
|
||||
class FIR(Module):
|
||||
"""Full-rate finite impulse response filter.
|
||||
|
||||
:param coefficients: integer taps.
|
||||
:param width: bit width of input and output.
|
||||
:param shift: scale factor (as power of two).
|
||||
"""
|
||||
def __init__(self, coefficients, width=16, shift=None):
|
||||
self.width = width
|
||||
self.i = Signal((width, True))
|
||||
self.o = Signal((width, True))
|
||||
self.latency = (len(coefficients) + 1)//2 + 1
|
||||
|
||||
###
|
||||
|
||||
n = len(coefficients)
|
||||
x = [Signal((width, True)) for _ in range(n)]
|
||||
self.comb += x[0].eq(self.i)
|
||||
self.sync += [x[i + 1].eq(x[i]) for i in range(n - 1)]
|
||||
|
||||
o = []
|
||||
for i, c in enumerate(coefficients):
|
||||
# simplify for halfband and symmetric filters
|
||||
if c == 0 or c in coefficients[:i]:
|
||||
continue
|
||||
o.append(c*reduce(add, [
|
||||
xj for xj, cj in zip(x, coefficients) if cj == c
|
||||
]))
|
||||
|
||||
if shift is None:
|
||||
shift = width - 1
|
||||
self.sync += self.o.eq(reduce(add, o) >> shift)
|
|
@ -0,0 +1,78 @@
|
|||
import numpy as np
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
from migen import *
|
||||
from migen.fhdl import verilog
|
||||
from artiq.gateware.dsp import fir
|
||||
|
||||
|
||||
class Transfer(Module):
|
||||
def __init__(self, dut):
|
||||
self.submodules.dut = dut
|
||||
|
||||
def drive(self, x):
|
||||
for xi in x:
|
||||
yield self.dut.i.eq(int(xi))
|
||||
yield
|
||||
|
||||
def record(self, y):
|
||||
for i in range(self.dut.latency):
|
||||
yield
|
||||
for i in range(len(y)):
|
||||
y[i] = (yield self.dut.o)
|
||||
yield
|
||||
|
||||
def run(self, samples, amplitude=1.):
|
||||
w = 2**(self.dut.width - 1) - 1
|
||||
x = np.round(np.random.uniform(
|
||||
-amplitude*w, amplitude*w, samples))
|
||||
y = np.empty_like(x)
|
||||
run_simulation(self, [self.drive(x), self.record(y)],
|
||||
vcd_name="fir.vcd")
|
||||
x /= w
|
||||
y /= w
|
||||
return x, y
|
||||
|
||||
def analyze(self, x, y):
|
||||
fig, ax = plt.subplots(3)
|
||||
ax[0].plot(x, "c-.", label="input")
|
||||
ax[0].plot(y, "r-", label="output")
|
||||
ax[0].legend(loc="right")
|
||||
ax[0].set_xlabel("time (1/fs)")
|
||||
ax[0].set_ylabel("signal")
|
||||
n = len(x)
|
||||
w = np.hanning(n)
|
||||
x = (x.reshape(-1, n)*w).sum(0)
|
||||
y = (y.reshape(-1, n)*w).sum(0)
|
||||
t = (np.fft.rfft(y)/np.fft.rfft(x))
|
||||
f = np.fft.rfftfreq(n)*2
|
||||
fmin = f[1]
|
||||
ax[1].plot(f, 20*np.log10(np.abs(t)), "r-")
|
||||
ax[1].set_ylim(-70, 3)
|
||||
ax[1].set_xlim(fmin, 1.)
|
||||
# ax[1].set_xscale("log")
|
||||
ax[1].set_xlabel("frequency (fs/2)")
|
||||
ax[1].set_ylabel("magnitude (dB)")
|
||||
ax[1].grid(True)
|
||||
ax[2].plot(f, np.rad2deg(np.angle(t)), "r-")
|
||||
ax[2].set_xlim(fmin, 1.)
|
||||
# ax[2].set_xscale("log")
|
||||
ax[2].set_xlabel("frequency (fs/2)")
|
||||
ax[2].set_ylabel("phase (deg)")
|
||||
ax[2].grid(True)
|
||||
return fig
|
||||
|
||||
|
||||
def _main():
|
||||
coeff = fir.halfgen4(.4/2, 8)
|
||||
coeff_int = [int(round(c * (1 << 16 - 1))) for c in coeff]
|
||||
dut = fir.FIR(coeff_int, width=16)
|
||||
# print(verilog.convert(dut, ios={dut.i, dut.o}))
|
||||
tb = Transfer(dut)
|
||||
x, y = tb.run(samples=1 << 10, amplitude=.8)
|
||||
tb.analyze(x, y)
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_main()
|
Loading…
Reference in New Issue