2
0
mirror of https://github.com/m-labs/artiq.git synced 2024-12-19 08:26:30 +08:00

drtio: self-checking echo test

This commit is contained in:
Sebastien Bourdeauducq 2016-10-07 17:31:51 +08:00
parent 0574e882d2
commit 43caffc168

View File

@ -48,7 +48,27 @@ class PacketInterface:
if frame:
frame_words.append((yield self.data))
if previous_frame and not frame:
callback(frame_words)
packet_type = self.plm.type_names[frame_words[0] & 0xff]
packet_nwords = layout_len(self.plm.layouts[packet_type]) \
//len(self.data)
packet, trailer = frame_words[:packet_nwords], \
frame_words[packet_nwords:]
n = 0
packet_int = 0
for w in packet:
packet_int |= (w << n)
n += len(self.data)
field_dict = dict()
idx = 0
for field_name, field_size in self.plm.layouts[packet_type]:
v = (packet_int >> idx) & (2**field_size - 1)
field_dict[field_name] = v
idx += field_size
callback(packet_type, field_dict, trailer)
frame_words = []
previous_frame = frame
yield
@ -56,12 +76,18 @@ class PacketInterface:
class TestSatellite(unittest.TestCase):
def test_echo(self):
nwords = 4
for nwords in range(1, 8):
dut = RTPacketSatellite(nwords)
pt = PacketInterface("m2s", dut.rx_rt_frame, dut.rx_rt_data)
pr = PacketInterface("s2m", dut.tx_rt_frame, dut.tx_rt_data)
completed = False
def send():
yield from pt.send("echo_request")
for i in range(40):
while not completed:
yield
run_simulation(dut, [send(), pr.receive(print)])
def receive(packet_type, field_dict, trailer):
nonlocal completed
self.assertEqual(packet_type, "echo_reply")
self.assertEqual(trailer, [])
completed = True
run_simulation(dut, [send(), pr.receive(receive)])