diff --git a/artiq/gateware/test/drtio/test_full_stack.py b/artiq/gateware/test/drtio/test_full_stack.py index f952337c7..c8ef2be12 100644 --- a/artiq/gateware/test/drtio/test_full_stack.py +++ b/artiq/gateware/test/drtio/test_full_stack.py @@ -36,10 +36,12 @@ class DummyRXSynchronizer: return signal -class LargeDataReceiver(Module): - def __init__(self, width): - self.rtlink = rtlink.Interface(rtlink.OInterface(width)) - self.received_data = Signal(width) +class SimpleIOPHY(Module): + def __init__(self, o_width, i_width): + self.rtlink = rtlink.Interface( + rtlink.OInterface(o_width), + rtlink.IInterface(i_width, timestamped=True)) + self.received_data = Signal(o_width) self.sync.rio_phy += If(self.rtlink.o.stb, self.received_data.eq(self.rtlink.o.data)) @@ -56,7 +58,7 @@ class DUT(Module): rx_synchronizer = DummyRXSynchronizer() self.submodules.phy0 = ttl_simple.Output(self.ttl0) self.submodules.phy1 = ttl_simple.Output(self.ttl1) - self.submodules.phy2 = LargeDataReceiver(512) + self.submodules.phy2 = SimpleIOPHY(512, 32) # test wide output data rtio_channels = [ rtio.Channel.from_phy(self.phy0, ofifo_depth=4), rtio.Channel.from_phy(self.phy1, ofifo_depth=4), @@ -71,7 +73,7 @@ class TestFullStack(unittest.TestCase): "rio": 5, "rio_phy": 5, "sys_with_rst": 8, "rtio_with_rst": 5} - def test_controller(self): + def test_outputs(self): dut = DUT(2) kcsrs = dut.master_ki csrs = dut.master.rt_controller.csrs @@ -229,6 +231,48 @@ class TestFullStack(unittest.TestCase): {"sys": test(), "rtio": check_ttls()}, self.clocks) self.assertEqual(ttl_changes, correct_ttl_changes) + def test_inputs(self): + dut = DUT(2) + kcsrs = dut.master_ki + + def get_input(timeout): + yield from kcsrs.chan_sel.write(2) + yield from kcsrs.timestamp.write(10) + yield from kcsrs.i_request.write(1) + yield + status = yield from kcsrs.i_status.read() + while status & 0x4: + yield + status = yield from kcsrs.i_status.read() + if status & 0x1: + return "timeout" + if status & 0x2: + return "overflow" + return ((yield from kcsrs.i_data.read()), + (yield from kcsrs.i_timestamp.read())) + + def test(): + # wait for link layer ready + for i in range(5): + yield + + i1 = yield from get_input(10) + i2 = yield from get_input(20) + self.assertEqual(i1, (0x600d1dea, 6)) + self.assertEqual(i2, "timeout") + + def generate_input(): + for i in range(5): + yield + yield dut.phy2.rtlink.i.data.eq(0x600d1dea) + yield dut.phy2.rtlink.i.stb.eq(1) + yield + yield dut.phy2.rtlink.i.data.eq(0) + yield dut.phy2.rtlink.i.stb.eq(0) + + run_simulation(dut, + {"sys": test(), "rtio": generate_input()}, self.clocks, vcd_name="foo.vcd") + def test_echo(self): dut = DUT(2) csrs = dut.master.rt_controller.csrs