forked from M-Labs/artiq
drtio: input unittest
This commit is contained in:
parent
56fd9b3b4b
commit
13ae1d1a38
@ -36,10 +36,12 @@ class DummyRXSynchronizer:
|
||||
return signal
|
||||
|
||||
|
||||
class LargeDataReceiver(Module):
|
||||
def __init__(self, width):
|
||||
self.rtlink = rtlink.Interface(rtlink.OInterface(width))
|
||||
self.received_data = Signal(width)
|
||||
class SimpleIOPHY(Module):
|
||||
def __init__(self, o_width, i_width):
|
||||
self.rtlink = rtlink.Interface(
|
||||
rtlink.OInterface(o_width),
|
||||
rtlink.IInterface(i_width, timestamped=True))
|
||||
self.received_data = Signal(o_width)
|
||||
self.sync.rio_phy += If(self.rtlink.o.stb,
|
||||
self.received_data.eq(self.rtlink.o.data))
|
||||
|
||||
@ -56,7 +58,7 @@ class DUT(Module):
|
||||
rx_synchronizer = DummyRXSynchronizer()
|
||||
self.submodules.phy0 = ttl_simple.Output(self.ttl0)
|
||||
self.submodules.phy1 = ttl_simple.Output(self.ttl1)
|
||||
self.submodules.phy2 = LargeDataReceiver(512)
|
||||
self.submodules.phy2 = SimpleIOPHY(512, 32) # test wide output data
|
||||
rtio_channels = [
|
||||
rtio.Channel.from_phy(self.phy0, ofifo_depth=4),
|
||||
rtio.Channel.from_phy(self.phy1, ofifo_depth=4),
|
||||
@ -71,7 +73,7 @@ class TestFullStack(unittest.TestCase):
|
||||
"rio": 5, "rio_phy": 5,
|
||||
"sys_with_rst": 8, "rtio_with_rst": 5}
|
||||
|
||||
def test_controller(self):
|
||||
def test_outputs(self):
|
||||
dut = DUT(2)
|
||||
kcsrs = dut.master_ki
|
||||
csrs = dut.master.rt_controller.csrs
|
||||
@ -229,6 +231,48 @@ class TestFullStack(unittest.TestCase):
|
||||
{"sys": test(), "rtio": check_ttls()}, self.clocks)
|
||||
self.assertEqual(ttl_changes, correct_ttl_changes)
|
||||
|
||||
def test_inputs(self):
|
||||
dut = DUT(2)
|
||||
kcsrs = dut.master_ki
|
||||
|
||||
def get_input(timeout):
|
||||
yield from kcsrs.chan_sel.write(2)
|
||||
yield from kcsrs.timestamp.write(10)
|
||||
yield from kcsrs.i_request.write(1)
|
||||
yield
|
||||
status = yield from kcsrs.i_status.read()
|
||||
while status & 0x4:
|
||||
yield
|
||||
status = yield from kcsrs.i_status.read()
|
||||
if status & 0x1:
|
||||
return "timeout"
|
||||
if status & 0x2:
|
||||
return "overflow"
|
||||
return ((yield from kcsrs.i_data.read()),
|
||||
(yield from kcsrs.i_timestamp.read()))
|
||||
|
||||
def test():
|
||||
# wait for link layer ready
|
||||
for i in range(5):
|
||||
yield
|
||||
|
||||
i1 = yield from get_input(10)
|
||||
i2 = yield from get_input(20)
|
||||
self.assertEqual(i1, (0x600d1dea, 6))
|
||||
self.assertEqual(i2, "timeout")
|
||||
|
||||
def generate_input():
|
||||
for i in range(5):
|
||||
yield
|
||||
yield dut.phy2.rtlink.i.data.eq(0x600d1dea)
|
||||
yield dut.phy2.rtlink.i.stb.eq(1)
|
||||
yield
|
||||
yield dut.phy2.rtlink.i.data.eq(0)
|
||||
yield dut.phy2.rtlink.i.stb.eq(0)
|
||||
|
||||
run_simulation(dut,
|
||||
{"sys": test(), "rtio": generate_input()}, self.clocks, vcd_name="foo.vcd")
|
||||
|
||||
def test_echo(self):
|
||||
dut = DUT(2)
|
||||
csrs = dut.master.rt_controller.csrs
|
||||
|
Loading…
Reference in New Issue
Block a user