forked from M-Labs/artiq
drtio: reorganize tests
This commit is contained in:
parent
4f963e1e11
commit
6057cb797c
94
artiq/gateware/test/drtio/test_cdc.py
Normal file
94
artiq/gateware/test/drtio/test_cdc.py
Normal file
@ -0,0 +1,94 @@
|
|||||||
|
import unittest
|
||||||
|
import random
|
||||||
|
|
||||||
|
from migen import *
|
||||||
|
|
||||||
|
from artiq.gateware.drtio.rt_packet_master import (_CrossDomainRequest,
|
||||||
|
_CrossDomainNotification)
|
||||||
|
|
||||||
|
class TestCDC(unittest.TestCase):
|
||||||
|
def test_cross_domain_request(self):
|
||||||
|
prng = random.Random(1)
|
||||||
|
for sys_freq in 3, 6, 11:
|
||||||
|
for srv_freq in 3, 6, 11:
|
||||||
|
req_stb = Signal()
|
||||||
|
req_ack = Signal()
|
||||||
|
req_data = Signal(8)
|
||||||
|
srv_stb = Signal()
|
||||||
|
srv_ack = Signal()
|
||||||
|
srv_data = Signal(8)
|
||||||
|
test_seq = [93, 92, 19, 39, 91, 30, 12, 91, 38, 42]
|
||||||
|
received_seq = []
|
||||||
|
|
||||||
|
def requester():
|
||||||
|
for data in test_seq:
|
||||||
|
yield req_data.eq(data)
|
||||||
|
yield req_stb.eq(1)
|
||||||
|
yield
|
||||||
|
while not (yield req_ack):
|
||||||
|
yield
|
||||||
|
yield req_stb.eq(0)
|
||||||
|
for j in range(prng.randrange(0, 10)):
|
||||||
|
yield
|
||||||
|
|
||||||
|
def server():
|
||||||
|
for i in range(len(test_seq)):
|
||||||
|
while not (yield srv_stb):
|
||||||
|
yield
|
||||||
|
received_seq.append((yield srv_data))
|
||||||
|
for j in range(prng.randrange(0, 10)):
|
||||||
|
yield
|
||||||
|
yield srv_ack.eq(1)
|
||||||
|
yield
|
||||||
|
yield srv_ack.eq(0)
|
||||||
|
yield
|
||||||
|
|
||||||
|
dut = _CrossDomainRequest("srv",
|
||||||
|
req_stb, req_ack, req_data,
|
||||||
|
srv_stb, srv_ack, srv_data)
|
||||||
|
run_simulation(dut,
|
||||||
|
{"sys": requester(), "srv": server()},
|
||||||
|
{"sys": sys_freq, "srv": srv_freq})
|
||||||
|
self.assertEqual(test_seq, received_seq)
|
||||||
|
|
||||||
|
def test_cross_domain_notification(self):
|
||||||
|
prng = random.Random(1)
|
||||||
|
|
||||||
|
emi_stb = Signal()
|
||||||
|
emi_data = Signal(8)
|
||||||
|
rec_stb = Signal()
|
||||||
|
rec_ack = Signal()
|
||||||
|
rec_data = Signal(8)
|
||||||
|
|
||||||
|
test_seq = [23, 12, 8, 3, 28]
|
||||||
|
received_seq = []
|
||||||
|
|
||||||
|
def emitter():
|
||||||
|
for data in test_seq:
|
||||||
|
yield emi_stb.eq(1)
|
||||||
|
yield emi_data.eq(data)
|
||||||
|
yield
|
||||||
|
yield emi_stb.eq(0)
|
||||||
|
yield
|
||||||
|
for j in range(prng.randrange(0, 3)):
|
||||||
|
yield
|
||||||
|
|
||||||
|
def receiver():
|
||||||
|
for i in range(len(test_seq)):
|
||||||
|
while not (yield rec_stb):
|
||||||
|
yield
|
||||||
|
received_seq.append((yield rec_data))
|
||||||
|
yield rec_ack.eq(1)
|
||||||
|
yield
|
||||||
|
yield rec_ack.eq(0)
|
||||||
|
yield
|
||||||
|
for j in range(prng.randrange(0, 3)):
|
||||||
|
yield
|
||||||
|
|
||||||
|
dut = _CrossDomainNotification("emi",
|
||||||
|
emi_stb, emi_data,
|
||||||
|
rec_stb, rec_ack, rec_data)
|
||||||
|
run_simulation(dut,
|
||||||
|
{"emi": emitter(), "sys": receiver()},
|
||||||
|
{"emi": 13, "sys": 3})
|
||||||
|
self.assertEqual(test_seq, received_seq)
|
@ -1,13 +1,10 @@
|
|||||||
import unittest
|
import unittest
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
import random
|
|
||||||
|
|
||||||
from migen import *
|
from migen import *
|
||||||
|
|
||||||
from artiq.gateware.drtio.rt_serializer import *
|
from artiq.gateware.drtio.rt_serializer import *
|
||||||
from artiq.gateware.drtio.rt_packet_satellite import RTPacketSatellite
|
from artiq.gateware.drtio.rt_packet_satellite import RTPacketSatellite
|
||||||
from artiq.gateware.drtio.rt_packet_master import (_CrossDomainRequest,
|
|
||||||
_CrossDomainNotification)
|
|
||||||
|
|
||||||
|
|
||||||
class PacketInterface:
|
class PacketInterface:
|
||||||
@ -123,91 +120,3 @@ class TestSatellite(unittest.TestCase):
|
|||||||
yield
|
yield
|
||||||
run_simulation(dut, [send(), receive()])
|
run_simulation(dut, [send(), receive()])
|
||||||
self.assertEqual(tx_times, rx_times)
|
self.assertEqual(tx_times, rx_times)
|
||||||
|
|
||||||
|
|
||||||
class TestCDC(unittest.TestCase):
|
|
||||||
def test_cross_domain_request(self):
|
|
||||||
prng = random.Random(1)
|
|
||||||
for sys_freq in 3, 6, 11:
|
|
||||||
for srv_freq in 3, 6, 11:
|
|
||||||
req_stb = Signal()
|
|
||||||
req_ack = Signal()
|
|
||||||
req_data = Signal(8)
|
|
||||||
srv_stb = Signal()
|
|
||||||
srv_ack = Signal()
|
|
||||||
srv_data = Signal(8)
|
|
||||||
test_seq = [93, 92, 19, 39, 91, 30, 12, 91, 38, 42]
|
|
||||||
received_seq = []
|
|
||||||
|
|
||||||
def requester():
|
|
||||||
for data in test_seq:
|
|
||||||
yield req_data.eq(data)
|
|
||||||
yield req_stb.eq(1)
|
|
||||||
yield
|
|
||||||
while not (yield req_ack):
|
|
||||||
yield
|
|
||||||
yield req_stb.eq(0)
|
|
||||||
for j in range(prng.randrange(0, 10)):
|
|
||||||
yield
|
|
||||||
|
|
||||||
def server():
|
|
||||||
for i in range(len(test_seq)):
|
|
||||||
while not (yield srv_stb):
|
|
||||||
yield
|
|
||||||
received_seq.append((yield srv_data))
|
|
||||||
for j in range(prng.randrange(0, 10)):
|
|
||||||
yield
|
|
||||||
yield srv_ack.eq(1)
|
|
||||||
yield
|
|
||||||
yield srv_ack.eq(0)
|
|
||||||
yield
|
|
||||||
|
|
||||||
dut = _CrossDomainRequest("srv",
|
|
||||||
req_stb, req_ack, req_data,
|
|
||||||
srv_stb, srv_ack, srv_data)
|
|
||||||
run_simulation(dut,
|
|
||||||
{"sys": requester(), "srv": server()},
|
|
||||||
{"sys": sys_freq, "srv": srv_freq})
|
|
||||||
self.assertEqual(test_seq, received_seq)
|
|
||||||
|
|
||||||
def test_cross_domain_notification(self):
|
|
||||||
prng = random.Random(1)
|
|
||||||
|
|
||||||
emi_stb = Signal()
|
|
||||||
emi_data = Signal(8)
|
|
||||||
rec_stb = Signal()
|
|
||||||
rec_ack = Signal()
|
|
||||||
rec_data = Signal(8)
|
|
||||||
|
|
||||||
test_seq = [23, 12, 8, 3, 28]
|
|
||||||
received_seq = []
|
|
||||||
|
|
||||||
def emitter():
|
|
||||||
for data in test_seq:
|
|
||||||
yield emi_stb.eq(1)
|
|
||||||
yield emi_data.eq(data)
|
|
||||||
yield
|
|
||||||
yield emi_stb.eq(0)
|
|
||||||
yield
|
|
||||||
for j in range(prng.randrange(0, 3)):
|
|
||||||
yield
|
|
||||||
|
|
||||||
def receiver():
|
|
||||||
for i in range(len(test_seq)):
|
|
||||||
while not (yield rec_stb):
|
|
||||||
yield
|
|
||||||
received_seq.append((yield rec_data))
|
|
||||||
yield rec_ack.eq(1)
|
|
||||||
yield
|
|
||||||
yield rec_ack.eq(0)
|
|
||||||
yield
|
|
||||||
for j in range(prng.randrange(0, 3)):
|
|
||||||
yield
|
|
||||||
|
|
||||||
dut = _CrossDomainNotification("emi",
|
|
||||||
emi_stb, emi_data,
|
|
||||||
rec_stb, rec_ack, rec_data)
|
|
||||||
run_simulation(dut,
|
|
||||||
{"emi": emitter(), "sys": receiver()},
|
|
||||||
{"emi": 13, "sys": 3})
|
|
||||||
self.assertEqual(test_seq, received_seq)
|
|
Loading…
Reference in New Issue
Block a user