drtio: test large data

This commit is contained in:
Sebastien Bourdeauducq 2016-11-27 12:57:12 +08:00
parent 0903964488
commit b2450c7c56

View File

@ -1,10 +1,12 @@
import unittest
from types import SimpleNamespace
import random
from migen import *
from artiq.gateware.drtio import *
from artiq.gateware import rtio
from artiq.gateware.rtio import rtlink
from artiq.gateware.rtio.phy import ttl_simple
from artiq.coredevice.exceptions import *
@ -33,6 +35,14 @@ class DummyRXSynchronizer:
return signal
class LargeDataReceiver(Module):
def __init__(self, width):
self.rtlink = rtlink.Interface(rtlink.OInterface(width))
self.received_data = Signal(width)
self.sync.rio_phy += If(self.rtlink.o.stb,
self.received_data.eq(self.rtlink.o.data))
class DUT(Module):
def __init__(self, nwords):
self.ttl0 = Signal()
@ -45,9 +55,11 @@ class DUT(Module):
rx_synchronizer = DummyRXSynchronizer()
self.submodules.phy0 = ttl_simple.Output(self.ttl0)
self.submodules.phy1 = ttl_simple.Output(self.ttl1)
self.submodules.phy2 = LargeDataReceiver(512)
rtio_channels = [
rtio.Channel.from_phy(self.phy0, ofifo_depth=4),
rtio.Channel.from_phy(self.phy1, ofifo_depth=4)
rtio.Channel.from_phy(self.phy1, ofifo_depth=4),
rtio.Channel.from_phy(self.phy2, ofifo_depth=4),
]
self.submodules.satellite = DRTIOSatellite(
self.transceivers.bob, rx_synchronizer, rtio_channels)
@ -64,11 +76,13 @@ class TestFullStack(unittest.TestCase):
ttl_changes = []
correct_ttl_changes = [
# from test_pulses
(203, 0),
(208, 0),
(208, 1),
(214, 1),
# from test_fifo_space
(414, 0),
(454, 0),
(494, 0),
@ -136,6 +150,15 @@ class TestFullStack(unittest.TestCase):
yield from write(0, 1)
delay(200*8)
def test_large_data():
correct_large_data = random.Random(0).randrange(2**512-1)
self.assertNotEqual((yield dut.phy2.received_data), correct_large_data)
delay(10*8)
yield from write(2, correct_large_data)
for i in range(40):
yield
self.assertEqual((yield dut.phy2.received_data), correct_large_data)
def test_fifo_space():
delay(200*8)
max_wlen = 0
@ -161,11 +184,11 @@ class TestFullStack(unittest.TestCase):
def test_tsc_error():
err_present = yield from mgr.packet_err_present.read()
self.assertEqual(err_present, 0)
yield from csrs.tsc_correction.write(10000000)
yield from csrs.tsc_correction.write(100000000)
yield from csrs.set_time.write(1)
for i in range(5):
for i in range(15):
yield
delay(10000)
delay(10000*8)
yield from write(0, 1)
for i in range(10):
yield
@ -187,6 +210,7 @@ class TestFullStack(unittest.TestCase):
yield from test_pulses()
yield from test_sequence_error()
yield from test_fifo_space()
yield from test_large_data()
yield from test_fifo_emptied()
yield from test_tsc_error()
@ -203,7 +227,7 @@ class TestFullStack(unittest.TestCase):
yield
cycle += 1
run_simulation(dut,
run_simulation(dut,
{"sys": test(), "rtio": check_ttls()}, self.clocks)
self.assertEqual(ttl_changes, correct_ttl_changes)