From a9c9d5779d08e15a8bc3b47f59d53c3ffb5dc697 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 8 Oct 2017 22:38:02 +0800 Subject: [PATCH] rtio/dma: add full-stack test with connection to RTIO core --- artiq/gateware/test/rtio/test_dma.py | 83 ++++++++++++++++++++++++---- 1 file changed, 72 insertions(+), 11 deletions(-) diff --git a/artiq/gateware/test/rtio/test_dma.py b/artiq/gateware/test/rtio/test_dma.py index 759fe60b0..8f84b37d9 100644 --- a/artiq/gateware/test/rtio/test_dma.py +++ b/artiq/gateware/test/rtio/test_dma.py @@ -1,10 +1,13 @@ import unittest import random +import itertools from migen import * from misoc.interconnect import wishbone +from artiq.gateware import rtio from artiq.gateware.rtio import dma, cri +from artiq.gateware.rtio.phy import ttl_simple def encode_n(n, min_length, max_length): @@ -47,6 +50,14 @@ def encode_sequence(writes, ws): return pack(sequence, ws) +def do_dma(dut, address): + yield from dut.dma.base_address.write(address) + yield from dut.enable.write(1) + yield + while ((yield from dut.enable.read())): + yield + + test_writes1 = [ (0x01, 0x23, 0x12, 0x33), (0x901, 0x902, 0x911, 0xeeeeeeeeeeeeeefffffffffffffffffffffffffffffff28888177772736646717738388488), @@ -83,21 +94,44 @@ class TB(Module): self.submodules.dut = dma.DMA(bus) +test_writes_full_stack = [ + (0, 32, 0, 1), + (1, 40, 0, 1), + (0, 48, 0, 0), + (1, 50, 0, 0), +] + + +class FullStackTB(Module): + def __init__(self, ws): + self.ttl0 = Signal() + self.ttl1 = Signal() + + self.submodules.phy0 = ttl_simple.Output(self.ttl0) + self.submodules.phy1 = ttl_simple.Output(self.ttl1) + + rtio_channels = [ + rtio.Channel.from_phy(self.phy0), + rtio.Channel.from_phy(self.phy1) + ] + + sequence = encode_sequence(test_writes_full_stack, ws) + + bus = wishbone.Interface(ws*8) + self.submodules.memory = wishbone.SRAM( + 256, init=sequence, bus=bus) + self.submodules.dut = dma.DMA(bus) + self.submodules.rtio = rtio.Core(rtio_channels) + self.comb += self.dut.cri.connect(self.rtio.cri) + + class TestDMA(unittest.TestCase): def test_dma_noerror(self): - ws = 64 - tb = TB(ws) - - def do_dma(address): - yield from tb.dut.dma.base_address.write(address) - yield from tb.dut.enable.write(1) - yield - while ((yield from tb.dut.enable.read())): - yield + tb = TB(64) def do_writes(): - yield from do_dma(0) - yield from do_dma(512) + yield from do_dma(tb.dut, 0) + yield from do_dma(tb.dut, 512) received = [] @passive @@ -124,3 +158,30 @@ class TestDMA(unittest.TestCase): run_simulation(tb, [do_writes(), rtio_sim()]) self.assertEqual(received, test_writes1 + test_writes2) + + def test_full_stack(self): + tb = FullStackTB(64) + + ttl_changes = [] + @passive + def monitor(): + old_ttl_states = [0, 0] + for time in itertools.count(): + ttl_states = [ + (yield tb.ttl0), + (yield tb.ttl1) + ] + for i, (old, new) in enumerate(zip(old_ttl_states, ttl_states)): + if new != old: + ttl_changes.append((time, i)) + old_ttl_states = ttl_states + yield + + run_simulation(tb, {"sys": [ + do_dma(tb.dut, 0), monitor(), + (None for _ in range(70)), + ]}, {"sys": 8, "rsys": 8, "rtio": 8, "rio": 8, "rio_phy": 8}) + + correct_changes = [(timestamp + 11, channel) + for channel, timestamp, _, _ in test_writes_full_stack] + self.assertEqual(ttl_changes, correct_changes)