From bf08fe1d506c7e0c3127d50d26a5fe5f79ba52be Mon Sep 17 00:00:00 2001
From: Donald Sebastian Leung
Date: Mon, 28 Sep 2020 11:36:26 +0800
Subject: [PATCH] Add partial implementation of lane distributor
---
rtio/sed/lane_distributor.py | 114 +++++++++++++++++++++++++++++++++++
1 file changed, 114 insertions(+)
diff --git a/rtio/sed/lane_distributor.py b/rtio/sed/lane_distributor.py
index e69de29..64d217e 100644
--- a/rtio/sed/lane_distributor.py
+++ b/rtio/sed/lane_distributor.py
@@ -0,0 +1,114 @@
+from nmigen import *
+
+from rtio import cri
+from rtio.sed import layouts
+
+__all__ = ["LaneDistributor"]
+
+class LaneDistributor(Elaboratable):
+ def __init__(self, lane_count, seqn_width, layout_payload,
+ compensation, glbl_fine_ts_width,
+ enable_spread=True, quash_channels=[], interface=None):
+ if lane_count & (lane_count - 1):
+ raise NotImplementedError("lane count must be a power of 2")
+
+ if interface is None:
+ interface = cri.Interface()
+ self.cri = interface
+ self.sequence_error = Signal()
+ self.sequence_error_channel = Signal(16, reset_less=True)
+ # The minimum timestamp that an event must have to avoid triggering
+ # an underflow, at the time when the CRI write happens, and to a channel
+ # with zero latency compensation. This is synchronous to the system clock
+ # domain.
+ us_timestamp_width = 64 - glbl_fine_ts_width
+ self.minimum_coarse_timestamp = Signal(us_timestamp_width)
+ self.output = [Record(layouts.fifo_ingress(seqn_width, layout_payload))
+ for _ in range(lane_count)]
+ self.lane_count = lane_count
+ self.us_timestamp_width = us_timestamp_width
+ self.seqn_width = seqn_width
+ self.glbl_fine_ts_width = glbl_fine_ts_width
+ self.quash_channels = quash_channels
+ self.compensation = compensation
+
+ def elaborate(self, platform):
+ m = Module()
+
+ o_status_wait = Signal()
+ o_status_underflow = Signal()
+ m.d.comb += self.cri.o_status.eq(Cat(o_status_wait, o_status_underflow))
+
+ # The core keeps writing events into the current lane as long as timestamps
+ # (after compensation) are strictly increasing, otherwise it switches to
+ # the next lane.
+ # If spread is enabled, it also switches to the next lane after the current
+ # lane has been full, in order to maximize lane utilization.
+ # The current lane is called lane "A". The next lane (which may be chosen
+ # at a later stage by the core) is called lane "B".
+ # Computations for both lanes are prepared in advance to increase performance.
+
+ current_lane = Signal(range(self.lane_count))
+ # The last coarse timestamp received from the CRI, after compensation.
+ # Used to determine when to switch lanes.
+ last_coarse_timestamp = Signal(self.us_timestamp_width)
+ # The last coarse timestamp written to each lane. Used to detect
+ # sequence errors.
+ last_lane_coarse_timestamps = Array(Signal(self.us_timestamp_width)
+ for _ in range(self.lane_count))
+ # Sequence number counter. The sequence number is used to determine which
+ # event wins during a replace.
+ seqn = Signal(self.seqn_width)
+
+ # distribute data to lanes
+ for lio in self.output:
+ m.d.comb += lio.seqn.eq(seqn)
+ m.d.comb += lio.payload.channel.eq(self.cri.chan_sel[:16])
+ m.d.comb += lio.payload.timestamp.eq(self.cri.o_timestamp)
+ if hasattr(lio.payload, "address"):
+ m.d.comb += lio.payload.address.eq(self.cri.o_address)
+ if hasattr(lio.payload, "data"):
+ m.d.comb += lio.payload.data.eq(self.cri.o_data)
+
+ # when timestamp and channel arrive in cycle #1, prepare computations
+ coarse_timestamp = Signal(self.us_timestamp_width)
+ m.d.comb += coarse_timestamp.eq(self.cri.o_timestamp[self.glbl_fine_ts_width:])
+ min_minus_timestamp = Signal(Shape(self.us_timestamp_width + 1, True),
+ reset_less=True)
+ laneAmin_minus_timestamp = Signal.like(min_minus_timestamp)
+ laneBmin_minus_timestamp = Signal.like(min_minus_timestamp)
+ last_minus_timestamp = Signal.like(min_minus_timestamp)
+ current_lane_plus_one = Signal(range(self.lane_count))
+ m.d.comb += current_lane_plus_one.eq(current_lane + 1)
+ m.d.sync += min_minus_timestamp.eq(self.minimum_coarse_timestamp - coarse_timestamp)
+ m.d.sync += laneAmin_minus_timestamp.eq(last_lane_coarse_timestamps[current_lane] - coarse_timestamp)
+ m.d.sync += laneBmin_minus_timestamp.eq(last_lane_coarse_timestamps[current_lane_plus_one] - coarse_timestamp)
+ m.d.sync += last_minus_timestamp.eq(last_coarse_timestamp - coarse_timestamp)
+
+ # Quash channels are "dummy" channels to which writes are completely ignored.
+ # This is used by the RTIO log channel, which is taken into account
+ # by the analyzer but does not enter the lanes.
+ quash = Signal()
+ m.d.sync += quash.eq(0)
+ for channel in self.quash_channels:
+ with m.If(self.cri.chan_sel[:16] == channel):
+ m.d.sync += quash.eq(1)
+
+ assert all(abs(c) < 1 << 14 - 1 for c in self.compensation)
+ latency_compensation = Memory(width=14, depth=len(self.compensation), init=self.compensation)
+ latency_compensation_rdport = latency_compensation.read_port()
+ m.submodules.latency_compensation_rdport = latency_compensation_rdport
+ m.d.comb += latency_compensation_rdport.addr.eq(self.cri.chan_sel[:16])
+
+ # cycle #2, write
+ # TODO
+
+ # cycle #3, read status
+ # TODO
+
+ # current lane has been full, spread events by switching to the next.
+ # TODO
+
+ return m
+
+# LaneDistributor(1, 1, [('channel', 16), ('timestamp', 64)], [], 1).elaborate(None)