From a2b78941345ed8993ec5891b7a962ad2a1c75915 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 11 Sep 2017 23:05:10 +0800 Subject: [PATCH] rtio/sed: add output driver simulation unittest --- .../test/rtio/test_sed_output_driver.py | 125 ++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 artiq/gateware/test/rtio/test_sed_output_driver.py diff --git a/artiq/gateware/test/rtio/test_sed_output_driver.py b/artiq/gateware/test/rtio/test_sed_output_driver.py new file mode 100644 index 000000000..ef61b98ea --- /dev/null +++ b/artiq/gateware/test/rtio/test_sed_output_driver.py @@ -0,0 +1,125 @@ +import unittest + +from migen import * + +from artiq.gateware import rtio +from artiq.gateware.rtio.sed import output_network, output_driver +from artiq.gateware.rtio.phy import ttl_simple +from artiq.gateware.rtio import rtlink + + +LANE_COUNT = 8 + + +class BusyPHY(Module): + def __init__(self): + self.rtlink = rtlink.Interface(rtlink.OInterface(1)) + self.comb += self.rtlink.o.busy.eq(1) + + +class DUT(Module): + def __init__(self): + self.ttl0 = Signal() + self.ttl1 = Signal() + self.ttl2 = Signal() + + self.submodules.phy0 = ttl_simple.Output(self.ttl0) + self.submodules.phy1 = ttl_simple.Output(self.ttl1) + self.submodules.phy2 = ttl_simple.Output(self.ttl2) + self.phy2.rtlink.o.enable_replace = False + self.submodules.phy3 = BusyPHY() + + rtio_channels = [ + rtio.Channel.from_phy(self.phy0, ofifo_depth=4), + rtio.Channel.from_phy(self.phy1, ofifo_depth=4), + rtio.Channel.from_phy(self.phy2, ofifo_depth=4), + rtio.Channel.from_phy(self.phy3, ofifo_depth=4), + ] + + self.submodules.output_driver = output_driver.OutputDriver( + rtio_channels, LANE_COUNT, 4*LANE_COUNT) + + +def simulate(input_events): + dut = DUT() + + def gen(): + for n, input_event in enumerate(input_events): + yield dut.output_driver.input[n].valid.eq(1) + yield dut.output_driver.input[n].seqn.eq(n) + for k, v in input_event.items(): + yield getattr(dut.output_driver.input[n].payload, k).eq(v) + yield + for n in range(len(input_events)): + yield dut.output_driver.input[n].valid.eq(0) + for i in range(output_network.latency(LANE_COUNT) + 2): + yield + for i in range(3): + yield + + output = "" + + @passive + def monitor(): + nonlocal output + + ttls = [dut.ttl0, dut.ttl1, dut.ttl2] + prev_ttl_values = [0, 0, 0] + while True: + ttl_values = [] + for ttl in ttls: + ttl_values.append((yield ttl)) + for n, (old, new) in enumerate(zip(prev_ttl_values, ttl_values)): + if old != new: + output += "TTL{} {}->{}\n".format(n, old, new) + prev_ttl_values = ttl_values + + if (yield dut.output_driver.collision): + output += "collision ch{}\n".format((yield dut.output_driver.collision_channel)) + if (yield dut.output_driver.busy): + output += "busy ch{}\n".format((yield dut.output_driver.busy_channel)) + + yield + + run_simulation(dut, {"sys": [gen(), monitor()]}, + {"sys": 5, "rio": 5, "rio_phy": 5}, vcd_name="foo.vcd") + return output + + +class TestOutputNetwork(unittest.TestCase): + def test_one_ttl(self): + self.assertEqual( + simulate([{"data": 1}]), + "TTL0 0->1\n") + + def test_simultaneous_ttl(self): + self.assertEqual( + simulate([{"channel": 0, "data": 1}, + {"channel": 1, "data": 1}, + {"channel": 2, "data": 1}]), + "TTL0 0->1\n" + "TTL1 0->1\n" + "TTL2 0->1\n") + + def test_replace(self): + self.assertEqual( + simulate([{"data": 0}, + {"data": 1}, + {"data": 0}]), + "") + self.assertEqual( + simulate([{"data": 1}, + {"data": 0}, + {"data": 1}]), + "TTL0 0->1\n") + + def test_collision(self): + self.assertEqual( + simulate([{"channel": 2}, + {"channel": 2}]), + "collision ch2\n") + + def test_busy(self): + self.assertEqual( + simulate([{"channel": 3}]), + "busy ch3\n")