from nmigen import *

from rtio import cri
from rtio.sed import layouts


__all__ = ["LaneDistributor"]


class LaneDistributor(Elaboratable):
    """Distribute CRI write commands over a set of output lanes.

    Events keep going to the current lane for as long as their compensated
    timestamps are strictly increasing; otherwise the next lane is used.

    Parameters
    ----------
    lane_count : int
        Number of output lanes. Must be a positive power of 2.
    seqn_width : int
        Width in bits of the sequence number attached to every event.
    layout_payload
        Payload layout, forwarded to ``layouts.fifo_ingress``.
    compensation : sequence of int
        Per-channel latency compensation, indexed by channel number and
        expressed in coarse timestamp units. Each value must fit the signed
        14-bit compensation memory.
    glbl_fine_ts_width : int
        Number of low-order timestamp bits that make up the fine timestamp.
    enable_spread : bool
        When true, also switch to the next lane after the current lane has
        been full, in order to maximize lane utilization.
    quash_channels : iterable of int, optional
        Channels whose writes are ignored entirely. Defaults to none.
    interface : cri.Interface, optional
        CRI interface to use. A new one is created when omitted.

    Raises
    ------
    NotImplementedError
        If ``lane_count`` is not a power of 2.
    ValueError
        If ``lane_count`` is zero.
    """

    def __init__(self, lane_count, seqn_width, layout_payload,
                 compensation, glbl_fine_ts_width,
                 enable_spread=True, quash_channels=None,
                 interface=None):
        if lane_count & (lane_count - 1):
            raise NotImplementedError("lane count must be a power of 2")
        # Zero passes the power-of-2 bit test above but yields no lanes.
        if lane_count < 1:
            raise ValueError("lane count must be at least 1")

        if interface is None:
            interface = cri.Interface()
        self.cri = interface
        self.sequence_error = Signal()
        self.sequence_error_channel = Signal(16, reset_less=True)

        # The minimum timestamp that an event must have to avoid triggering
        # an underflow, at the time when the CRI write happens, and to a
        # channel with zero latency compensation. This is synchronous to the
        # system clock domain.
        us_timestamp_width = 64 - glbl_fine_ts_width
        self.minimum_coarse_timestamp = Signal(us_timestamp_width)
        self.output = [Record(layouts.fifo_ingress(seqn_width, layout_payload))
                       for _ in range(lane_count)]

        self.lane_count = lane_count
        self.us_timestamp_width = us_timestamp_width
        self.seqn_width = seqn_width
        self.glbl_fine_ts_width = glbl_fine_ts_width
        # A None sentinel avoids sharing one mutable default list between
        # instances; a caller-supplied collection is stored as given.
        self.quash_channels = [] if quash_channels is None else quash_channels
        self.compensation = compensation
        self.enable_spread = enable_spread

    def elaborate(self, platform):
        """Build and return the nmigen ``Module`` implementing the distributor."""
        m = Module()

        # Width in bits of one (signed) latency compensation entry.
        compensation_width = 14

        o_status_wait = Signal()
        o_status_underflow = Signal()
        m.d.comb += self.cri.o_status.eq(Cat(o_status_wait, o_status_underflow))

        # The core keeps writing events into the current lane as long as
        # timestamps (after compensation) are strictly increasing, otherwise
        # it switches to the next lane.
        # If spread is enabled, it also switches to the next lane after the
        # current lane has been full, in order to maximize lane utilization.
        # The current lane is called lane "A". The next lane (which may be
        # chosen at a later stage by the core) is called lane "B".
        # Computations for both lanes are prepared in advance to increase
        # performance.

        current_lane = Signal(range(self.lane_count))
        # The last coarse timestamp received from the CRI, after compensation.
        # Used to determine when to switch lanes.
        last_coarse_timestamp = Signal(self.us_timestamp_width)
        # The last coarse timestamp written to each lane. Used to detect
        # sequence errors.
        last_lane_coarse_timestamps = Array(Signal(self.us_timestamp_width)
                                            for _ in range(self.lane_count))
        # Sequence number counter. The sequence number is used to determine
        # which event wins during a replace.
        seqn = Signal(self.seqn_width)

        # distribute data to lanes
        for lio in self.output:
            m.d.comb += lio.seqn.eq(seqn)
            m.d.comb += lio.payload.channel.eq(self.cri.chan_sel[:16])
            # Superseded further down by the compensated timestamp, which is
            # assigned later in the same combinatorial domain.
            m.d.comb += lio.payload.timestamp.eq(self.cri.o_timestamp)
            if hasattr(lio.payload, "address"):
                m.d.comb += lio.payload.address.eq(self.cri.o_address)
            if hasattr(lio.payload, "data"):
                m.d.comb += lio.payload.data.eq(self.cri.o_data)

        # when timestamp and channel arrive in cycle #1, prepare computations
        coarse_timestamp = Signal(self.us_timestamp_width)
        m.d.comb += coarse_timestamp.eq(
            self.cri.o_timestamp[self.glbl_fine_ts_width:])
        # One extra bit and a signed shape so the sign of each difference
        # can be tested in cycle #2.
        min_minus_timestamp = Signal(Shape(self.us_timestamp_width + 1, True),
                                     reset_less=True)
        laneAmin_minus_timestamp = Signal.like(min_minus_timestamp)
        laneBmin_minus_timestamp = Signal.like(min_minus_timestamp)
        last_minus_timestamp = Signal.like(min_minus_timestamp)
        current_lane_plus_one = Signal(range(self.lane_count))
        m.d.comb += current_lane_plus_one.eq(current_lane + 1)
        m.d.sync += min_minus_timestamp.eq(
            self.minimum_coarse_timestamp - coarse_timestamp)
        m.d.sync += laneAmin_minus_timestamp.eq(
            last_lane_coarse_timestamps[current_lane] - coarse_timestamp)
        m.d.sync += laneBmin_minus_timestamp.eq(
            last_lane_coarse_timestamps[current_lane_plus_one] - coarse_timestamp)
        m.d.sync += last_minus_timestamp.eq(
            last_coarse_timestamp - coarse_timestamp)

        # Quash channels are "dummy" channels to which writes are completely
        # ignored. This is used by the RTIO log channel, which is taken into
        # account by the analyzer but does not enter the lanes.
        quash = Signal()
        m.d.sync += quash.eq(0)
        for channel in self.quash_channels:
            with m.If(self.cri.chan_sel[:16] == channel):
                m.d.sync += quash.eq(1)

        # Every entry must fit a signed word of compensation_width bits.
        assert all(abs(c) < (1 << (compensation_width - 1))
                   for c in self.compensation), \
            "latency compensation value out of range"
        latency_compensation = Memory(width=compensation_width,
                                      depth=len(self.compensation),
                                      init=self.compensation)
        latency_compensation_rdport = latency_compensation.read_port()
        m.submodules.latency_compensation_rdport = latency_compensation_rdport
        m.d.comb += latency_compensation_rdport.addr.eq(self.cri.chan_sel[:16])

        # cycle #2, write
        compensation = Signal(Shape(compensation_width, True))
        m.d.comb += compensation.eq(latency_compensation_rdport.data)

        timestamp_above_min = Signal()
        timestamp_above_last = Signal()
        timestamp_above_laneA_min = Signal()
        timestamp_above_laneB_min = Signal()
        timestamp_above_lane_min = Signal()
        force_laneB = Signal()
        use_laneB = Signal()
        use_lanen = Signal(range(self.lane_count))

        do_write = Signal()
        do_underflow = Signal()
        do_sequence_error = Signal()

        m.d.comb += timestamp_above_min.eq(
            min_minus_timestamp - compensation < 0)
        m.d.comb += timestamp_above_laneA_min.eq(
            laneAmin_minus_timestamp - compensation < 0)
        m.d.comb += timestamp_above_laneB_min.eq(
            laneBmin_minus_timestamp - compensation < 0)
        m.d.comb += timestamp_above_last.eq(
            last_minus_timestamp - compensation < 0)

        with m.If(force_laneB | ~timestamp_above_last):
            m.d.comb += use_lanen.eq(current_lane_plus_one)
            m.d.comb += use_laneB.eq(1)
        with m.Else():
            m.d.comb += use_lanen.eq(current_lane)
            m.d.comb += use_laneB.eq(0)

        m.d.comb += timestamp_above_lane_min.eq(
            Mux(use_laneB, timestamp_above_laneB_min, timestamp_above_laneA_min))

        with m.If(~quash & (self.cri.cmd == cri.commands["write"])):
            with m.If(timestamp_above_min):
                with m.If(timestamp_above_lane_min):
                    m.d.comb += do_write.eq(1)
                with m.Else():
                    m.d.comb += do_sequence_error.eq(1)
            with m.Else():
                m.d.comb += do_underflow.eq(1)

        m.d.comb += Array(lio.we for lio in self.output)[use_lanen].eq(do_write)

        compensated_timestamp = Signal(64)
        m.d.comb += compensated_timestamp.eq(
            self.cri.o_timestamp + (compensation << self.glbl_fine_ts_width))

        with m.If(do_write):
            m.d.sync += current_lane.eq(use_lanen)
            m.d.sync += last_coarse_timestamp.eq(
                compensated_timestamp[self.glbl_fine_ts_width:])
            m.d.sync += last_lane_coarse_timestamps[use_lanen].eq(
                compensated_timestamp[self.glbl_fine_ts_width:])
            m.d.sync += seqn.eq(seqn + 1)

        for lio in self.output:
            m.d.comb += lio.payload.timestamp.eq(compensated_timestamp)

        # cycle #3, read status
        current_lane_writable = Signal()
        m.d.comb += current_lane_writable.eq(
            Array(lio.writable for lio in self.output)[current_lane])
        m.d.comb += o_status_wait.eq(~current_lane_writable)

        # A new write command clears the underflow flag; an underflow on that
        # same command sets it again because the later assignment wins.
        with m.If(self.cri.cmd == cri.commands["write"]):
            m.d.sync += o_status_underflow.eq(0)
        with m.If(do_underflow):
            m.d.sync += o_status_underflow.eq(1)
        m.d.sync += self.sequence_error.eq(do_sequence_error)
        m.d.sync += self.sequence_error_channel.eq(self.cri.chan_sel[:16])

        # current lane has been full, spread events by switching to the next.
        if self.enable_spread:
            current_lane_writable_r = Signal(reset=1)
            m.d.sync += current_lane_writable_r.eq(current_lane_writable)
            # Rising edge of "writable": the lane was full and has just
            # started accepting events again.
            with m.If(~current_lane_writable_r & current_lane_writable):
                m.d.sync += force_laneB.eq(1)
            with m.If(do_write):
                m.d.sync += force_laneB.eq(0)

        return m