from migen import * from misoc.interconnect import stream from sim_pipeline import * from sim_generator import CXPCRC32Inserter from sim_frame_gen import get_frame_packet from src.gateware.cxp_pipeline import * import numpy as np from PIL import Image class Frame(Module): def __init__(self): # to construct correct crc and ack/stb signal self.submodules.buffer = buffer = stream.SyncFIFO(word_layout, 32) self.submodules.crc_inserter = crc_inserter = CXPCRC32Inserter() self.submodules.dchar_decoder = dchar_decoder = Duplicated_Char_Decoder() self.submodules.stream_pipe = stream_pipe = Stream_Pipeline() pipeline = [buffer, crc_inserter, dchar_decoder, stream_pipe] for s, d in zip(pipeline, pipeline[1:]): self.comb += s.source.connect(d.sink) self.sink = pipeline[0].sink self.source = pipeline[-1].source # no backpressure for sim self.sync += self.source.ack.eq(1) dut = Frame() def check_case(packet=[]): print("=================TEST========================") sink = dut.sink stream_pipe = dut.stream_pipe for i, p in enumerate(packet): yield sink.data.eq(p["data"]) yield sink.k.eq(p["k"]) yield sink.stb.eq(1) if "eop" in p: yield sink.eop.eq(1) else: yield sink.eop.eq(0) # check cycle result yield # source = dut.dchar_decoder.source # source = dut.stream_pipe.frame_extractor.sink source = dut.sink # print( # f"\nCYCLE#{i} : source char = {yield source.data:#X} k = {yield source.k:#X} stb = {yield source.stb} ack = {yield source.ack} eop = {yield source.eop}" # ) # extra clk cycles cyc = i + 1 img = [] line = -1 total_pixel = 1000 for i in range(cyc, cyc + total_pixel): yield sink.data.eq(0) yield sink.k.eq(0) yield sink.stb.eq(0) yield sink.eop.eq(0) yield # print( # f"\nCYCLE#{i} : source char = {yield source.data:#X} k = {yield source.k:#X} stb = {yield source.stb} ack = {yield source.ack} eop = {yield source.eop}" # ) frame_extractoer = dut.stream_pipe.frame_extractor new_line = yield frame_extractoer.new_line if new_line: img.append([]) line += 1 stb = yield frame_extractoer.source.stb data = yield frame_extractoer.source.data if stb: # CXP use MSB img[line].append(np.uint16(data & 0xFFFF)) img[line].append(np.uint16(data >> 16)) # metadata = dut.stream_pipe.frame_extractor.metadata # img_header_layout = [ # "stream_id", # "source_tag", # "x_size", # "x_offset", # "y_size", # "y_offset", # "l_size", # number of data words per image line # "pixel_format", # "tap_geo", # "flag", # ] # for name in img_header_layout: # print(f"{name} = {yield getattr(metadata, name):#04X} ", end="") # print() Image.fromarray(np.array(img, dtype=np.uint8)).show() assert True def testbench(): stream_id = 0x69 packet_tag = 0 frame_packet = get_frame_packet(stream_id) packet = [ {"data": Replicate(C(stream_id, char_width), 4), "k": Replicate(0, 4)}, {"data": Replicate(C(packet_tag, char_width), 4), "k": Replicate(0, 4)}, { "data": Replicate(C(len(frame_packet), 2*char_width)[8:], 4), "k": Replicate(0, 4), }, { "data": Replicate(C(len(frame_packet), 2*char_width)[:8], 4), "k": Replicate(0, 4), }, ] packet += frame_packet # NOTE: for crc inserter!!!! packet[-1]["eop"] = 0 yield from check_case(packet) run_simulation(dut, testbench(), vcd_name="sim-cxp.vcd")