From 4a83108637bbd16656efbebd1d84d935d3a78e4b Mon Sep 17 00:00:00 2001 From: morgan Date: Wed, 22 Jan 2025 16:17:39 +0800 Subject: [PATCH] cxp router: init router: --- src/gateware/cxp_router.py | 297 +++++++++++++++++++++++++++++++++++++ 1 file changed, 297 insertions(+) create mode 100644 src/gateware/cxp_router.py diff --git a/src/gateware/cxp_router.py b/src/gateware/cxp_router.py new file mode 100644 index 0000000..62e685d --- /dev/null +++ b/src/gateware/cxp_router.py @@ -0,0 +1,297 @@ +from migen import * +from misoc.interconnect.csr import * +from misoc.interconnect.stream import Endpoint + +from cxp_pipeline import * +# from src.gateware.cxp_pipeline import * # for sim only + +from types import SimpleNamespace +from math import lcm +from operator import or_, add + +word_layout_dchar_4x = [ + ("data", 4*word_width), + ("k", 4*word_width//8), + ("dchar", 4*char_width), + ("dchar_k", 4*char_width//8), + ("valid", 4), +] + +class Stream_Router(Module): + """ + Match the id and route stream packet to the correct downstream and strip the packet header + """ + def __init__(self, routing_ids=[0]): + n_id = len(routing_ids) + assert n_id > 0 + + self.sources = [stream.Endpoint(word_layout_dchar) for _ in range(n_id)] + self.sink = stream.Endpoint(word_layout_dchar) + + # # # + + + stream_id = Signal(char_width) + pak_tag = Signal(char_width) + stream_pak_size = Signal(char_width * 2) + + self.submodules.fsm = fsm = FSM(reset_state="WAIT_HEADER") + + fsm.act( + "WAIT_HEADER", + self.sink.ack.eq(1), + If( + self.sink.stb, + NextValue(stream_id, self.sink.dchar), + NextState("GET_PAK_TAG"), + ), + ) + + fsm.act( + "GET_PAK_TAG", + self.sink.ack.eq(1), + If( + self.sink.stb, + NextValue(pak_tag, self.sink.dchar), + NextState("GET_PAK_SIZE_0"), + ), + ) + + fsm.act( + "GET_PAK_SIZE_0", + self.sink.ack.eq(1), + If( + self.sink.stb, + NextValue(stream_pak_size[8:], self.sink.dchar), + NextState("GET_PAK_SIZE_1"), + ), + ) + + routing_case = {"default": NextState("DISCARD")} + for id in (routing_ids): + routing_case[id] = [NextState(f"COPY_TO_BUFFER_{id}")] + + fsm.act( + "GET_PAK_SIZE_1", + self.sink.ack.eq(1), + If( + self.sink.stb, + NextValue(stream_pak_size[:8], self.sink.dchar), + Case(stream_id, routing_case), + ), + ) + + for key in routing_case: + if key == "default": + fsm.act( + "DISCARD", + self.sink.ack.eq(1), + If(self.sink.stb, + NextValue(stream_pak_size, stream_pak_size - 1), + If(stream_pak_size == 0, + NextValue(stream_id, stream_id.reset), + NextValue(pak_tag, pak_tag.reset), + NextValue(stream_pak_size, stream_pak_size.reset), + NextState("WAIT_HEADER"), + ) + ), + ) + else: + fsm.act( + f"COPY_TO_BUFFER_{key}", + self.sink.connect(self.sources[key]), + + # assume downstream is not blocked + If(self.sink.stb, + NextValue(stream_pak_size, stream_pak_size - 1), + If(stream_pak_size == 0, + NextValue(stream_id, stream_id.reset), + NextValue(pak_tag, pak_tag.reset), + NextValue(stream_pak_size, stream_pak_size.reset), + NextState("WAIT_HEADER"), + ) + ), + + ) + +class Stream_Packet_Gearbox(Module): + """ + + 1:4 gearbox + + """ + def __init__(self): + self.sink = Endpoint(word_layout_dchar) + self.source = Endpoint(word_layout_dchar_4x) + + # # # + + # TODO: take into account of stbs + sink_bits = len(self.sink.payload.raw_bits()) + source_bits = len(self.source.payload.raw_bits()) - 4 # 4 extra "valid" bits + print(sink_bits, source_bits) + assert source_bits/sink_bits == 4 + + self.submodules.fsm = fsm = FSM(reset_state="0") + + + ring_buf_size = lcm(sink_bits, source_bits) + # ensure the shift register is at least twice the size of sink/source dw + if (ring_buf_size//sink_bits) < 2: + ring_buf_size = ring_buf_size * 2 + if (ring_buf_size//source_bits) < 2: + ring_buf_size = ring_buf_size * 2 + + ring_buffer_layout = [] + for name, width in word_layout_dchar: + ring_buffer_layout.append( + (name, width*ring_buf_size) + ) + ring_buffer = Record(ring_buffer_layout) + + + # Control interface + + reset_reg = Signal() + we = Signal() + re = Signal() + level = Signal(max=ring_buf_size) + w_cnt = Signal(max=ring_buf_size//sink_bits) + r_cnt = Signal(max=ring_buf_size//source_bits) + + self.sync += [ + If(reset_reg, + level.eq(level.reset), + ).Else( + If(we & ~re, level.eq(level + sink_bits)), + If(~we & re, level.eq(level - source_bits)), + If(we & re, level.eq(level + sink_bits - source_bits)), + ), + + If(reset_reg, + w_cnt.eq(w_cnt.reset), + r_cnt.eq(r_cnt.reset), + ).Else( + If(we, + If(w_cnt == ((ring_buf_size//sink_bits) - 1), + w_cnt.eq(w_cnt.reset), + ).Else( + w_cnt.eq(w_cnt + 1), + ) + ), + If(re, + If(r_cnt == ((ring_buf_size//source_bits) - 1), + r_cnt.eq(r_cnt.reset), + ).Else( + r_cnt.eq(r_cnt + 1), + ) + ), + ) + ] + + # IO + sink_cases = {} + for i in range(ring_buf_size//sink_bits): + sink_cases[i] = [] + for name, width in word_layout_dchar: + src = getattr(self.sink, name) + dst = getattr(ring_buffer, name)[width*i: width*(i+1)] + sink_cases[i].append(dst.eq(src)) + self.sync += If(self.sink.stb, Case(w_cnt, sink_cases)) + + source_cases = {} + for i in range(ring_buf_size//source_dw): + source_cases[i] = [] + for name, width in word_layout_dchar_4x: + src = getattr(ring_buffer, name)[width*i: width*(i+1)] + dst = getattr(self.source, name) + source_cases[i].append(dst.eq(src)) + + + +class CXPCRC32_Checker(Module): + """ + Verify crc in stream data packet and stream crc less output + """ + def __init__(self): + self.error = Signal() + + # TODO: change to fifo style like in LiteEthMACCRCChecker to improve timinig? + self.sink = stream.Endpoint(word_layout_dchar) + self.submodules.buf = buf = Buffer(word_layout_dchar) + self.source = buf.source + + # # # + + self.submodules.crc = crc = CXPCRC32(word_width) + self.comb += crc.data.eq(self.sink.data), + + self.submodules.fsm = fsm = FSM(reset_state="INIT") + fsm.act("INIT", + crc.reset.eq(1), + NextState("CHECKING"), + ) + + fsm.act("RESET", + crc.reset.eq(1), + self.error.eq(crc.error), + NextState("CHECKING"), + ) + + fsm.act("CHECKING", + If(self.sink.stb & self.sink.eop, + # discard the crc + self.sink.ack.eq(1), + buf.source.eop.eq(1), + NextState("RESET"), + ).Else( + self.sink.connect(buf.sink), + ), + crc.ce.eq(self.sink.stb), + ) + + + + +# +# 4x word pipeline +# + +class Stream_Merger(Module): + """ + Merge n channels stream packet into one sequentially + """ + def __init__(self, layout, n_channels): + assert n_channels > 1 # don't need a arbiter if there is only one channel + + self.active_channels = Signal(n_channels) + + self.sinks = [stream.Endpoint(layout) for _ in range(n_channels)] + self.source = stream.Endpoint(layout) + # # # + + self.submodules.fsm = fsm = FSM(reset_state="0") + + # Section 9.5.5 (CXP-001-2021) + # When Multiple connections are active, stream packets are transmitted in + # ascending order of Connection ID + # Support ch0->1->2->4 topology only + for n, sink in enumerate(self.sinks): + if n < n_channels - 1: + fsm.act(str(n), + sink.connect(self.source), + If(sink.stb & sink.eop & self.source.ack, + If(self.active_channels[n+1], + NextState(str(n+1)), + ). Else( + NextState(str(0)), + ), + ) + ) + else: + fsm.act(str(n), + sink.connect(self.source), + If(sink.stb & sink.eop & self.source.ack, + NextState(str(0)) + ), + )