From ea21f474a7fb3066626ea72a60cd93d3037c3376 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 9 Jan 2023 18:37:19 +0800 Subject: [PATCH] gateware: remove SAWG simulations --- artiq/gateware/test/dsp/__init__.py | 0 artiq/gateware/test/dsp/fir.py | 112 ---------- artiq/gateware/test/dsp/test_accu.py | 46 ---- artiq/gateware/test/dsp/test_satadd.py | 71 ------- artiq/gateware/test/dsp/test_sawg.py | 36 ---- artiq/gateware/test/dsp/test_sawg_fe.py | 255 ----------------------- artiq/gateware/test/dsp/test_sawg_phy.py | 70 ------- artiq/gateware/test/dsp/test_spline.py | 31 --- artiq/gateware/test/dsp/tools.py | 49 ----- 9 files changed, 670 deletions(-) delete mode 100644 artiq/gateware/test/dsp/__init__.py delete mode 100644 artiq/gateware/test/dsp/fir.py delete mode 100644 artiq/gateware/test/dsp/test_accu.py delete mode 100644 artiq/gateware/test/dsp/test_satadd.py delete mode 100644 artiq/gateware/test/dsp/test_sawg.py delete mode 100644 artiq/gateware/test/dsp/test_sawg_fe.py delete mode 100644 artiq/gateware/test/dsp/test_sawg_phy.py delete mode 100644 artiq/gateware/test/dsp/test_spline.py delete mode 100644 artiq/gateware/test/dsp/tools.py diff --git a/artiq/gateware/test/dsp/__init__.py b/artiq/gateware/test/dsp/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/artiq/gateware/test/dsp/fir.py b/artiq/gateware/test/dsp/fir.py deleted file mode 100644 index 31f5740b2..000000000 --- a/artiq/gateware/test/dsp/fir.py +++ /dev/null @@ -1,112 +0,0 @@ -import numpy as np -import matplotlib.pyplot as plt - -from migen import * -from migen.fhdl import verilog -from artiq.gateware.dsp import fir - - -class Transfer(Module): - def __init__(self, dut): - self.submodules.dut = dut - - def drive(self, x): - for xi in x.reshape(-1, self.dut.parallelism): - yield [ij.eq(int(xj)) for ij, xj in zip(self.dut.i, xi)] - yield - - def record(self, y): - for i in range(self.dut.latency): - yield - for yi in y.reshape(-1, self.dut.parallelism): - yield - yi[:] = (yield from [(yield o) for o in self.dut.o]) - - def run(self, samples, amplitude=1., seed=None): - if seed is not None: - np.random.seed(seed) - w = 2**(self.dut.width - 1) - 1 - x = np.round(np.random.uniform( - -amplitude*w, amplitude*w, samples)) - y = self.run_data(x) - x /= w - y /= w - return x, y - - def run_data(self, x): - y = np.empty_like(x) - run_simulation(self, [self.drive(x), self.record(y)], - vcd_name="fir.vcd") - return y - - def analyze(self, x, y): - fig, ax = plt.subplots(3) - ax[0].plot(x, "c-.", label="input") - ax[0].plot(y, "r-", label="output") - ax[0].legend(loc="right") - ax[0].set_xlabel("time (1/fs)") - ax[0].set_ylabel("signal") - n = len(x) - w = np.hanning(n) - x = (x.reshape(-1, n)*w).sum(0) - y = (y.reshape(-1, n)*w).sum(0) - t = (np.fft.rfft(y)/np.fft.rfft(x)) - f = np.fft.rfftfreq(n)*2 - fmin = f[1] - ax[1].plot(f, 20*np.log10(np.abs(t)), "r-") - ax[1].set_ylim(-70, 3) - ax[1].set_xlim(fmin, 1.) - # ax[1].set_xscale("log") - ax[1].set_xlabel("frequency (fs/2)") - ax[1].set_ylabel("magnitude (dB)") - ax[1].grid(True) - ax[2].plot(f, np.rad2deg(np.angle(t)), "r-") - ax[2].set_xlim(fmin, 1.) - # ax[2].set_xscale("log") - ax[2].set_xlabel("frequency (fs/2)") - ax[2].set_ylabel("phase (deg)") - ax[2].grid(True) - return fig - - -class UpTransfer(Transfer): - def drive(self, x): - x = x.reshape(-1, len(self.dut.o)) - x[:, 1:] = 0 - for xi in x: - yield self.dut.i.eq(int(xi[0])) - yield - - def record(self, y): - for i in range(self.dut.latency): - yield - for yi in y.reshape(-1, len(self.dut.o)): - yield - yi[:] = (yield from [(yield o) for o in self.dut.o]) - - -def _main(): - if True: - coeff = fir.halfgen4_cascade(2, width=.4, order=8) - dut = fir.ParallelHBFUpsampler(coeff, width=16) - # print(verilog.convert(dut, ios=set([dut.i] + dut.o))) - tb = UpTransfer(dut) - else: - coeff = fir.halfgen4(.4/2, 8) - dut = fir.ParallelFIR(coeff, parallelism=4, width=16) - # print(verilog.convert(dut, ios=set(dut.i + dut.o))) - tb = Transfer(dut) - - if True: - x, y = tb.run(samples=1 << 10, amplitude=.5, seed=0x1234567) - else: - x = np.zeros(100) - x[:50] = 1 << 8 - x[50:] = 1 << 13 - y = tb.run_data(x) - tb.analyze(x, y) - plt.show() - - -if __name__ == "__main__": - _main() diff --git a/artiq/gateware/test/dsp/test_accu.py b/artiq/gateware/test/dsp/test_accu.py deleted file mode 100644 index 384fba9cc..000000000 --- a/artiq/gateware/test/dsp/test_accu.py +++ /dev/null @@ -1,46 +0,0 @@ -import numpy as np - -from migen import * -from migen.fhdl.verilog import convert - -from artiq.gateware.dsp.accu import Accu, PhasedAccu -from .tools import xfer - - -def read(o, n): - p = [] - for i in range(n): - p.append((yield from [(yield pi) for pi in o.payload.flatten()])) - yield - return p - - -def _test_gen_accu(dut, o): - yield dut.o.ack.eq(1) - yield from xfer(dut, i=dict(p=0, f=1, clr=1)) - o.extend((yield from read(dut.o, 8))) - yield from xfer(dut, i=dict(p=0, f=2, clr=0)) - o.extend((yield from read(dut.o, 8))) - yield from xfer(dut, i=dict(p=0, f=2, clr=1)) - o.extend((yield from read(dut.o, 8))) - yield from xfer(dut, i=dict(p=8, f=-1, clr=1)) - o.extend((yield from read(dut.o, 8))) - yield from xfer(dut, i=dict(p=0, f=0, clr=1)) - yield from xfer(dut, i=dict(p=1, f=0, clr=0)) - o.extend((yield from read(dut.o, 8))) - - -def _test_accu(): - dut = PhasedAccu(8, parallelism=8) - - if False: - print(convert(dut)) - else: - o = [] - run_simulation(dut, _test_gen_accu(dut, o), vcd_name="accu.vcd") - o = np.array(o) - print(o) - - -if __name__ == "__main__": - _test_accu() diff --git a/artiq/gateware/test/dsp/test_satadd.py b/artiq/gateware/test/dsp/test_satadd.py deleted file mode 100644 index 2121d4287..000000000 --- a/artiq/gateware/test/dsp/test_satadd.py +++ /dev/null @@ -1,71 +0,0 @@ -import unittest -import migen as mg - -from artiq.gateware.dsp.tools import SatAddMixin - - -class DUT(mg.Module, SatAddMixin): - def __init__(self, width): - self.o = mg.Signal((width, True)) - self.i0 = mg.Signal.like(self.o) - self.i1 = mg.Signal.like(self.o) - self.l0 = mg.Signal.like(self.o) - self.l1 = mg.Signal.like(self.o) - self.c = mg.Signal(2) - self.comb += self.o.eq(self.sat_add((self.i0, self.i1), - width=4, limits=(self.l0, self.l1), clipped=self.c)) - - -class SatAddTest(unittest.TestCase): - def setUp(self): - self.dut = DUT(width=4) - # import migen.fhdl.verilog - # print(mg.fhdl.verilog.convert(self.dut)) - - def _sweep(self): - def gen(): - for i0 in range(-8, 8): - yield self.dut.i0.eq(i0) - for i1 in range(-8, 8): - yield self.dut.i1.eq(i1) - yield - - def rec(): - l0 = yield self.dut.l0 - l1 = yield self.dut.l1 - for i in range(1 << 8): - i0 = yield self.dut.i0 - i1 = yield self.dut.i1 - o = yield self.dut.o - c = yield self.dut.c - - full = i0 + i1 - lim = full - clip = 0 - if full < l0: - lim = l0 - clip = 1 - if full > l1: - lim = l1 - clip = 2 - with self.subTest(i0=i0, i1=i1): - self.assertEqual(lim, o) - self.assertEqual(clip, c) - yield - - mg.run_simulation(self.dut, (gen(), rec())) - - def test_inst(self): - pass - - def test_run(self): - self._sweep() - - def test_limits(self): - for l0 in -8, 0, 1, 7: - for l1 in -8, 0, 1, 7: - self.setUp() - self.dut.l0.reset = l0 - self.dut.l1.reset = l1 - with self.subTest(l0=l0, l1=l1): - self._sweep() diff --git a/artiq/gateware/test/dsp/test_sawg.py b/artiq/gateware/test/dsp/test_sawg.py deleted file mode 100644 index fd392607d..000000000 --- a/artiq/gateware/test/dsp/test_sawg.py +++ /dev/null @@ -1,36 +0,0 @@ -import numpy as np - -from migen import * -from migen.fhdl.verilog import convert - -from artiq.gateware.dsp import sawg -from artiq.gateware.test.dsp.tools import xfer - - -def _test_gen_dds(dut, o): - yield from xfer(dut, - a=dict(a0=10), - p=dict(a0=0), - f=dict(a0=1), - ) - for i in range(256//dut.parallelism): - yield - o.append((yield from [(yield _) for _ in dut.xo])) - - -def _test_channel(): - widths = sawg._Widths(t=8, a=4*8, p=8, f=16) - orders = sawg._Orders(a=4, p=1, f=2) - dut = sawg.SplineParallelDDS(widths, orders, parallelism=2) - - if False: - print(convert(dut)) - else: - o = [] - run_simulation(dut, _test_gen_dds(dut, o), vcd_name="dds.vcd") - o = np.array(o) - print(o[:, :]) - - -if __name__ == "__main__": - _test_channel() diff --git a/artiq/gateware/test/dsp/test_sawg_fe.py b/artiq/gateware/test/dsp/test_sawg_fe.py deleted file mode 100644 index 1f7cdc08c..000000000 --- a/artiq/gateware/test/dsp/test_sawg_fe.py +++ /dev/null @@ -1,255 +0,0 @@ -import unittest - -import migen as mg -from numpy import int32 - -from artiq.coredevice import sawg, spline -from artiq.language import (at_mu, now_mu, delay, - core as core_language) -from artiq.gateware.rtio.phy.sawg import Channel -from artiq.sim import devices as sim_devices, time as sim_time - - -class RTIOManager: - def __init__(self): - self.outputs = [] - - def rtio_output(self, target, data): - channel = target >> 8 - addr = target & 0xff - self.outputs.append((now_mu(), channel, addr, data)) - - def rtio_output_wide(self, *args, **kwargs): - self.rtio_output(*args, **kwargs) - - def delay_mu(self, t): - delay(t) - - def patch(self, mod): - assert not hasattr(mod, "_saved") - mod._saved = {} - for name in "rtio_output rtio_output_wide delay_mu".split(): - mod._saved[name] = getattr(mod, name, None) - setattr(mod, name, getattr(self, name)) - - def unpatch(self, mod): - mod.__dict__.update(mod._saved) - del mod._saved - - -class SAWGTest(unittest.TestCase): - def setUp(self): - core_language.set_time_manager(sim_time.Manager()) - self.rtio_manager = RTIOManager() - self.rtio_manager.patch(spline) - self.rtio_manager.patch(sawg) - self.core = sim_devices.Core({}) - self.core.coarse_ref_period = 20/3 - self.core.ref_multiplier = 1 - self.t = self.core.coarse_ref_period - self.channel = mg.ClockDomainsRenamer({"rio_phy": "sys"})( - Channel(width=16, parallelism=2)) - self.driver = sawg.SAWG({"core": self.core}, channel_base=0, - parallelism=self.channel.parallelism) - - def tearDown(self): - self.rtio_manager.unpatch(spline) - self.rtio_manager.unpatch(sawg) - - def test_instantiate(self): - pass - - def test_make_events(self): - d = self.driver - d.offset.set(.9) - delay(2*self.t) - d.frequency0.set(.1) - d.frequency1.set(.1) - delay(2*self.t) - d.offset.set(0) - v = int(round((1 << 48) * .1 * self.t)) - self.assertEqual( - self.rtio_manager.outputs, [ - (0., 1, 0, int(round(self.driver.offset.scale*.9))), - (2.*self.t, 8, 0, int(round( - (1 << self.driver.frequency0.width) * - self.t/self.channel.parallelism*.1))), - (2.*self.t, 3, 0, [int32(v), int32(v >> 32)]), - (4.*self.t, 1, 0, 0), - ]) - - def run_channel(self, events): - def gen(dut, events): - c = 0 - for time, channel, address, data in events: - time //= self.t - assert c <= time - while c < time: - yield - c += 1 - for phy in dut.phys: - yield phy.rtlink.o.stb.eq(0) - rt = dut.phys[channel].rtlink.o - if isinstance(data, list): - data = sum(int(d) << (i*32) for i, d in enumerate(data)) - yield rt.data.eq(int(data)) - if hasattr(rt, "address"): - yield rt.address.eq(address) - yield rt.stb.eq(1) - assert not (yield rt.busy) - # print("{}: set ch {} to {}".format(time, channel, hex(data))) - - def log(dut, data, n): - for i in range(n + dut.latency): - yield - data.append((yield from [(yield _) for _ in dut.o])) - - data = [] - # print(int(events[-1][0]) + 1) - mg.run_simulation(self.channel, [ - gen(self.channel, events), - log(self.channel, data, int(events[-1][0]//self.t) + 1)], - vcd_name="dds.vcd") - return data - - def test_run_channel(self): - self.test_make_events() - self.run_channel(self.rtio_manager.outputs) - - def test_coeff(self): - import struct - # these get discrete_compensate - # [.1, .01, -.00001], [.1, .01, .00001, -.000000001] - for v in [-.1], [.1, -.01]: - ch = self.driver.offset - p = ch.coeff_as_packed(v) - t = ch.time_width - w = ch.width - p = [_ & 0xffffffff for _ in p] - p0 = [int(round(vi*ch.scale*ch.time_scale**i)) - for i, vi in enumerate(v)] - p0 = [struct.pack("<" + "_bhiiqqqq"[(w + i*t)//8], vi - )[:(w + i*t)//8] - for i, vi in enumerate(p0)] - p0 = b"".join(p0) - if len(p0) % 4: - p0 += b"\x00"*(4 - len(p0) % 4) - p0 = list(struct.unpack("<" + "I"*((len(p0) + 3)//4), p0)) - with self.subTest(v): - self.assertEqual(p, p0) - - def test_linear(self): - d = self.driver - d.offset.set_coeff_mu([100, 10]) - delay(10*self.t) - d.offset.set_coeff([0]) - delay(1*self.t) - out = self.run_channel(self.rtio_manager.outputs) - out = out[self.channel.latency + self.channel.u.latency:][:11] - for i in range(len(out) - 1): - with self.subTest(i): - v = 100 + i*10 - self.assertEqual(out[i], [v, v]) - self.assertEqual(out[-1], [0, 0]) - - def test_pack(self): - ch = self.driver.offset - self.assertEqual(ch.coeff_as_packed_mu([1]), [1]) - self.assertEqual(ch.coeff_as_packed_mu([1, 1 << 16]), [1, 1]) - self.assertEqual(ch.coeff_as_packed_mu([1, 1 << 32]), [1, 0]) - self.assertEqual(ch.coeff_as_packed_mu([0x1234, 0xa5a5a5a5]), - [0xa5a51234, 0xa5a5]) - self.assertEqual(ch.coeff_as_packed_mu([1, 2, 3, 4]), - [0x20001, 0x30000, 0, 4, 0]) - self.assertEqual(ch.coeff_as_packed_mu([-1, -2, -3, -4]), - [0xfffeffff, 0xfffdffff, -1, -4, -1]) - self.assertEqual(ch.coeff_as_packed_mu([0, -1, 0, -1]), - [0xffff0000, 0x0000ffff, 0, -1, -1]) - - def test_smooth_linear(self): - ch = self.driver.offset - ch.smooth(.1, .2, 13*self.t, 1) - ch.set(.2) - delay(1*self.t) - out = self.run_channel(self.rtio_manager.outputs) - out = out[self.channel.latency + self.channel.u.latency:][:14] - a = int(round(.1*ch.scale)) - da = int(round(.1*ch.scale*(1 << ch.width)//13)) - for i in range(len(out) - 1): - with self.subTest(i): - v = a + (i*da >> ch.width) - self.assertEqual(out[i], [v, v]) - a = int(round(.2*ch.scale)) - self.assertEqual(out[-1], [a, a]) - - def test_smooth_cubic(self): - ch = self.driver.offset - ch.smooth(.1, .2, 13*self.t, 3) - ch.set(.2) - delay(1*self.t) - out = self.run_channel(self.rtio_manager.outputs) - out = out[self.channel.latency + self.channel.u.latency:][:14] - if False: - import matplotlib.pyplot as plt - plt.plot(sum(out, [])) - plt.show() - - @unittest.skip("needs artiq.sim.time.TimeManager tweak for " - "reverse timeline jumps") - def test_demo_2tone(self): - MHz = 1e-3 - ns = 1. - self.sawg0 = self.driver - - t_up = t_hold = t_down = 400*ns - a1 = .3 - a2 = .4 - order = 3 - - self.sawg0.frequency0.set(10*MHz) - self.sawg0.phase0.set(0.) - self.sawg0.frequency1.set(1*MHz) - self.sawg0.phase1.set(0.) - self.sawg0.frequency2.set(13*MHz) - self.sawg0.phase2.set(0.) - t = now_mu() - self.sawg0.amplitude1.smooth(.0, a1, t_up, order) - at_mu(t) - self.sawg0.amplitude2.smooth(.0, a2, t_up, order) - self.sawg0.amplitude1.set(a1) - self.sawg0.amplitude2.set(a2) - delay(t_hold) - t = now_mu() - self.sawg0.amplitude1.smooth(a1, .0, t_down, order) - at_mu(t) - self.sawg0.amplitude2.smooth(a2, .0, t_down, order) - self.sawg0.amplitude1.set(.0) - self.sawg0.amplitude2.set(.0) - - out = self.run_channel(self.rtio_manager.outputs) - out = sum(out, []) - if True: - import matplotlib.pyplot as plt - plt.plot(out) - plt.show() - - def test_fir_overflow(self): - MHz = 1e-3 - ns = 1. - f1 = self.driver.frequency1 - a1 = self.driver.amplitude1 - p1 = self.driver.phase1 - cfg = self.driver.config - f1.set(1*MHz) - a1.set(.99) - delay(100*ns) - p1.set(.5) - delay(100*ns) - a1.set(0) - - out = self.run_channel(self.rtio_manager.outputs) - out = sum(out, []) - if False: - import matplotlib.pyplot as plt - plt.plot(out) - plt.show() diff --git a/artiq/gateware/test/dsp/test_sawg_phy.py b/artiq/gateware/test/dsp/test_sawg_phy.py deleted file mode 100644 index 35666a5f6..000000000 --- a/artiq/gateware/test/dsp/test_sawg_phy.py +++ /dev/null @@ -1,70 +0,0 @@ -import numpy as np -from operator import or_ - -from migen import * -from migen.fhdl.verilog import convert - -from artiq.gateware.rtio.phy.sawg import Channel -from .tools import rtio_xfer - - -def pack_tri(port, *v): - r = 0 - w = 0 - for vi, p in zip(v, port.payload.flatten()): - w += len(p) - r |= int(vi*(1 << w)) - return r - - -def gen_rtio(dut): - yield - yield from rtio_xfer( - dut, - a1=pack_tri(dut.a1.a, .1), - f0=pack_tri(dut.b.f, .01234567), - f1=pack_tri(dut.a1.f, .01234567), - a2=pack_tri(dut.a1.a, .05), - f2=pack_tri(dut.a1.f, .00534567), - ) - - -def gen_log(dut, o, n): - for i in range(3 + dut.latency): - yield - for i in range(n): - yield - o.append((yield from [(yield _) for _ in dut.o])) - #o.append([(yield dut.a1.xo[0])]) - - -def _test_channel(): - width = 16 - - dut = ClockDomainsRenamer({"rio_phy": "sys"})( - Channel(width=width, parallelism=4) - ) - - if False: - print(convert(dut)) - return - - o = [] - run_simulation( - dut, - [gen_rtio(dut), gen_log(dut, o, 128)], - vcd_name="dds.vcd") - o = np.array(o)/(1 << (width - 1)) - o = o.ravel() - np.savez_compressed("dds.npz", o=o) - - import matplotlib.pyplot as plt - fig, ax = plt.subplots(2) - ax[0].step(np.arange(o.size), o) - ax[1].psd(o, 1 << 10, Fs=1, noverlap=1 << 9, scale_by_freq=False) - fig.savefig("dds.pdf") - plt.show() - - -if __name__ == "__main__": - _test_channel() diff --git a/artiq/gateware/test/dsp/test_spline.py b/artiq/gateware/test/dsp/test_spline.py deleted file mode 100644 index fe2d82c28..000000000 --- a/artiq/gateware/test/dsp/test_spline.py +++ /dev/null @@ -1,31 +0,0 @@ -import numpy as np - -from migen import * -from migen.fhdl.verilog import convert - -from artiq.gateware.dsp.spline import Spline -from .tools import xfer - - -def _test_gen_spline(dut, o): - yield dut.o.ack.eq(1) - yield from xfer(dut, i=dict(a0=0, a1=1, a2=2)) - for i in range(20): - yield - o.append((yield dut.o.a0)) - - -def _test_spline(): - dut = Spline(order=3, width=16, step=1) - - if False: - print(convert(dut)) - else: - o = [] - run_simulation(dut, _test_gen_spline(dut, o), vcd_name="spline.vcd") - o = np.array(o) - print(o) - - -if __name__ == "__main__": - _test_spline() diff --git a/artiq/gateware/test/dsp/tools.py b/artiq/gateware/test/dsp/tools.py deleted file mode 100644 index 8d22ad475..000000000 --- a/artiq/gateware/test/dsp/tools.py +++ /dev/null @@ -1,49 +0,0 @@ -def set_dict(e, **k): - for k, v in k.items(): - if isinstance(v, dict): - yield from set_dict(getattr(e, k), **v) - else: - yield getattr(e, k).eq(v) - - -def xfer(dut, **kw): - ep = [] - for e, v in kw.items(): - e = getattr(dut, e) - yield from set_dict(e, **v) - ep.append(e) - for e in ep: - yield e.stb.eq(1) - while ep: - yield - for e in ep[:]: - if hasattr(e, "busy") and (yield e.busy): - raise ValueError(e, "busy") - if not hasattr(e, "ack") or (yield e.ack): - yield e.stb.eq(0) - ep.remove(e) - - -def szip(*iters): - active = {it: None for it in iters} - while active: - for it in list(active): - while True: - try: - val = it.send(active[it]) - except StopIteration: - del active[it] - break - if val is None: - break - else: - active[it] = (yield val) - val = (yield None) - for it in active: - active[it] = val - - -def rtio_xfer(dut, **kwargs): - yield from szip(*( - xfer(dut.phys_named[k].rtlink, o={"data": v}) - for k, v in kwargs.items()))