forked from M-Labs/artiq
1
0
Fork 0

rtio/sed: test latency compensation

This commit is contained in:
Sebastien Bourdeauducq 2017-09-26 16:11:21 +08:00
parent 9905b8723b
commit e6f0ce3aba
1 changed files with 32 additions and 3 deletions

View File

@ -9,9 +9,11 @@ from artiq.gateware.rtio.sed import lane_distributor
LANE_COUNT = 8 LANE_COUNT = 8
def simulate(input_events, wait=True): def simulate(input_events, compensation=None, wait=True):
dut = lane_distributor.LaneDistributor(LANE_COUNT, 8, [("channel", 8), ("timestamp", 32)], layout = [("channel", 8), ("timestamp", 32)]
[0]*256, 3) if compensation is None:
compensation = [0]*256
dut = lane_distributor.LaneDistributor(LANE_COUNT, 8, layout, compensation, 3)
output = [] output = []
access_results = [] access_results = []
@ -122,3 +124,30 @@ class TestLaneDistributor(unittest.TestCase):
output, access_results = simulate(input_events) output, access_results = simulate(input_events)
self.assertEqual([o[0] for o in output], [x % LANE_COUNT for x in range(9)]) self.assertEqual([o[0] for o in output], [x % LANE_COUNT for x in range(9)])
self.assertEqual([ar[0] for ar in access_results], ["ok"]*9) self.assertEqual([ar[0] for ar in access_results], ["ok"]*9)
def test_regular_lc(self):
N = 16
output, access_results = simulate([(n, 8) for n in range(N)],
compensation=range(N), wait=False)
self.assertEqual(output, [(0, n, n, (n+1)*8) for n in range(N)])
self.assertEqual(access_results, [("ok", 0)]*N)
def test_lane_switch_lc(self):
N = 32
compensation = [n//2 for n in range(N)]
output, access_results = simulate([(n, 8) for n in range(N)],
compensation=compensation, wait=False)
self.assertEqual(output, [((n-n//2) % LANE_COUNT, n, n, 8*(1+n//2)) for n in range(N)])
self.assertEqual([ar[0] for ar in access_results], ["ok"]*N)
def test_underflow_lc(self):
N = 16
compensation = [0]*N
input_events = [(n, (n+1)*8) for n in range(N)]
compensation[N-2] = -input_events[N-2][1]//8
output, access_results = simulate(input_events, compensation=compensation)
self.assertEqual(len(output), len(input_events)-1) # event with underflow must get discarded
self.assertEqual([ar[0] for ar in access_results[:N-2]], ["ok"]*(N-2))
self.assertEqual(access_results[N-2][0], "underflow")
self.assertEqual(output[N-2], (0, N-2, N-1, N*8))
self.assertEqual(access_results[N-1][0], "ok")