104 lines
3.3 KiB
Python
104 lines
3.3 KiB
Python
from migen import *
|
|
from multi_coders import MultiEncoder, CrossbarDecoder
|
|
|
|
|
|
class IdentityCoders(Module):
|
|
def __init__(self):
|
|
self.submodules.encoder = MultiEncoder(lsb_first=False)
|
|
decoders = [ CrossbarDecoder(lsb_first=False) for _ in range(2) ]
|
|
self.submodules += decoders
|
|
|
|
# Interface fo input/output
|
|
self.d_in = [ Signal(8) for _ in range(2) ]
|
|
self.k_in = [ Signal() for _ in range(2) ]
|
|
self.d_out = [ Signal(8) for _ in range(2) ]
|
|
self.k_out = [ Signal() for _ in range(2) ]
|
|
|
|
# Signal to start both encoders & decoders
|
|
self.encoder_start = Signal()
|
|
self.decoder_start = Signal()
|
|
self.comb += self.encoder.start.eq(self.encoder_start)
|
|
for decoder in decoders:
|
|
self.comb += decoder.start.eq(self.decoder_start)
|
|
|
|
# Interconnect encoders and decoders
|
|
for encoder_output, decoder in zip(self.encoder.output, decoders):
|
|
self.sync += decoder.raw_input.eq(encoder_output)
|
|
|
|
for d_in, k_in, encoder_d, encoder_k in \
|
|
zip(self.d_in, self.k_in, self.encoder.d, self.encoder.k):
|
|
self.comb += [
|
|
# Connect symbols to encoder
|
|
encoder_d.eq(d_in),
|
|
encoder_k.eq(k_in),
|
|
]
|
|
|
|
# Connect symbols from decoder
|
|
for d_out, k_out, decoder in zip(self.d_out, self.k_out, decoders):
|
|
self.comb += [
|
|
d_out.eq(decoder.d),
|
|
k_out.eq(decoder.k),
|
|
]
|
|
|
|
|
|
import random
|
|
|
|
|
|
def testbench(dut, transmission_delay=1):
|
|
data_size = 256
|
|
list_of_data = [
|
|
(random.randint(0, 0xFF), random.getrandbits(1)) \
|
|
for _ in range(data_size)
|
|
]
|
|
# Control characters
|
|
controls = [
|
|
0x1C, 0x3C, 0x5C, 0x7C, 0x9C, 0xBC, 0xDC, 0xFC,
|
|
0xF7, 0xFB, 0xFD, 0xFE
|
|
]
|
|
|
|
# Correct control symbols
|
|
for idx, (data, control) in enumerate(list_of_data):
|
|
if control:
|
|
list_of_data[idx] = (controls[random.randint(0, len(controls) - 1)], 1)
|
|
# Decoder, Encoder, and the channel all introduces delay
|
|
delay_list = [ None ] * 5
|
|
|
|
send_list = list_of_data + delay_list
|
|
recv_list = delay_list + list_of_data
|
|
|
|
yield dut.encoder_start.eq(1)
|
|
# Skip exactly 1 cycle. The channel has 1 cycle delay.
|
|
yield
|
|
yield dut.decoder_start.eq(1)
|
|
|
|
for _ in range(transmission_delay):
|
|
yield
|
|
|
|
for data_in, data_out in zip(send_list, recv_list):
|
|
if data_in is not None:
|
|
d_in, k_in = data_in
|
|
yield dut.d_in[0].eq(d_in)
|
|
yield dut.d_in[1].eq(d_in)
|
|
yield dut.k_in[0].eq(k_in)
|
|
yield dut.k_in[1].eq(k_in)
|
|
else:
|
|
yield dut.d_in[0].eq(0)
|
|
yield dut.d_in[1].eq(0)
|
|
yield dut.k_in[0].eq(0)
|
|
yield dut.k_in[1].eq(0)
|
|
|
|
if data_out is not None:
|
|
d_out, k_out = data_out
|
|
assert (yield dut.d_out[0]) == d_out
|
|
assert (yield dut.d_out[1]) == d_out
|
|
assert (yield dut.k_out[0]) == k_out
|
|
assert (yield dut.k_out[1]) == k_out
|
|
yield
|
|
|
|
for _ in range(10):
|
|
yield
|
|
|
|
for delay in range(16):
|
|
dut = IdentityCoders()
|
|
run_simulation(dut, testbench(dut, delay), vcd_name="coders.vcd")
|