forked from M-Labs/artiq
rtio/dma: add full-stack test with connection to RTIO core
This commit is contained in:
parent
5f083f21a4
commit
a9c9d5779d
|
@ -1,10 +1,13 @@
|
||||||
import unittest
|
import unittest
|
||||||
import random
|
import random
|
||||||
|
import itertools
|
||||||
|
|
||||||
from migen import *
|
from migen import *
|
||||||
from misoc.interconnect import wishbone
|
from misoc.interconnect import wishbone
|
||||||
|
|
||||||
|
from artiq.gateware import rtio
|
||||||
from artiq.gateware.rtio import dma, cri
|
from artiq.gateware.rtio import dma, cri
|
||||||
|
from artiq.gateware.rtio.phy import ttl_simple
|
||||||
|
|
||||||
|
|
||||||
def encode_n(n, min_length, max_length):
|
def encode_n(n, min_length, max_length):
|
||||||
|
@ -47,6 +50,14 @@ def encode_sequence(writes, ws):
|
||||||
return pack(sequence, ws)
|
return pack(sequence, ws)
|
||||||
|
|
||||||
|
|
||||||
|
def do_dma(dut, address):
|
||||||
|
yield from dut.dma.base_address.write(address)
|
||||||
|
yield from dut.enable.write(1)
|
||||||
|
yield
|
||||||
|
while ((yield from dut.enable.read())):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
test_writes1 = [
|
test_writes1 = [
|
||||||
(0x01, 0x23, 0x12, 0x33),
|
(0x01, 0x23, 0x12, 0x33),
|
||||||
(0x901, 0x902, 0x911, 0xeeeeeeeeeeeeeefffffffffffffffffffffffffffffff28888177772736646717738388488),
|
(0x901, 0x902, 0x911, 0xeeeeeeeeeeeeeefffffffffffffffffffffffffffffff28888177772736646717738388488),
|
||||||
|
@ -83,21 +94,44 @@ class TB(Module):
|
||||||
self.submodules.dut = dma.DMA(bus)
|
self.submodules.dut = dma.DMA(bus)
|
||||||
|
|
||||||
|
|
||||||
|
test_writes_full_stack = [
|
||||||
|
(0, 32, 0, 1),
|
||||||
|
(1, 40, 0, 1),
|
||||||
|
(0, 48, 0, 0),
|
||||||
|
(1, 50, 0, 0),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class FullStackTB(Module):
|
||||||
|
def __init__(self, ws):
|
||||||
|
self.ttl0 = Signal()
|
||||||
|
self.ttl1 = Signal()
|
||||||
|
|
||||||
|
self.submodules.phy0 = ttl_simple.Output(self.ttl0)
|
||||||
|
self.submodules.phy1 = ttl_simple.Output(self.ttl1)
|
||||||
|
|
||||||
|
rtio_channels = [
|
||||||
|
rtio.Channel.from_phy(self.phy0),
|
||||||
|
rtio.Channel.from_phy(self.phy1)
|
||||||
|
]
|
||||||
|
|
||||||
|
sequence = encode_sequence(test_writes_full_stack, ws)
|
||||||
|
|
||||||
|
bus = wishbone.Interface(ws*8)
|
||||||
|
self.submodules.memory = wishbone.SRAM(
|
||||||
|
256, init=sequence, bus=bus)
|
||||||
|
self.submodules.dut = dma.DMA(bus)
|
||||||
|
self.submodules.rtio = rtio.Core(rtio_channels)
|
||||||
|
self.comb += self.dut.cri.connect(self.rtio.cri)
|
||||||
|
|
||||||
|
|
||||||
class TestDMA(unittest.TestCase):
|
class TestDMA(unittest.TestCase):
|
||||||
def test_dma_noerror(self):
|
def test_dma_noerror(self):
|
||||||
ws = 64
|
tb = TB(64)
|
||||||
tb = TB(ws)
|
|
||||||
|
|
||||||
def do_dma(address):
|
|
||||||
yield from tb.dut.dma.base_address.write(address)
|
|
||||||
yield from tb.dut.enable.write(1)
|
|
||||||
yield
|
|
||||||
while ((yield from tb.dut.enable.read())):
|
|
||||||
yield
|
|
||||||
|
|
||||||
def do_writes():
|
def do_writes():
|
||||||
yield from do_dma(0)
|
yield from do_dma(tb.dut, 0)
|
||||||
yield from do_dma(512)
|
yield from do_dma(tb.dut, 512)
|
||||||
|
|
||||||
received = []
|
received = []
|
||||||
@passive
|
@passive
|
||||||
|
@ -124,3 +158,30 @@ class TestDMA(unittest.TestCase):
|
||||||
|
|
||||||
run_simulation(tb, [do_writes(), rtio_sim()])
|
run_simulation(tb, [do_writes(), rtio_sim()])
|
||||||
self.assertEqual(received, test_writes1 + test_writes2)
|
self.assertEqual(received, test_writes1 + test_writes2)
|
||||||
|
|
||||||
|
def test_full_stack(self):
|
||||||
|
tb = FullStackTB(64)
|
||||||
|
|
||||||
|
ttl_changes = []
|
||||||
|
@passive
|
||||||
|
def monitor():
|
||||||
|
old_ttl_states = [0, 0]
|
||||||
|
for time in itertools.count():
|
||||||
|
ttl_states = [
|
||||||
|
(yield tb.ttl0),
|
||||||
|
(yield tb.ttl1)
|
||||||
|
]
|
||||||
|
for i, (old, new) in enumerate(zip(old_ttl_states, ttl_states)):
|
||||||
|
if new != old:
|
||||||
|
ttl_changes.append((time, i))
|
||||||
|
old_ttl_states = ttl_states
|
||||||
|
yield
|
||||||
|
|
||||||
|
run_simulation(tb, {"sys": [
|
||||||
|
do_dma(tb.dut, 0), monitor(),
|
||||||
|
(None for _ in range(70)),
|
||||||
|
]}, {"sys": 8, "rsys": 8, "rtio": 8, "rio": 8, "rio_phy": 8})
|
||||||
|
|
||||||
|
correct_changes = [(timestamp + 11, channel)
|
||||||
|
for channel, timestamp, _, _ in test_writes_full_stack]
|
||||||
|
self.assertEqual(ttl_changes, correct_changes)
|
||||||
|
|
Loading…
Reference in New Issue