rtio/sed: add top-level core unit test

pull/889/head
Sebastien Bourdeauducq 2017-09-16 14:05:08 +08:00
parent a155a481b1
commit 25c644c663
1 changed files with 87 additions and 0 deletions

View File

@ -0,0 +1,87 @@
import unittest
import itertools
from migen import *
from artiq.gateware import rtio
from artiq.gateware.rtio import cri
from artiq.gateware.rtio.sed.core import *
from artiq.gateware.rtio.phy import ttl_simple
class DUT(Module):
def __init__(self):
self.ttl0 = Signal()
self.ttl1 = Signal()
self.submodules.phy0 = ttl_simple.Output(self.ttl0)
self.submodules.phy1 = ttl_simple.Output(self.ttl1)
rtio_channels = [
rtio.Channel.from_phy(self.phy0),
rtio.Channel.from_phy(self.phy1)
]
self.submodules.sed = SED(rtio_channels, "sync")
self.sync += [
self.sed.coarse_timestamp.eq(self.sed.coarse_timestamp + 1),
self.sed.minimum_coarse_timestamp.eq(self.sed.coarse_timestamp + 16)
]
def simulate(input_events):
dut = DUT()
ttl_changes = []
access_results = []
def gen():
yield dut.sed.cri.chan_sel.eq(0)
for timestamp, data in input_events:
yield dut.sed.cri.timestamp.eq(timestamp)
yield dut.sed.cri.o_data.eq(data)
yield
yield dut.sed.cri.cmd.eq(cri.commands["write"])
yield
yield dut.sed.cri.cmd.eq(cri.commands["nop"])
access_time = 0
yield
while (yield dut.sed.cri.o_status) & 0x01:
yield
access_time += 1
status = (yield dut.sed.cri.o_status)
access_status = "ok"
if status & 0x02:
access_status = "underflow"
if status & 0x04:
access_status = "sequence_error"
access_results.append((access_status, access_time))
@passive
def monitor():
old_ttl_state = 0
for time in itertools.count():
ttl_state = yield dut.ttl0
if ttl_state != old_ttl_state:
ttl_changes.append(time)
old_ttl_state = ttl_state
yield
run_simulation(dut, {"sys": [
gen(), monitor(),
(None for _ in range(45))
]}, {"sys": 5, "rio": 5, "rio_phy": 5})
return ttl_changes, access_results
class TestSED(unittest.TestCase):
def test_sed(self):
input_events = [(18, 1), (20, 0), (25, 1), (30, 0)]
latency = 11
ttl_changes, access_results = simulate(input_events)
self.assertEqual(ttl_changes, [e[0] + latency for e in input_events])
self.assertEqual(access_results, [("ok", 0)]*len(input_events))