diff --git a/artiq/gateware/test/rtio/test_dma.py b/artiq/gateware/test/rtio/test_dma.py index 5e4115279..759fe60b0 100644 --- a/artiq/gateware/test/rtio/test_dma.py +++ b/artiq/gateware/test/rtio/test_dma.py @@ -23,7 +23,7 @@ def encode_record(channel, timestamp, address, data): r += encode_n(channel, 3, 3) r += encode_n(timestamp, 8, 8) r += encode_n(address, 2, 2) - r += encode_n(data, 4, 64) + r += encode_n(data, 1, 64) return encode_n(len(r)+1, 1, 1) + r @@ -37,24 +37,45 @@ def pack(x, size): n |= x[j] except IndexError: pass - # print("{:0128x}".format(n)) r.append(n) return r -test_writes = [ +def encode_sequence(writes, ws): + sequence = [b for write in writes for b in encode_record(*write)] + sequence.append(0) + return pack(sequence, ws) + + +test_writes1 = [ (0x01, 0x23, 0x12, 0x33), (0x901, 0x902, 0x911, 0xeeeeeeeeeeeeeefffffffffffffffffffffffffffffff28888177772736646717738388488), (0x81, 0x288, 0x88, 0x8888) ] +test_writes2 = [ + (0x10, 0x10000, 0x20, 0x77), + (0x11, 0x10001, 0x22, 0x7777), + (0x12, 0x10002, 0x30, 0x777777), + (0x13, 0x10003, 0x40, 0x77777788), + (0x14, 0x10004, 0x50, 0x7777778899), +] + + +prng = random.Random(0) + + class TB(Module): def __init__(self, ws): - sequence = [b for write in test_writes for b in encode_record(*write)] - sequence.append(0) - # print(sequence) - sequence = pack(sequence, ws) + sequence1 = encode_sequence(test_writes1, ws) + sequence2 = encode_sequence(test_writes2, ws) + offset = 512//ws + assert len(sequence1) < offset + sequence = ( + sequence1 + + [prng.randrange(2**(ws*8)) for _ in range(offset-len(sequence1))] + + sequence2) bus = wishbone.Interface(ws*8) self.submodules.memory = wishbone.SRAM( @@ -64,16 +85,19 @@ class TB(Module): class TestDMA(unittest.TestCase): def test_dma_noerror(self): - prng = random.Random(0) ws = 64 tb = TB(ws) - def do_dma(): - for i in range(2): - yield from tb.dut.enable.write(1) + def do_dma(address): + yield from tb.dut.dma.base_address.write(address) + yield from tb.dut.enable.write(1) + yield + while ((yield from tb.dut.enable.read())): yield - while ((yield from tb.dut.enable.read())): - yield + + def do_writes(): + yield from do_dma(0) + yield from do_dma(512) received = [] @passive @@ -98,5 +122,5 @@ class TestDMA(unittest.TestCase): self.fail("unexpected RTIO command") yield - run_simulation(tb, [do_dma(), rtio_sim()]) - self.assertEqual(received, test_writes*2) + run_simulation(tb, [do_writes(), rtio_sim()]) + self.assertEqual(received, test_writes1 + test_writes2)