diff --git a/artiq/gateware/test/drtio/test_switching.py b/artiq/gateware/test/drtio/test_switching.py new file mode 100644 index 000000000..66a9c9e69 --- /dev/null +++ b/artiq/gateware/test/drtio/test_switching.py @@ -0,0 +1,165 @@ +import unittest +from types import SimpleNamespace +import random + +from migen import * + +from artiq.gateware.drtio import * +from artiq.gateware.drtio import rt_serializer, rt_packet_repeater +from artiq.gateware import rtio +from artiq.gateware.rtio import cri +from artiq.coredevice.exceptions import * +from artiq.gateware.test.drtio.packet_interface import PacketInterface + + +class DummyTransceiverPair: + def __init__(self, nwords): + a2b_k = [Signal() for _ in range(nwords)] + a2b_d = [Signal(8) for _ in range(nwords)] + b2a_k = [Signal() for _ in range(nwords)] + b2a_d = [Signal(8) for _ in range(nwords)] + + self.alice = SimpleNamespace( + encoder=SimpleNamespace(k=a2b_k, d=a2b_d), + decoders=[SimpleNamespace(k=k, d=d) for k, d in zip(b2a_k, b2a_d)], + rx_ready=1 + ) + self.bob = SimpleNamespace( + encoder=SimpleNamespace(k=b2a_k, d=b2a_d), + decoders=[SimpleNamespace(k=k, d=d) for k, d in zip(a2b_k, a2b_d)], + rx_ready=1 + ) + + +class DummyRXSynchronizer: + def resync(self, signal): + return signal + + +class DUT(Module): + def __init__(self, nwords): + self.transceivers = DummyTransceiverPair(nwords) + + self.submodules.tsc_master = rtio.TSC("async") + self.submodules.master = DRTIOMaster(self.tsc_master, + self.transceivers.alice) + self.master.rt_controller.csrs.link_up.storage.reset = 1 + + rx_synchronizer = DummyRXSynchronizer() + self.submodules.tsc_satellite = rtio.TSC("sync") + self.submodules.satellite = DRTIOSatellite( + self.tsc_satellite, self.transceivers.bob, rx_synchronizer) + self.satellite.reset.storage.reset = 0 + self.satellite.reset.storage_full.reset = 0 + self.satellite.reset_phy.storage.reset = 0 + self.satellite.reset_phy.storage_full.reset = 0 + + self.pt = PacketInterface("s2m", nwords*8) + self.pr = PacketInterface("m2s", nwords*8) + rep_if = SimpleNamespace( + rx_rt_frame=self.pt.frame, rx_rt_data=self.pt.data, + tx_rt_frame=self.pr.frame, tx_rt_data=self.pr.data) + self.submodules.repeater = rt_packet_repeater.RTPacketRepeater( + self.tsc_satellite, rep_if) + self.comb += self.satellite.cri.connect(self.repeater.cri) + + +class Testbench: + def __init__(self): + self.dut = DUT(2) + self.now = 0 + + def init(self): + yield from self.dut.master.rt_controller.csrs.underflow_margin.write(100) + while not (yield from self.dut.master.link_layer.rx_up.read()): + yield + yield from self.get_buffer_space() + + def get_buffer_space(self): + csrs = self.dut.master.rt_controller.csrs + yield from csrs.o_get_buffer_space.write(1) + yield + while (yield from csrs.o_wait.read()): + yield + r = (yield from csrs.o_dbg_buffer_space.read()) + return r + + def delay(self, dt): + self.now += dt + + def write(self, channel, data): + mcri = self.dut.master.cri + yield mcri.chan_sel.eq(channel) + yield mcri.timestamp.eq(self.now) + yield mcri.o_data.eq(data) + yield + yield mcri.cmd.eq(cri.commands["write"]) + yield + yield mcri.cmd.eq(cri.commands["nop"]) + yield + status = yield mcri.o_status + while status & 0x1: + yield + status = yield mcri.o_status + if status & 0x2: + return "underflow" + if status & 0x4: + return "destination unreachable" + + +class TestSwitching(unittest.TestCase): + clocks = {"sys": 8, "rtio": 5, "rtio_rx": 5, + "rio": 5, "rio_phy": 5, + "sys_with_rst": 8, "rtio_with_rst": 5} + + def test_switching(self): + tb = Testbench() + + def test(): + yield from tb.init() + tb.delay(200) + yield from tb.write(1, 20) + for _ in range(40): + yield + + current_request = None + + def get_request(): + nonlocal current_request + while current_request is None: + yield + r = current_request + current_request = None + return r + + def expect_buffer_space_request(destination, space): + packet_type, field_dict, trailer = yield from get_request() + self.assertEqual(packet_type, "buffer_space_request") + self.assertEqual(trailer, []) + self.assertEqual(field_dict["destination"], destination) + yield from tb.dut.pt.send("buffer_space_reply", space=space) + + def expect_write(timestamp, channel, data): + packet_type, field_dict, trailer = yield from get_request() + self.assertEqual(packet_type, "write") + self.assertEqual(trailer, []) + self.assertEqual(field_dict["timestamp"], timestamp) + self.assertEqual(field_dict["chan_sel"], channel) + self.assertEqual(field_dict["short_data"], data) + + @passive + def send_replies(): + yield from expect_buffer_space_request(0, 1) + yield from expect_write(200, 1, 20) + yield from expect_buffer_space_request(0, 1) + + unexpected = yield from get_request() + self.fail("unexpected packet: {}".format(unexpected)) + + def receive(packet_type, field_dict, trailer): + nonlocal current_request + self.assertEqual(current_request, None) + current_request = (packet_type, field_dict, trailer) + + run_simulation(tb.dut, + {"sys": test(), "rtio": tb.dut.pr.receive(receive), "rtio_rx": send_replies()}, self.clocks)