from migen import * from misoc.interconnect import stream from src.gateware.cxp_pipeline import * from sim_pipeline import * from PIL import Image import numpy as np def get_image_header( stream_id, source_tag, xsize, xoffset, ysize, yoffset, dsize, pixelF, tag_geo, flag ): stream_id = C(stream_id, char_width) source_tag = C(source_tag, 2 * char_width) xsize = C(xsize, 3 * char_width) xoffset = C(xoffset, 3 * char_width) ysize = C(ysize, 3 * char_width) yoffset = C(yoffset, 3 * char_width) dsize = C(dsize, 3 * char_width) pixelF = C(pixelF, 2 * char_width) tag_geo = C(tag_geo, 2 * char_width) flag = C(flag, char_width) assert len(stream_id) == len(flag) == char_width assert len(source_tag) == len(pixelF) == len(tag_geo) == 2 * char_width assert len(xsize) == len(xoffset) == len(ysize) == len(yoffset) == 3 * char_width return [ {"data": Replicate(KCode["stream_marker"], 4), "k": Replicate(1, 4)}, {"data": Replicate(C(0x01, char_width), 4), "k": Replicate(0, 4)}, {"data": Replicate(stream_id, 4), "k": Replicate(0, 4)}, {"data": Replicate(source_tag[8:], 4), "k": Replicate(0, 4)}, {"data": Replicate(source_tag[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(xsize[16:], 4), "k": Replicate(0, 4)}, {"data": Replicate(xsize[8:16], 4), "k": Replicate(0, 4)}, {"data": Replicate(xsize[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(xoffset[16:], 4), "k": Replicate(0, 4)}, {"data": Replicate(xoffset[8:16], 4), "k": Replicate(0, 4)}, {"data": Replicate(xoffset[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(ysize[16:], 4), "k": Replicate(0, 4)}, {"data": Replicate(ysize[8:16], 4), "k": Replicate(0, 4)}, {"data": Replicate(ysize[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(yoffset[16:], 4), "k": Replicate(0, 4)}, {"data": Replicate(yoffset[8:16], 4), "k": Replicate(0, 4)}, {"data": Replicate(yoffset[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(dsize[16:], 4), "k": Replicate(0, 4)}, {"data": Replicate(dsize[8:16], 4), "k": Replicate(0, 4)}, {"data": Replicate(dsize[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(pixelF[8:], 4), "k": Replicate(0, 4)}, {"data": Replicate(pixelF[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(tag_geo[8:], 4), "k": Replicate(0, 4)}, {"data": Replicate(tag_geo[:8], 4), "k": Replicate(0, 4)}, {"data": Replicate(flag, 4), "k": Replicate(0, 4)}, ] def get_line_marker(): return [ {"data": Replicate(KCode["stream_marker"], 4), "k": Replicate(1, 4)}, {"data": Replicate(C(0x02, char_width), 4), "k": Replicate(0, 4)}, ] def get_frame_packet(stream_id, pixel_format="mono16"): assert pixel_format in ["mono16"] arr = [[4156, 31078], [65063, 24837]] source_tag = 0 xsize, ysize = len(arr[0]), len(arr) xoffset, yoffset = 0, 0 if pixel_format == "mono16": dsize = xsize // 2 pixelF = 0x0105 tag_geo = 0 flag = 0 packet = [] # Image header packet += get_image_header( stream_id, source_tag, xsize, xoffset, ysize, yoffset, dsize, pixelF, tag_geo, flag, ) for line in arr: packet += get_line_marker() if pixel_format == "mono16": for i in range(len(line)): if (i % 2) == 0: if i == len(line) - 1: print(C(line[i])) packet += [ { "data": C(line[i], 4 * char_width), "k": Replicate(0, 4), }, ] else: print(C(line[i], 2 * char_width), C(line[i + 1])) # CXP use MSB packet += [ { "data": Cat( C(line[i], 2 * char_width), C(line[i + 1], 2 * char_width), ), "k": Replicate(0, 4), }, ] return packet