import random

from migen import *

from multi_coders import MultiEncoder, CrossbarDecoder


class IdentityCoders(Module):
    """Loopback fixture: one MultiEncoder feeding two CrossbarDecoders.

    Each encoder output is registered once (one ``sync`` stage) before it
    reaches the matching decoder's ``raw_input``; that register is the
    "channel" the testbench refers to.

    Attributes:
        d_in, k_in: per-lane 8-bit data / 1-bit control inputs to the encoder.
        d_out, k_out: per-lane 8-bit data / 1-bit control outputs of the
            decoders.
        encoder_start: drives ``encoder.start``.
        decoder_start: drives ``start`` on every decoder.
    """

    def __init__(self):
        self.submodules.encoder = MultiEncoder(lsb_first=False)
        decoders = [CrossbarDecoder(lsb_first=False) for _ in range(2)]
        self.submodules += decoders

        # Interface for input/output
        self.d_in = [Signal(8) for _ in range(2)]
        self.k_in = [Signal() for _ in range(2)]
        self.d_out = [Signal(8) for _ in range(2)]
        self.k_out = [Signal() for _ in range(2)]

        # Signals to start the encoder and both decoders
        self.encoder_start = Signal()
        self.decoder_start = Signal()

        self.comb += self.encoder.start.eq(self.encoder_start)
        for decoder in decoders:
            self.comb += decoder.start.eq(self.decoder_start)

        # Interconnect encoder and decoders through one register stage.
        # zip() pairs outputs with decoders positionally; presumably the
        # encoder exposes one output per lane -- confirm in multi_coders.
        for encoder_output, decoder in zip(self.encoder.output, decoders):
            self.sync += decoder.raw_input.eq(encoder_output)

        # Connect symbols to encoder
        for d_in, k_in, encoder_d, encoder_k in zip(
                self.d_in, self.k_in, self.encoder.d, self.encoder.k):
            self.comb += [
                encoder_d.eq(d_in),
                encoder_k.eq(k_in),
            ]

        # Connect symbols from decoder
        for d_out, k_out, decoder in zip(self.d_out, self.k_out, decoders):
            self.comb += [
                d_out.eq(decoder.d),
                k_out.eq(decoder.k),
            ]


def testbench(dut, transmission_delay=1):
    """Migen simulation generator checking that ``dut`` is an identity.

    Feeds 256 random (data, control) symbols into every input lane and
    asserts that the same symbols appear on every output lane five cycles
    later.

    Args:
        dut: an ``IdentityCoders`` instance under simulation.
        transmission_delay: number of extra idle cycles inserted between
            asserting ``decoder_start`` and driving the first symbol.

    Raises:
        AssertionError: if any output lane differs from the expected symbol.
    """
    data_size = 256
    list_of_data = [
        (random.randint(0, 0xFF), random.getrandbits(1))
        for _ in range(data_size)
    ]

    # Control characters (the values match the twelve 8b/10b K symbols;
    # presumably the only ones the encoder accepts -- confirm in multi_coders)
    controls = [
        0x1C, 0x3C, 0x5C, 0x7C, 0x9C, 0xBC, 0xDC, 0xFC,
        0xF7, 0xFB, 0xFD, 0xFE,
    ]

    # Correct control symbols: a random byte is not necessarily a valid
    # control character, so replace it with one drawn from the list above.
    for idx, (_, control) in enumerate(list_of_data):
        if control:
            list_of_data[idx] = (random.choice(controls), 1)

    # Decoder, encoder, and the channel all introduce delay; the expected
    # stream is the sent stream shifted by this many cycles.
    delay_list = [None] * 5

    send_list = list_of_data + delay_list
    recv_list = delay_list + list_of_data

    yield dut.encoder_start.eq(1)

    # Skip exactly 1 cycle. The channel has 1 cycle delay.
    yield
    yield dut.decoder_start.eq(1)

    for _ in range(transmission_delay):
        yield

    for data_in, data_out in zip(send_list, recv_list):
        # Drive zeros while flushing the pipeline after the last symbol.
        d_in, k_in = data_in if data_in is not None else (0, 0)
        for lane_d in dut.d_in:
            yield lane_d.eq(d_in)
        for lane_k in dut.k_in:
            yield lane_k.eq(k_in)

        # Nothing to check during the initial pipeline-fill cycles.
        if data_out is not None:
            d_out, k_out = data_out
            for lane_d in dut.d_out:
                assert (yield lane_d) == d_out
            for lane_k in dut.k_out:
                assert (yield lane_k) == k_out

        yield

    # Trailing idle cycles after the last check.
    for _ in range(10):
        yield


# Run the loopback check once per transmission delay. A fresh DUT is built
# for every run. The same VCD file name is passed each time, so every run
# targets "coders.vcd".
for delay in range(16):
    dut = IdentityCoders()
    run_simulation(dut, testbench(dut, delay), vcd_name="coders.vcd")