From c8fd63754ac8a90cd1e09a2a46aceef6add0bc8e Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Mon, 23 Apr 2018 16:08:57 +0000 Subject: [PATCH] suservo: add unittests m-labs/artiq#788 --- artiq/gateware/test/suservo/__init__.py | 0 artiq/gateware/test/suservo/test_adc.py | 165 ++++++++++++++++++++++ artiq/gateware/test/suservo/test_dds.py | 93 ++++++++++++ artiq/gateware/test/suservo/test_iir.py | 56 ++++++++ artiq/gateware/test/suservo/test_servo.py | 109 ++++++++++++++ 5 files changed, 423 insertions(+) create mode 100644 artiq/gateware/test/suservo/__init__.py create mode 100644 artiq/gateware/test/suservo/test_adc.py create mode 100644 artiq/gateware/test/suservo/test_dds.py create mode 100644 artiq/gateware/test/suservo/test_iir.py create mode 100644 artiq/gateware/test/suservo/test_servo.py diff --git a/artiq/gateware/test/suservo/__init__.py b/artiq/gateware/test/suservo/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/artiq/gateware/test/suservo/test_adc.py b/artiq/gateware/test/suservo/test_adc.py new file mode 100644 index 000000000..2dbcd6342 --- /dev/null +++ b/artiq/gateware/test/suservo/test_adc.py @@ -0,0 +1,165 @@ +import logging +import string +import unittest + +from migen import * +from migen.genlib import io + +from artiq.gateware.suservo.adc_ser import ADC, ADCParams + + +class DDROutputImpl(Module): + def __init__(self, i1, i2, o, clk): + do_clk0 = Signal(reset_less=True) + do_j1 = Signal(reset_less=True) + do_j2 = Signal(reset_less=True) + do_j3 = Signal(reset_less=True) + self.sync.async += [ + do_clk0.eq(clk), + do_j1.eq(i1), + do_j2.eq(i2), + If(Cat(do_clk0, clk) == 0b10, + o.eq(do_j1), + do_j3.eq(do_j2), + ).Elif(Cat(do_clk0, clk) == 0b01, + o.eq(do_j3), + ) + ] + + +class DDROutput: + @staticmethod + def lower(dr): + return DDROutputImpl(dr.i1, dr.i2, dr.o, dr.clk) + + +class DDRInputImpl(Module): + def __init__(self, i, o1, o2, clk): + di_clk0 = Signal(reset_less=True) + # SAME_EDGE_PIPELINED is effectively one register for o1 + # (during rising clock) + di_j1 = Signal(reset_less=True) + di_j2 = Signal(reset_less=True) + di_j3 = Signal(reset_less=True) + self.sync.async += [ + di_clk0.eq(clk), + di_j1.eq(i), + If(Cat(di_clk0, clk) == 0b10, + di_j3.eq(di_j1), + o1.eq(di_j3), + o2.eq(di_j2) + ).Elif(Cat(di_clk0, clk) == 0b01, + di_j2.eq(di_j1) + ) + ] + + +class DDRInput: + @staticmethod + def lower(dr): + return DDRInputImpl(dr.i, dr.o1, dr.o2, dr.clk) + + +class TB(Module): + def __init__(self, params): + self.params = p = params + + self.sck = Signal() + self.clkout = Signal(reset_less=True) + self.cnv_b = Signal() + + self.sck_en = Signal() + self.sck_en_ret = Signal() + + adc_sck_en = Signal() + cd_adc = ClockDomain("adc", reset_less=True) + self.clock_domains += cd_adc + + self.sdo = [] + self.data = [Signal((p.width, True), reset_less=True) + for i in range(p.channels)] + + srs = [] + for i in range(p.lanes): + name = "sdo" + string.ascii_lowercase[i] + sdo = Signal(name=name, reset_less=True) + self.sdo.append(sdo) + setattr(self, name, sdo) + sr = Signal(p.width*p.channels//p.lanes, reset_less=True) + srs.append(sr) + self.specials += io.DDROutput( + # one for async + self._dly(sr[-1], -1), self._dly(sr[-2], -1), sdo) + self.sync.adc += [ + If(adc_sck_en, + sr[2:].eq(sr) + ) + ] + cnv_b_old = Signal(reset_less=True) + self.sync.async += [ + cnv_b_old.eq(self.cnv_b), + If(Cat(cnv_b_old, self.cnv_b) == 0b10, + sr.eq(Cat(reversed(self.data[2*i:2*i + 2]))), + ) + ] + + adc_clk_rec = Signal() + self.comb += [ + adc_sck_en.eq(self._dly(self.sck_en, 1)), + self.sck_en_ret.eq(self._dly(adc_sck_en)), + adc_clk_rec.eq(self._dly(self.sck, 1)), + self.clkout.eq(self._dly(adc_clk_rec)), + ] + + def _dly(self, sig, n=0): + n += self.params.t_rtt*4//2 # t_{sys,adc,ret}/t_async half rtt + dly = Signal(n, reset_less=True) + self.sync.async += dly.eq(Cat(sig, dly)) + return dly[-1] + + +def main(): + params = ADCParams(width=8, channels=4, lanes=2, + t_cnvh=3, t_conv=5, t_rtt=4) + tb = TB(params) + adc = ADC(tb, params) + tb.submodules += adc + + def run(tb): + dut = adc + for i, ch in enumerate(tb.data): + yield ch.eq(i) + assert (yield dut.done) + yield dut.start.eq(1) + yield + yield dut.start.eq(0) + yield + assert not (yield dut.done) + while not (yield dut.done): + yield + x = (yield from [(yield d) for d in dut.data]) + for i, ch in enumerate(x): + assert ch == i, (hex(ch), hex(i)) + + run_simulation(tb, [run(tb)], + vcd_name="adc.vcd", + clocks={ + "sys": (8, 0), + "adc": (8, 0), + "ret": (8, 0), + "async": (2, 0), + }, + special_overrides={ + io.DDROutput: DDROutput, + io.DDRInput: DDRInput + }) + + +class ADCTest(unittest.TestCase): + def test_run(self): + main() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/artiq/gateware/test/suservo/test_dds.py b/artiq/gateware/test/suservo/test_dds.py new file mode 100644 index 000000000..e8c221f51 --- /dev/null +++ b/artiq/gateware/test/suservo/test_dds.py @@ -0,0 +1,93 @@ +import logging +import unittest + +from migen import * + +from artiq.gateware.suservo.dds_ser import DDSParams, DDS + + +class TB(Module): + def __init__(self, p): + self.cs_n = Signal() + self.clk = Signal() + self.mosi = [Signal() for i in range(p.channels)] + for i, m in enumerate(self.mosi): + setattr(self, "mosi{}".format(i), m) + self.miso = Signal() + self.io_update = Signal() + + clk0 = Signal() + self.sync += clk0.eq(self.clk) + sample = Signal() + self.comb += sample.eq(Cat(self.clk, clk0) == 0b01) + + self.ddss = [] + for i in range(p.channels): + dds = Record([("ftw", 32), ("pow", 16), ("asf", 16), ("cmd", 8)]) + sr = Signal(len(dds)) + self.sync += [ + If(~self.cs_n & sample, + sr.eq(Cat(self.mosi[i], sr)) + ), + If(self.io_update, + dds.raw_bits().eq(sr) + ) + ] + self.ddss.append(dds) + + @passive + def log(self, data): + i = 0 + while True: + i += 1 + if (yield self.io_update): + yield + dat = [] + for dds in self.ddss: + v = yield from [(yield getattr(dds, k)) + for k in "cmd ftw pow asf".split()] + dat.append(v) + data.append((i, dat)) + else: + yield + + +def main(): + p = DDSParams(channels=4, width=8 + 32 + 16 + 16, clk=1) + tb = TB(p) + dds = DDS(tb, p) + tb.submodules += dds + + def run(tb): + dut = dds + for i, ch in enumerate(dut.profile): + yield ch.eq((((0 + << 16 | i | 0x20) + << 16 | i | 0x30) + << 32 | i | 0x40)) + # assert (yield dut.done) + yield dut.start.eq(1) + yield + yield dut.start.eq(0) + yield + yield + assert not (yield dut.done) + while not (yield dut.done): + yield + yield + + data = [] + run_simulation(tb, [tb.log(data), run(tb)], vcd_name="dds.vcd") + + assert data[-1][1] == [[0xe, 0x40 | i, 0x30 | i, 0x20 | i] for i in + range(4)] + + +class DDSTest(unittest.TestCase): + def test_run(self): + main() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/artiq/gateware/test/suservo/test_iir.py b/artiq/gateware/test/suservo/test_iir.py new file mode 100644 index 000000000..c3d66de52 --- /dev/null +++ b/artiq/gateware/test/suservo/test_iir.py @@ -0,0 +1,56 @@ +import logging +import unittest + +from migen import * +from artiq.gateware.suservo import iir + + +def main(): + w_kasli = iir.IIRWidths(state=25, coeff=18, adc=16, + asf=14, word=16, accu=48, shift=11, + channel=3, profile=5) + w = iir.IIRWidths(state=17, coeff=16, adc=16, + asf=14, word=16, accu=48, shift=11, + channel=2, profile=1) + + def run(dut): + for i, ch in enumerate(dut.adc): + yield ch.eq(i) + for i, ch in enumerate(dut.ctrl): + yield ch.en_iir.eq(1) + yield ch.en_out.eq(1) + yield ch.profile.eq(i) + for i in range(1 << w.channel): + yield from dut.set_state(i, i << 8, coeff="x1") + yield from dut.set_state(i, i << 8, coeff="x0") + for j in range(1 << w.profile): + yield from dut.set_state(i, + (j << 1) | (i << 8), profile=j, coeff="y1") + for k, l in enumerate("pow offset ftw0 ftw1".split()): + yield from dut.set_coeff(i, profile=j, coeff=l, + value=(i << 12) | (j << 8) | (k << 4)) + yield + for i in range(1 << w.channel): + for j in range(1 << w.profile): + for k, l in enumerate("cfg a1 b0 b1".split()): + yield from dut.set_coeff(i, profile=j, coeff=l, + value=(i << 12) | (j << 8) | (k << 4)) + yield from dut.set_coeff(i, profile=j, coeff="cfg", + value=(i << 0) | (j << 8)) # sel, dly + yield + for i in range(10): + yield from dut.check_iter() + yield + + dut = iir.IIR(w) + run_simulation(dut, [run(dut)], vcd_name="iir.vcd") + + +class IIRTest(unittest.TestCase): + def test_run(self): + main() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + main() diff --git a/artiq/gateware/test/suservo/test_servo.py b/artiq/gateware/test/suservo/test_servo.py new file mode 100644 index 000000000..c8b43349e --- /dev/null +++ b/artiq/gateware/test/suservo/test_servo.py @@ -0,0 +1,109 @@ +import logging +import unittest + +from migen import * +from migen.genlib import io + +from artiq.gateware.test.suservo import test_adc, test_dds +from artiq.gateware.suservo import servo + + +class ServoSim(servo.Servo): + def __init__(self): + adc_p = servo.ADCParams(width=16, channels=8, lanes=4, + t_cnvh=4, t_conv=57, t_rtt=4) + iir_p = servo.IIRWidths(state=25, coeff=18, adc=16, + asf=14, word=16, accu=48, shift=11, + channel=3, profile=5) + dds_p = servo.DDSParams(width=8 + 32 + 16 + 16, + channels=adc_p.channels, clk=1) + + self.submodules.adc_tb = test_adc.TB(adc_p) + self.submodules.dds_tb = test_dds.TB(dds_p) + + servo.Servo.__init__(self, self.adc_tb, self.dds_tb, + adc_p, iir_p, dds_p) + + def test(self): + assert (yield self.done) + + adc = 1 + x0 = 0x0141 + yield self.adc_tb.data[adc].eq(x0) + channel = 3 + yield self.iir.adc[channel].eq(adc) + yield self.iir.ctrl[channel].en_iir.eq(1) + yield self.iir.ctrl[channel].en_out.eq(1) + profile = 5 + yield self.iir.ctrl[channel].profile.eq(profile) + x1 = 0x0743 + yield from self.iir.set_state(adc, x1, coeff="x1") + y1 = 0x1145 + yield from self.iir.set_state(channel, y1, + profile=profile, coeff="y1") + coeff = dict(pow=0x1333, offset=0x1531, ftw0=0x1727, ftw1=0x1929, + a1=0x0135, b0=0x0337, b1=0x0539, cfg=adc | (0 << 3)) + for ks in "pow offset ftw0 ftw1", "a1 b0 b1 cfg": + for k in ks.split(): + yield from self.iir.set_coeff(channel, value=coeff[k], + profile=profile, coeff=k) + yield + + yield self.start.eq(1) + yield + yield self.start.eq(0) + while not (yield self.dds_tb.io_update): + yield + yield # io_update + + w = self.iir.widths + + x0 = x0 << (w.state - w.adc - 1) + _ = yield from self.iir.get_state(adc, coeff="x1") + assert _ == x0, (hex(_), hex(x0)) + + offset = coeff["offset"] << (w.state - w.coeff - 1) + a1, b0, b1 = coeff["a1"], coeff["b0"], coeff["b1"] + out = ( + 0*(1 << w.shift - 1) + # rounding + a1*(0 - y1) + b0*(offset - x0) + b1*(offset - x1) + ) >> w.shift + y1 = min(max(0, out), (1 << w.state - 1) - 1) + + _ = yield from self.iir.get_state(channel, profile, coeff="y1") + assert _ == y1, (hex(_), hex(y1)) + + _ = yield self.dds_tb.ddss[channel].ftw + ftw = (coeff["ftw1"] << 16) | coeff["ftw0"] + assert _ == ftw, (hex(_), hex(ftw)) + + _ = yield self.dds_tb.ddss[channel].pow + assert _ == coeff["pow"], (hex(_), hex(coeff["pow"])) + + _ = yield self.dds_tb.ddss[channel].asf + asf = y1 >> (w.state - w.asf - 1) + assert _ == asf, (hex(_), hex(asf)) + + +def main(): + servo = ServoSim() + run_simulation(servo, servo.test(), vcd_name="servo.vcd", + clocks={ + "sys": (8, 0), + "adc": (8, 0), + "ret": (8, 0), + "async": (2, 0), + }, + special_overrides={ + io.DDROutput: test_adc.DDROutput, + io.DDRInput: test_adc.DDRInput + }) + + +class ServoTest(unittest.TestCase): + def test_run(self): + main() + + +if __name__ == "__main__": + main()