diff --git a/artiq/gateware/dsp/fir.py b/artiq/gateware/dsp/fir.py new file mode 100644 index 000000000..50628864f --- /dev/null +++ b/artiq/gateware/dsp/fir.py @@ -0,0 +1,68 @@ +from operator import add +from functools import reduce +import numpy as np +from migen import * + + +def halfgen4(up, n): + """ + http://recycle.lbl.gov/~ldoolitt/halfband + + params: + * `up` is the stopband width, as a fraction of input sampling rate + * `n is the order of half-band filter to generate + returns: + * `a` is the full set of FIR coefficients, `4*n-1` long. + implement wisely. + """ + + npt = n*40 + wmax = 2*np.pi*up + wfit = (1 - np.linspace(0, 1, npt)[:, None]**2)*wmax + + target = .5*np.ones_like(wfit) + basis = np.cos(wfit*np.arange(1, 2*n, 2)) + l = np.linalg.pinv(basis)@target + + weight = np.ones_like(wfit) + for i in range(40): + err = np.fabs(basis@l - .5) + weight[err > .99*np.max(err)] *= 1 + 1.5/(i + 11) + l = np.linalg.pinv(basis*weight)@(target*weight) + a = np.c_[l, np.zeros_like(l)].ravel()[:-1] + a = np.r_[a[::-1], 1, a]/2 + return a + + +class FIR(Module): + """Full-rate finite impulse response filter. + + :param coefficients: integer taps. + :param width: bit width of input and output. + :param shift: scale factor (as power of two). + """ + def __init__(self, coefficients, width=16, shift=None): + self.width = width + self.i = Signal((width, True)) + self.o = Signal((width, True)) + self.latency = (len(coefficients) + 1)//2 + 1 + + ### + + n = len(coefficients) + x = [Signal((width, True)) for _ in range(n)] + self.comb += x[0].eq(self.i) + self.sync += [x[i + 1].eq(x[i]) for i in range(n - 1)] + + o = [] + for i, c in enumerate(coefficients): + # simplify for halfband and symmetric filters + if c == 0 or c in coefficients[:i]: + continue + o.append(c*reduce(add, [ + xj for xj, cj in zip(x, coefficients) if cj == c + ])) + + if shift is None: + shift = width - 1 + self.sync += self.o.eq(reduce(add, o) >> shift) diff --git a/artiq/test/gateware/test_fir.py b/artiq/test/gateware/test_fir.py new file mode 100644 index 000000000..55181f7e9 --- /dev/null +++ b/artiq/test/gateware/test_fir.py @@ -0,0 +1,78 @@ +import numpy as np +import matplotlib.pyplot as plt + +from migen import * +from migen.fhdl import verilog +from artiq.gateware.dsp import fir + + +class Transfer(Module): + def __init__(self, dut): + self.submodules.dut = dut + + def drive(self, x): + for xi in x: + yield self.dut.i.eq(int(xi)) + yield + + def record(self, y): + for i in range(self.dut.latency): + yield + for i in range(len(y)): + y[i] = (yield self.dut.o) + yield + + def run(self, samples, amplitude=1.): + w = 2**(self.dut.width - 1) - 1 + x = np.round(np.random.uniform( + -amplitude*w, amplitude*w, samples)) + y = np.empty_like(x) + run_simulation(self, [self.drive(x), self.record(y)], + vcd_name="fir.vcd") + x /= w + y /= w + return x, y + + def analyze(self, x, y): + fig, ax = plt.subplots(3) + ax[0].plot(x, "c-.", label="input") + ax[0].plot(y, "r-", label="output") + ax[0].legend(loc="right") + ax[0].set_xlabel("time (1/fs)") + ax[0].set_ylabel("signal") + n = len(x) + w = np.hanning(n) + x = (x.reshape(-1, n)*w).sum(0) + y = (y.reshape(-1, n)*w).sum(0) + t = (np.fft.rfft(y)/np.fft.rfft(x)) + f = np.fft.rfftfreq(n)*2 + fmin = f[1] + ax[1].plot(f, 20*np.log10(np.abs(t)), "r-") + ax[1].set_ylim(-70, 3) + ax[1].set_xlim(fmin, 1.) + # ax[1].set_xscale("log") + ax[1].set_xlabel("frequency (fs/2)") + ax[1].set_ylabel("magnitude (dB)") + ax[1].grid(True) + ax[2].plot(f, np.rad2deg(np.angle(t)), "r-") + ax[2].set_xlim(fmin, 1.) + # ax[2].set_xscale("log") + ax[2].set_xlabel("frequency (fs/2)") + ax[2].set_ylabel("phase (deg)") + ax[2].grid(True) + return fig + + +def _main(): + coeff = fir.halfgen4(.4/2, 8) + coeff_int = [int(round(c * (1 << 16 - 1))) for c in coeff] + dut = fir.FIR(coeff_int, width=16) + # print(verilog.convert(dut, ios={dut.i, dut.o})) + tb = Transfer(dut) + x, y = tb.run(samples=1 << 10, amplitude=.8) + tb.analyze(x, y) + plt.show() + + +if __name__ == "__main__": + _main()