From 22173b8c709f899b092342a4ccacb61bb6834de3 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 26 Oct 2016 00:35:22 +0800 Subject: [PATCH] drtio: full stack unittest --- artiq/test/gateware/drtio/test_full_stack.py | 96 +++++++++++++++----- 1 file changed, 73 insertions(+), 23 deletions(-) diff --git a/artiq/test/gateware/drtio/test_full_stack.py b/artiq/test/gateware/drtio/test_full_stack.py index 76cd42619..ab6fd68e4 100644 --- a/artiq/test/gateware/drtio/test_full_stack.py +++ b/artiq/test/gateware/drtio/test_full_stack.py @@ -6,6 +6,7 @@ from migen import * from artiq.gateware.drtio import * from artiq.gateware import rtio from artiq.gateware.rtio.phy import ttl_simple +from artiq.coredevice.exceptions import * class DummyTransceiverPair: @@ -36,52 +37,101 @@ class DummyRXSynchronizer: class DUT(Module): def __init__(self, nwords): - self.ttl = Signal() + self.ttl0 = Signal() + self.ttl1 = Signal() self.transceivers = DummyTransceiverPair(nwords) self.submodules.master = DRTIOMaster(self.transceivers.alice) rx_synchronizer = DummyRXSynchronizer() - self.submodules.phy = ttl_simple.Output(self.ttl) + self.submodules.phy0 = ttl_simple.Output(self.ttl0) + self.submodules.phy1 = ttl_simple.Output(self.ttl1) + rtio_channels = [ + rtio.Channel.from_phy(self.phy0), + rtio.Channel.from_phy(self.phy1) + ] self.submodules.satellite = DRTIOSatellite( - self.transceivers.bob, rx_synchronizer, [rtio.Channel.from_phy(self.phy)]) + self.transceivers.bob, rx_synchronizer, rtio_channels) class TestFullStack(unittest.TestCase): def test_full_stack(self): dut = DUT(2) - kcsrs = dut.master.rt_controller.kcsrs + kcsrs = dut.master.rt_controller.kcsrs - def get_fifo_space(): + now = 0 + def delay(dt): + nonlocal now + now += dt + + def get_fifo_space(channel): + yield from kcsrs.chan_sel.write(channel) yield from kcsrs.o_get_fifo_space.write(1) yield while (yield from kcsrs.o_status.read()) & 1: yield return (yield from kcsrs.o_dbg_fifo_space.read()) - def test(): - print((yield from get_fifo_space())) - yield from kcsrs.o_timestamp.write(550) - yield from kcsrs.o_data.write(1) + def write(channel, data): + yield from kcsrs.chan_sel.write(channel) + yield from kcsrs.o_timestamp.write(now) + yield from kcsrs.o_data.write(data) yield from kcsrs.o_we.write(1) yield status = 1 while status: status = yield from kcsrs.o_status.read() - print("status after write:", status) + if status & 2: + yield from kcsrs.o_underflow_reset.write(1) + raise RTIOUnderflow + if status & 4: + yield from kcsrs.o_sequence_error_reset.write(1) + raise RTIOSequenceError yield - yield from kcsrs.o_timestamp.write(600) - yield from kcsrs.o_data.write(0) - yield from kcsrs.o_we.write(1) - yield - status = 1 - while status: - status = yield from kcsrs.o_status.read() - print("status after write:", status) - yield - for i in range(40): - yield - #print((yield from get_fifo_space())) - run_simulation(dut, test(), + def test(): + yield from get_fifo_space(0) + yield from get_fifo_space(1) + + with self.assertRaises(RTIOUnderflow): + yield from write(0, 0) + + delay(200*8) + yield from write(0, 1) + delay(5*8) + yield from write(0, 0) + yield from write(1, 1) + delay(6*8) + yield from write(1, 0) + + delay(-200*8) + with self.assertRaises(RTIOSequenceError): + yield from write(0, 1) + delay(200*8) + + for _ in range(50): + yield + + ttl_changes = [] + @passive + def check_ttls(): + cycle = 0 + old_ttls = [0, 0] + while True: + ttls = [(yield dut.ttl0), (yield dut.ttl1)] + for n, (old_ttl, ttl) in enumerate(zip(old_ttls, ttls)): + if ttl != old_ttl: + ttl_changes.append((cycle, n)) + old_ttls = ttls + yield + cycle += 1 + + run_simulation(dut, + {"sys": test(), "rtio": check_ttls()}, {"sys": 8, "rtio": 5, "rtio_rx": 5, "rio": 5, "rio_phy": 5}, vcd_name="foo.vcd") + self.assertEqual(ttl_changes, [ + (203, 0), + (208, 0), + (208, 1), + (214, 1) + ])