from migen import *
from misoc.interconnect.csr import *
from misoc.interconnect import stream

# from src.gateware.cxp_frame_pipeline import *
from src.gateware.cxp_pipeline import *

from types import SimpleNamespace


class DUT(Module):
    """Simulation wrapper: a stand-in PHY feeding the decoder pipeline.

    The pipeline is ``phy -> Duplicated_Char_Decoder -> EOP_Marker``.
    ``self.sink`` is the PHY's sink endpoint and ``self.source`` is the
    source endpoint of the last pipeline stage.
    """

    def __init__(self):
        # PHY
        # A plain namespace holding two stream endpoints; it is not a Module,
        # so its logic is added directly to this module's sync domain.
        phy = SimpleNamespace()
        phy.sink = stream.Endpoint(word_layout)
        phy.source = stream.Endpoint(word_layout)

        # Registered filter: source.stb defaults to 0 every cycle and is
        # raised (with data/k copied over) unless the incoming word has 0xBC
        # in its lowest byte with k[0] set.
        # NOTE(review): the condition does not look at phy.sink.stb, so any
        # word that is not filtered is forwarded regardless of the sink
        # strobe -- confirm this is the intended PHY model.
        self.sync += [
            phy.source.stb.eq(0),
            If(~((phy.sink.data[:8] == 0xBC) & (phy.sink.k[0] == 1)),
                phy.source.stb.eq(1),
                phy.source.data.eq(phy.sink.data),
                phy.source.k.eq(phy.sink.k),
            ),
        ]

        # # #

        dchar_decoder = Duplicated_Char_Decoder()
        eop_marker = EOP_Marker()

        pipeline = [phy, dchar_decoder, eop_marker]
        # phy is a SimpleNamespace, not a Module, so it is not registered as
        # a submodule.
        self.submodules += pipeline[1:]

        # Chain each stage's source into the next stage's sink.
        for s, d in zip(pipeline, pipeline[1:]):
            self.comb += s.source.connect(d.sink)
        self.sink, self.source = pipeline[0].sink, pipeline[-1].source


dut = DUT()


def packet_sim(packets=()):
    """Simulation generator that drives ``dut.sink`` with ``packets``.

    Each element of ``packets`` is a mapping with the keys ``"data"``,
    ``"k"`` and ``"stb"``; one element is applied per clock cycle.
    ``dut.source.ack`` is held at 1 throughout, so the testbench never
    back-pressures the pipeline. After the last packet the sink is driven
    with all-zero values for 10 further cycles.

    The default is an immutable empty tuple rather than a shared list.
    """
    print("=================TEST========================")

    sink = dut.sink
    source = dut.source

    for p in packets:
        yield sink.data.eq(p["data"])
        yield sink.k.eq(p["k"])
        yield sink.stb.eq(p["stb"])
        yield source.ack.eq(1)
        yield  # advance one clock cycle

    # Trailing cycles with the sink driven to zero after the last packet.
    for _ in range(10):
        yield sink.data.eq(0)
        yield sink.k.eq(0)
        yield sink.stb.eq(0)
        yield sink.eop.eq(0)
        yield source.ack.eq(1)
        yield


def testbench():
    """Build the stimulus list and run it through ``packet_sim``.

    For each ``i`` in 1..9 the stimulus is: four data words
    ``i << 8 | n`` (n = 1..4) with ``k`` all zero, one word
    ``0xB53C3CBC`` with ``k = 0b0111`` and ``stb = 0``, and one word made of
    ``KCode["pak_end"]`` replicated four times with ``k`` all ones.
    """
    paks = []
    for i in range(1, 10):
        # Four data words per group, low byte counting 1..4.
        paks += [
            {"data": C(i << 8 | n, word_width), "k": Replicate(0, 4), "stb": 1}
            for n in range(1, 5)
        ]
        paks += [
            # Low byte 0xBC with k[0] set: the word the PHY stage in DUT
            # does not forward.
            {"data": C(0xB53C3CBC, word_width), "k": C(0b0111, 4), "stb": 0},
            {
                "data": Replicate(KCode["pak_end"], 4),
                "k": Replicate(1, 4),
                "stb": 1,
            },
        ]

    yield from packet_sim(paks)


run_simulation(dut, testbench(), vcd_name="sim-cxp.vcd")