artiq-zynq/sim_generator.py
morgan f6b13f271d sim: prototyping frame decoding pipeine
sim: add double buffer
sim: add eop marker for crc checker in double buffer
sim: add KCode, pak type & CRC generator
sim: add Stream crossbar
sim: add stream pipeline with parser & buffer
sim: add frame generator & image viewer
sim: add arbiter
sim: add broadcaster, double buffer & eop tester
2025-01-22 13:42:07 +08:00

123 lines
3.7 KiB
Python

from migen import *
from misoc.interconnect.csr import *
from misoc.interconnect import stream
from src.gateware.cxp_frame_pipeline import CXPCRC32
from src.gateware.cxp_pipeline import *
class CXPCRC32Inserter(Module):
def __init__(self, insert_eop=True):
self.sink = stream.Endpoint(word_layout)
self.source = stream.Endpoint(word_layout)
# # #
self.submodules.crc = crc = CXPCRC32(word_width)
self.submodules.fsm = fsm = FSM(reset_state="IDLE")
# WARNING: this will eat data if the source don't care about ack
# NOTE: need one cycle to turn itself on
fsm.act(
"IDLE",
crc.reset.eq(1),
self.sink.ack.eq(1),
If(
self.sink.stb,
self.sink.ack.eq(0),
NextState("COPY"),
),
)
fsm.act(
"COPY",
crc.ce.eq(self.sink.stb & self.source.ack),
crc.data.eq(self.sink.data),
self.sink.connect(self.source),
self.source.eop.eq(0),
If(
self.sink.stb & self.sink.eop & self.source.ack,
NextState("INSERT"),
),
)
fsm.act(
"INSERT",
self.source.stb.eq(1),
self.source.eop.eq(1) if insert_eop else self.source.eop.eq(0),
self.source.data.eq(crc.value),
If(self.source.ack, NextState("IDLE")),
)
class StreamPacket_Wrapper(Module):
def __init__(self):
self.sink = stream.Endpoint(word_layout)
self.source = stream.Endpoint(word_layout)
# # #
self.submodules.fsm = fsm = FSM(reset_state="IDLE")
fsm.act(
"IDLE",
self.sink.ack.eq(1),
If(
self.sink.stb,
self.sink.ack.eq(0),
NextState("INSERT_HEADER_0"),
),
)
fsm.act(
"INSERT_HEADER_0",
self.sink.ack.eq(0),
self.source.stb.eq(1),
self.source.data.eq(Replicate(KCode["pak_start"], 4)),
self.source.k.eq(Replicate(1, 4)),
If(self.source.ack, NextState("INSERT_HEADER_1")),
)
fsm.act(
"INSERT_HEADER_1",
self.sink.ack.eq(0),
self.source.stb.eq(1),
self.source.data.eq(Replicate(C(0x01, char_width), 4)),
self.source.k.eq(Replicate(0, 4)),
If(self.source.ack, NextState("COPY")),
)
fsm.act(
"COPY",
self.sink.connect(self.source),
self.source.eop.eq(0),
If(
self.sink.stb & self.sink.eop & self.source.ack,
NextState("INSERT_FOOTER"),
),
)
fsm.act(
"INSERT_FOOTER",
self.sink.ack.eq(0),
self.source.stb.eq(1),
self.source.data.eq(Replicate(KCode["pak_end"], 4)),
self.source.k.eq(Replicate(1, 4)),
# Simulate RX don't have eop tagged
# self.source.eop.eq(1),
If(self.source.ack, NextState("IDLE")),
)
# With KCode & 0x01*4
class StreamData_Generator(Module):
def __init__(self):
# should be big enough for all test
self.submodules.buffer = buffer = stream.SyncFIFO(word_layout, 32)
self.submodules.crc_inserter = crc_inserter = CXPCRC32Inserter()
self.submodules.wrapper = wrapper = StreamPacket_Wrapper()
# # #
pipeline = [buffer, crc_inserter, wrapper]
for s, d in zip(pipeline, pipeline[1:]):
self.comb += s.source.connect(d.sink)
self.sink = pipeline[0].sink
self.source = pipeline[-1].source