diff --git a/artiq/gateware/test/rtio/test_sed_lane_distributor.py b/artiq/gateware/test/rtio/test_sed_lane_distributor.py index b369d711b..488d914a9 100644 --- a/artiq/gateware/test/rtio/test_sed_lane_distributor.py +++ b/artiq/gateware/test/rtio/test_sed_lane_distributor.py @@ -9,9 +9,11 @@ from artiq.gateware.rtio.sed import lane_distributor LANE_COUNT = 8 -def simulate(input_events, wait=True): - dut = lane_distributor.LaneDistributor(LANE_COUNT, 8, [("channel", 8), ("timestamp", 32)], - [0]*256, 3) +def simulate(input_events, compensation=None, wait=True): + layout = [("channel", 8), ("timestamp", 32)] + if compensation is None: + compensation = [0]*256 + dut = lane_distributor.LaneDistributor(LANE_COUNT, 8, layout, compensation, 3) output = [] access_results = [] @@ -122,3 +124,30 @@ class TestLaneDistributor(unittest.TestCase): output, access_results = simulate(input_events) self.assertEqual([o[0] for o in output], [x % LANE_COUNT for x in range(9)]) self.assertEqual([ar[0] for ar in access_results], ["ok"]*9) + + def test_regular_lc(self): + N = 16 + output, access_results = simulate([(n, 8) for n in range(N)], + compensation=range(N), wait=False) + self.assertEqual(output, [(0, n, n, (n+1)*8) for n in range(N)]) + self.assertEqual(access_results, [("ok", 0)]*N) + + def test_lane_switch_lc(self): + N = 32 + compensation = [n//2 for n in range(N)] + output, access_results = simulate([(n, 8) for n in range(N)], + compensation=compensation, wait=False) + self.assertEqual(output, [((n-n//2) % LANE_COUNT, n, n, 8*(1+n//2)) for n in range(N)]) + self.assertEqual([ar[0] for ar in access_results], ["ok"]*N) + + def test_underflow_lc(self): + N = 16 + compensation = [0]*N + input_events = [(n, (n+1)*8) for n in range(N)] + compensation[N-2] = -input_events[N-2][1]//8 + output, access_results = simulate(input_events, compensation=compensation) + self.assertEqual(len(output), len(input_events)-1) # event with underflow must get discarded + self.assertEqual([ar[0] for ar in access_results[:N-2]], ["ok"]*(N-2)) + self.assertEqual(access_results[N-2][0], "underflow") + self.assertEqual(output[N-2], (0, N-2, N-1, N*8)) + self.assertEqual(access_results[N-1][0], "ok")