init source coding
This commit is contained in:
parent
e8a178a111
commit
b215dd1b57
15
default.nix
15
default.nix
|
@ -15,6 +15,20 @@ let
|
|||
propagatedBuildInputs = with pkgs.python3Packages; [ colorama ];
|
||||
};
|
||||
|
||||
misoc = pkgs.python3Packages.buildPythonPackage rec {
|
||||
name = "misoc";
|
||||
|
||||
src = pkgs.fetchFromGitHub {
|
||||
owner = "m-labs";
|
||||
repo = "misoc";
|
||||
rev = "0cf0ebb7d4f56cc6d44a3dea3e386efab9d82419";
|
||||
sha256 = "sha256-TI0agjSSMJtH4mgAMpSO128zxcwSo/AjY1B6AW7zBQQ=";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
propagatedBuildInputs = with pkgs.python3Packages; [ jinja2 numpy pyserial asyncserial ] ++ [ migen ];
|
||||
};
|
||||
|
||||
vivadoDeps = pkgs: with pkgs; [
|
||||
libxcrypt
|
||||
ncurses5
|
||||
|
@ -47,6 +61,7 @@ in pkgs.mkShell {
|
|||
name = "UART-Testing";
|
||||
buildInputs = [
|
||||
migen
|
||||
misoc
|
||||
pkgs.python3Packages.pyserial
|
||||
vivado
|
||||
vivadoEnv
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
from migen import *
|
||||
from misoc.cores.code_8b10b import SingleEncoder, Decoder
|
||||
|
||||
|
||||
class MultiEncoder(Module):
|
||||
# For 4-channel EEM
|
||||
# Thus, no need for words parameter
|
||||
# It is always a 2-wds -> 4-ch conversion
|
||||
# Implement using crossbar, so we don't need to do clock domain conversion
|
||||
# while maintaining proper disparity
|
||||
def __init__(self, lsb_first=False):
|
||||
WORDS = 2
|
||||
# Keep the link layer interface identical to standard encoders
|
||||
self.d = [Signal(8) for _ in range(WORDS)]
|
||||
self.k = [Signal() for _ in range(WORDS)]
|
||||
|
||||
# Output interface is simplified because we have custom physical layer
|
||||
self.output = [Signal(10) for _ in range(WORDS)]
|
||||
|
||||
# Module start signal
|
||||
self.start = Signal()
|
||||
|
||||
# lsb_first should not be set set
|
||||
# But technically if the same setting is reflected on the decoder
|
||||
# It shouldn't be an issue
|
||||
if lsb_first:
|
||||
raise ValueError("lsb_first must not be set")
|
||||
|
||||
# Phase of the encoder
|
||||
# Alternate crossbar between encoder and SERDES every cycle
|
||||
phase = Signal()
|
||||
|
||||
# Intermediate registers for output and disparity
|
||||
# More significant bits are buffered due to channel geometry
|
||||
# Disparity bit is delayed. The same encoder is shared by 2 SERDES
|
||||
output_bufs = [Signal(5) for _ in range(WORDS)]
|
||||
disp_bufs = [Signal() for _ in range(WORDS)]
|
||||
|
||||
encoders = [SingleEncoder(lsb_first) for _ in range(WORDS)]
|
||||
self.submodules += encoders
|
||||
|
||||
for d, k, output, output_buf, disp_buf, encoder in \
|
||||
zip(self.d, self.k, self.output, output_bufs, disp_bufs, encoders):
|
||||
self.comb += [
|
||||
encoder.d.eq(d),
|
||||
encoder.k.eq(k),
|
||||
encoder.disp_in.eq(disp_buf),
|
||||
# Implementing switching crossbar
|
||||
If(phase,
|
||||
output.eq(Cat(encoder.output[0:5], output_buf))
|
||||
).Else(
|
||||
output.eq(Cat(output_buf, encoder.output[0:5]))
|
||||
),
|
||||
]
|
||||
# Handle intermediate registers
|
||||
self.sync += [
|
||||
disp_buf.eq(encoder.disp_out),
|
||||
output_buf.eq(encoder.output[5:10]),
|
||||
]
|
||||
|
||||
aligned = Signal()
|
||||
|
||||
self.sync += [
|
||||
# Phase switching
|
||||
phase.eq(~phase),
|
||||
If(~aligned & self.start,
|
||||
aligned.eq(1),
|
||||
# Later statements take precedent
|
||||
phase.eq(0),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
# Unlike the usual 8b10b decoder, it needs to know which SERDES to decode
|
||||
class CrossbarDecoder(Module):
|
||||
def __init__(self, lsb_first=False):
|
||||
self.raw_input = Signal(10)
|
||||
self.d = Signal(8)
|
||||
self.k = Signal()
|
||||
|
||||
# Notifier signal when group alignmnet is completed
|
||||
self.start = Signal()
|
||||
aligned = Signal()
|
||||
phase = Signal()
|
||||
|
||||
# Intermediate register for input
|
||||
buffer = Signal(5)
|
||||
|
||||
self.submodules.decoder = Decoder(lsb_first)
|
||||
|
||||
# lsb_first should not be set set
|
||||
# But technically if the same setting is reflected on the decoder
|
||||
# It shouldn't be an issue
|
||||
if lsb_first:
|
||||
raise ValueError("lsb_first must not be set")
|
||||
|
||||
# Update phase & synchronous elements
|
||||
self.sync += [
|
||||
phase.eq(~phase),
|
||||
If(~aligned & self.start,
|
||||
aligned.eq(1),
|
||||
phase.eq(0),
|
||||
),
|
||||
If(phase,
|
||||
buffer.eq(self.raw_input[:5]),
|
||||
).Else(
|
||||
buffer.eq(self.raw_input[5:])
|
||||
),
|
||||
]
|
||||
|
||||
# Send appropriate input to decoder
|
||||
self.comb += [
|
||||
If(phase,
|
||||
self.decoder.input.eq(Cat(buffer, self.raw_input[5:])),
|
||||
).Else(
|
||||
self.decoder.input.eq(Cat(buffer, self.raw_input[:5])),
|
||||
),
|
||||
]
|
||||
|
||||
self.comb += [
|
||||
self.d.eq(self.decoder.d),
|
||||
self.k.eq(self.decoder.k),
|
||||
]
|
|
@ -0,0 +1,103 @@
|
|||
from migen import *
|
||||
from multi_coders import MultiEncoder, CrossbarDecoder
|
||||
|
||||
|
||||
class IdentityCoders(Module):
|
||||
def __init__(self):
|
||||
self.submodules.encoder = MultiEncoder(lsb_first=False)
|
||||
decoders = [ CrossbarDecoder(lsb_first=False) for _ in range(2) ]
|
||||
self.submodules += decoders
|
||||
|
||||
# Interface fo input/output
|
||||
self.d_in = [ Signal(8) for _ in range(2) ]
|
||||
self.k_in = [ Signal() for _ in range(2) ]
|
||||
self.d_out = [ Signal(8) for _ in range(2) ]
|
||||
self.k_out = [ Signal() for _ in range(2) ]
|
||||
|
||||
# Signal to start both encoders & decoders
|
||||
self.encoder_start = Signal()
|
||||
self.decoder_start = Signal()
|
||||
self.comb += self.encoder.start.eq(self.encoder_start)
|
||||
for decoder in decoders:
|
||||
self.comb += decoder.start.eq(self.decoder_start)
|
||||
|
||||
# Interconnect encoders and decoders
|
||||
for encoder_output, decoder in zip(self.encoder.output, decoders):
|
||||
self.sync += decoder.raw_input.eq(encoder_output)
|
||||
|
||||
for d_in, k_in, encoder_d, encoder_k in \
|
||||
zip(self.d_in, self.k_in, self.encoder.d, self.encoder.k):
|
||||
self.comb += [
|
||||
# Connect symbols to encoder
|
||||
encoder_d.eq(d_in),
|
||||
encoder_k.eq(k_in),
|
||||
]
|
||||
|
||||
# Connect symbols from decoder
|
||||
for d_out, k_out, decoder in zip(self.d_out, self.k_out, decoders):
|
||||
self.comb += [
|
||||
d_out.eq(decoder.d),
|
||||
k_out.eq(decoder.k),
|
||||
]
|
||||
|
||||
|
||||
import random
|
||||
|
||||
|
||||
def testbench(dut, transmission_delay=1):
|
||||
data_size = 256
|
||||
list_of_data = [
|
||||
(random.randint(0, 0xFF), random.getrandbits(1)) \
|
||||
for _ in range(data_size)
|
||||
]
|
||||
# Control characters
|
||||
controls = [
|
||||
0x1C, 0x3C, 0x5C, 0x7C, 0x9C, 0xBC, 0xDC, 0xFC,
|
||||
0xF7, 0xFB, 0xFD, 0xFE
|
||||
]
|
||||
|
||||
# Correct control symbols
|
||||
for idx, (data, control) in enumerate(list_of_data):
|
||||
if control:
|
||||
list_of_data[idx] = (controls[random.randint(0, len(controls) - 1)], 1)
|
||||
# Decoder, Encoder, and the channel all introduces delay
|
||||
delay_list = [ None ] * 5
|
||||
|
||||
send_list = list_of_data + delay_list
|
||||
recv_list = delay_list + list_of_data
|
||||
|
||||
yield dut.encoder_start.eq(1)
|
||||
# Skip exactly 1 cycle. The channel has 1 cycle delay.
|
||||
yield
|
||||
yield dut.decoder_start.eq(1)
|
||||
|
||||
for _ in range(transmission_delay):
|
||||
yield
|
||||
|
||||
for data_in, data_out in zip(send_list, recv_list):
|
||||
if data_in is not None:
|
||||
d_in, k_in = data_in
|
||||
yield dut.d_in[0].eq(d_in)
|
||||
yield dut.d_in[1].eq(d_in)
|
||||
yield dut.k_in[0].eq(k_in)
|
||||
yield dut.k_in[1].eq(k_in)
|
||||
else:
|
||||
yield dut.d_in[0].eq(0)
|
||||
yield dut.d_in[1].eq(0)
|
||||
yield dut.k_in[0].eq(0)
|
||||
yield dut.k_in[1].eq(0)
|
||||
|
||||
if data_out is not None:
|
||||
d_out, k_out = data_out
|
||||
assert (yield dut.d_out[0]) == d_out
|
||||
assert (yield dut.d_out[1]) == d_out
|
||||
assert (yield dut.k_out[0]) == k_out
|
||||
assert (yield dut.k_out[1]) == k_out
|
||||
yield
|
||||
|
||||
for _ in range(10):
|
||||
yield
|
||||
|
||||
for delay in range(16):
|
||||
dut = IdentityCoders()
|
||||
run_simulation(dut, testbench(dut, delay), vcd_name="coders.vcd")
|
Loading…
Reference in New Issue