From b226dbd2573b658a15067c36b7c5f4ac9c3c7015 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Mon, 21 Nov 2016 12:35:57 +0100 Subject: [PATCH] sawg: unittest data format --- artiq/coredevice/sawg.py | 3 +- artiq/test/gateware/test_sawg_fe.py | 141 ++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 artiq/test/gateware/test_sawg_fe.py diff --git a/artiq/coredevice/sawg.py b/artiq/coredevice/sawg.py index 3858527d6..49d018843 100644 --- a/artiq/coredevice/sawg.py +++ b/artiq/coredevice/sawg.py @@ -26,7 +26,7 @@ class Spline: @portable(flags=["fast-math"]) def to_mu64(self, value: TFloat) -> TList(TInt32): v = int(round(value*self.scale), width=64) - return [int(v >> 32, width=32), int(v, width=32)] + return [int(v, width=32), int(v >> 32, width=32)] @kernel def set_mu(self, value: TInt32): @@ -73,7 +73,6 @@ class Spline: wi = (vi >> k) & 0xffff v[j//2] += wi << (16 * ((j + 1)//2 - j//2)) j += 1 - v.append(vi) return v @kernel diff --git a/artiq/test/gateware/test_sawg_fe.py b/artiq/test/gateware/test_sawg_fe.py new file mode 100644 index 000000000..325709340 --- /dev/null +++ b/artiq/test/gateware/test_sawg_fe.py @@ -0,0 +1,141 @@ +import unittest +import numpy as np + +import migen as mg + +from artiq.coredevice import sawg +from artiq.language import delay_mu, core as core_language +from artiq.gateware.rtio.phy.sawg import Channel +from artiq.sim import devices as sim_devices, time as sim_time + + +class RTIOManager: + def __init__(self): + self.outputs = [] + + def rtio_output(self, now, channel, addr, data): + self.outputs.append((now, channel, addr, data)) + + def rtio_output_list(self, *args, **kwargs): + self.rtio_output(*args, **kwargs) + + def int(self, value, width=32): + if width == 32: + return np.int32(value) + elif width == 64: + return np.int64(value) + else: + raise ValueError(width) + + def patch(self, mod): + assert not getattr(mod, "_saved", None) + mod._saved = {} + for name in "rtio_output rtio_output_list int".split(): + mod._saved[name] = getattr(mod, name, None) + setattr(mod, name, getattr(self, name)) + + def unpatch(self, mod): + mod.__dict__.update(mod._saved) + del mod._saved + + +class SAWGTest(unittest.TestCase): + def setUp(self): + core_language.set_time_manager(sim_time.Manager()) + self.rtio_manager = RTIOManager() + self.rtio_manager.patch(sawg) + self.core = sim_devices.Core({}) + self.core.coarse_ref_period = 8 + self.channel = mg.ClockDomainsRenamer({"rio_phy": "sys"})( + Channel(width=16, parallelism=4)) + self.driver = sawg.SAWG({"core": self.core}, channel_base=0, + parallelism=self.channel.parallelism) + + def tearDown(self): + self.rtio_manager.unpatch(sawg) + + def test_instantiate(self): + pass + + def test_make_events(self): + d = self.driver + d.offset.set(.9) + delay_mu(2*8) + d.frequency0.set64(.1) + delay_mu(2*8) + d.offset.set(0) + self.assertEqual( + self.rtio_manager.outputs, [ + (0, 1, 0, int(round( + (1 << self.driver.offset.width - 1)*.9))), + (2*8, 8, 0, [0, int(round( + (1 << self.driver.frequency0.width - 1) * + self.channel.parallelism*.1))]), + (4*8, 1, 0, 0), + ]) + + def run_channel(self, events): + def gen(dut, events): + c = 0 + for time, channel, address, data in events: + assert c <= time + while c < time: + yield + c += 1 + for phy in dut.phys: + yield phy.rtlink.o.stb.eq(0) + rt = dut.phys[channel].rtlink.o + if isinstance(data, list): + data = sum(d << i*32 for i, d in enumerate(data)) + yield rt.data.eq(int(data)) + yield rt.stb.eq(1) + assert not (yield rt.busy) + + def log(dut, data, n): + for i in range(dut.latency): + yield + for i in range(n): + yield + data.append((yield from [(yield _) for _ in dut.o])) + + data = [] + mg.run_simulation(self.channel, [ + gen(self.channel, events), + log(self.channel, data, int(events[-1][0]//8) + 1)], + vcd_name="dds.vcd") + return sum(data, []) + + def test_channel(self): + self.test_make_events() + out = self.run_channel(self.rtio_manager.outputs) + print(out) + + def test_coeff(self): + import struct + for v in [-.1], [.1, -.01], [.1, .01, -.00001], \ + [.1, .01, .00001, -.000000001]: + ch = self.driver.offset + p = ch.coeff_to_mu(v) + t = ch.time_width + w = ch.width + p0 = [struct.pack("<" + "_hiqq"[(w + i*t)//16], + int(round(vi*ch.scale*ch.time_scale**i)) + )[:(w + i*t)//8] + for i, vi in enumerate(v)] + p0 = b"".join(p0) + if len(p0) % 4: + p0 += b"\x00"*(4 - len(p0) % 4) + p0 = list(struct.unpack("<" + "I"*((len(p0) + 3)//4), p0)) + with self.subTest(v): + self.assertEqual(p, p0) + + def test_linear(self): + d = self.driver + d.offset.set_list_mu([100, 10]) + delay_mu(10*8) + d.offset.set_list([0]) + delay_mu(1*8) + out = self.run_channel(self.rtio_manager.outputs) + self.assertEqual( + out, sum(([100 + i*10]*self.channel.parallelism + for i in range(11)), []))