artiq-zynq/sim_broadcaster.py
morgan 910bad029c sim GW: prototyping frame decoding pipeline
sim: add double buffer
sim: add eop marker for crc checker in double buffer
sim: add KCode, pak type & CRC generator
sim: add Stream crossbar
sim: add stream pipeline with parser & buffer
sim: add frame generator & image viewer
sim: add arbiter
sim: add broadcaster, double buffer & eop tester

sim: add roi pipeline

sim: update roi
2025-01-09 15:56:10 +08:00

92 lines
3.2 KiB
Python

from migen import *
from misoc.interconnect.csr import *
from misoc.interconnect import stream
from sim_generator import CXPCRC32Inserter
from src.gateware.cxp_frame_pipeline import *
from src.gateware.cxp_pipeline import *
class double_buffer_pipeline(Module):
    """Simulation DUT: FIFO -> duplicated-char decoder -> broadcaster -> buffer.

    ``sink`` is the input endpoint of the leading ``SyncFIFO`` and ``source``
    is the output endpoint of the trailing ``Buffer``. ``source.ack`` is tied
    high, so the chain is never stalled from the output side.
    """

    def __init__(self):
        # Front part of the chain; each stage's source feeds the next stage's sink.
        fifo = stream.SyncFIFO(word_layout, 32)
        dchar_decoder = Duplicated_Char_Decoder()
        broadcaster = Stream_Broadcaster(1)

        stages = [fifo, dchar_decoder, broadcaster]
        self.submodules += stages
        for upstream, downstream in zip(stages[:-1], stages[1:]):
            self.comb += upstream.source.connect(downstream.sink)

        # Words enter the pipeline through the FIFO.
        self.sink = fifo.sink

        # Only broadcaster output 0 is used; it is captured by the buffer.
        self.submodules.buffer = buffer = Buffer(word_layout_dchar)
        self.comb += broadcaster.sources[0].connect(buffer.sink)
        self.source = buffer.source

        # for sim, no backpressure
        self.comb += buffer.source.ack.eq(1)
# Module-level device under test; packet_sim() reads this global to reach the sink,
# and it is the module handed to run_simulation() at the end of the file.
dut = double_buffer_pipeline()
def packet_sim(packets=None, idle_cycles=20):
    """Drive ``dut.sink`` with one word per clock cycle, then let it idle.

    This is a generator meant to be consumed by the simulator, e.g. via
    ``yield from packet_sim(...)`` inside a testbench.

    Args:
        packets: Sequence of dicts, one per clock cycle. Each dict supplies
            the ``"data"`` and ``"k"`` values written to the sink. If the key
            ``"eop"`` is present (its value is not inspected), ``sink.eop``
            is asserted for that cycle; otherwise it is deasserted.
            ``None`` (the default) means no words are sent.
        idle_cycles: Number of extra clock cycles to run after the last word
            with ``data``, ``k``, ``stb`` and ``eop`` all driven to 0.
    """
    if packets is None:
        packets = ()

    print("=================TEST========================")
    sink = dut.sink

    for word in packets:
        yield sink.data.eq(word["data"])
        yield sink.k.eq(word["k"])
        yield sink.stb.eq(1)
        # Presence of the key is what marks end-of-packet, not its value.
        yield sink.eop.eq(1 if "eop" in word else 0)
        yield

    # extra clk cycles: the idle values are written once and stay in place
    # while the clock keeps ticking.
    yield sink.data.eq(0)
    yield sink.k.eq(0)
    yield sink.stb.eq(0)
    yield sink.eop.eq(0)
    for _ in range(idle_cycles):
        yield
def testbench():
    """Build the fixed stimulus sequence and feed it to ``packet_sim``.

    Every entry is one sink word: a ``"data"`` value and a 4-wide ``"k"``
    value. Entries are either a single character replicated four times or a
    full-width literal. No entry carries an ``"eop"`` key.
    """

    def repeated(char):
        # One character copied into all four positions, k bits all 0.
        return {"data": Replicate(char, 4), "k": Replicate(0, 4)}

    def literal(value, k=0):
        # A full word given as a literal; the same k bit is used for all four positions.
        return {"data": C(value, word_dw), "k": Replicate(k, 4)}

    paks = [
        # NOTE(review): presumably a header followed by a 16-bit length sent
        # upper byte first -- confirm against the decoder under test.
        repeated(C(0, char_width)),
        repeated(C(0, char_width)),
        repeated(C(4, 2 * char_width)[8:]),
        repeated(C(4, 2 * char_width)[:8]),  # CRC doesn't count
        literal(0x7C7C7C7C, k=1),
        literal(0x01010101),
        literal(0x00000000),
        literal(0x00000000),
        literal(0xF6ACEF6A),
        repeated(C(0, char_width)),
        repeated(C(1, char_width)),
        repeated(C(8, 2 * char_width)[8:]),
        repeated(C(8, 2 * char_width)[:8]),  # CRC doesn't count
        literal(0x19191919),
        literal(0x00000000),
        literal(0x09090909),
        literal(0x90909090),
        literal(0x00000000),
        literal(0x00000000),
        literal(0x00000000),
        literal(0x00000000),
        literal(0x985EFDB2),
    ]
    yield from packet_sim(paks)
# Run the simulation at import time: testbench() is the stimulus generator for
# dut, and a waveform dump is requested under the name "sim-cxp.vcd".
run_simulation(dut, testbench(), vcd_name="sim-cxp.vcd")