diff --git a/default.nix b/default.nix index dd93dcd..0a0f120 100644 --- a/default.nix +++ b/default.nix @@ -15,6 +15,20 @@ let propagatedBuildInputs = with pkgs.python3Packages; [ colorama ]; }; + misoc = pkgs.python3Packages.buildPythonPackage rec { + name = "misoc"; + + src = pkgs.fetchFromGitHub { + owner = "m-labs"; + repo = "misoc"; + rev = "0cf0ebb7d4f56cc6d44a3dea3e386efab9d82419"; + sha256 = "sha256-TI0agjSSMJtH4mgAMpSO128zxcwSo/AjY1B6AW7zBQQ="; + fetchSubmodules = true; + }; + + propagatedBuildInputs = with pkgs.python3Packages; [ jinja2 numpy pyserial asyncserial ] ++ [ migen ]; + }; + vivadoDeps = pkgs: with pkgs; [ libxcrypt ncurses5 @@ -47,6 +61,7 @@ in pkgs.mkShell { name = "UART-Testing"; buildInputs = [ migen + misoc pkgs.python3Packages.pyserial vivado vivadoEnv diff --git a/multi_coders.py b/multi_coders.py new file mode 100644 index 0000000..2561ac7 --- /dev/null +++ b/multi_coders.py @@ -0,0 +1,123 @@ +from migen import * +from misoc.cores.code_8b10b import SingleEncoder, Decoder + + +class MultiEncoder(Module): + # For 4-channel EEM + # Thus, no need for words parameter + # It is always a 2-wds -> 4-ch conversion + # Implement using crossbar, so we don't need to do clock domain conversion + # while maintaining proper disparity + def __init__(self, lsb_first=False): + WORDS = 2 + # Keep the link layer interface identical to standard encoders + self.d = [Signal(8) for _ in range(WORDS)] + self.k = [Signal() for _ in range(WORDS)] + + # Output interface is simplified because we have custom physical layer + self.output = [Signal(10) for _ in range(WORDS)] + + # Module start signal + self.start = Signal() + + # lsb_first should not be set set + # But technically if the same setting is reflected on the decoder + # It shouldn't be an issue + if lsb_first: + raise ValueError("lsb_first must not be set") + + # Phase of the encoder + # Alternate crossbar between encoder and SERDES every cycle + phase = Signal() + + # Intermediate registers for output and disparity + # More significant bits are buffered due to channel geometry + # Disparity bit is delayed. The same encoder is shared by 2 SERDES + output_bufs = [Signal(5) for _ in range(WORDS)] + disp_bufs = [Signal() for _ in range(WORDS)] + + encoders = [SingleEncoder(lsb_first) for _ in range(WORDS)] + self.submodules += encoders + + for d, k, output, output_buf, disp_buf, encoder in \ + zip(self.d, self.k, self.output, output_bufs, disp_bufs, encoders): + self.comb += [ + encoder.d.eq(d), + encoder.k.eq(k), + encoder.disp_in.eq(disp_buf), + # Implementing switching crossbar + If(phase, + output.eq(Cat(encoder.output[0:5], output_buf)) + ).Else( + output.eq(Cat(output_buf, encoder.output[0:5])) + ), + ] + # Handle intermediate registers + self.sync += [ + disp_buf.eq(encoder.disp_out), + output_buf.eq(encoder.output[5:10]), + ] + + aligned = Signal() + + self.sync += [ + # Phase switching + phase.eq(~phase), + If(~aligned & self.start, + aligned.eq(1), + # Later statements take precedent + phase.eq(0), + ), + ] + + +# Unlike the usual 8b10b decoder, it needs to know which SERDES to decode +class CrossbarDecoder(Module): + def __init__(self, lsb_first=False): + self.raw_input = Signal(10) + self.d = Signal(8) + self.k = Signal() + + # Notifier signal when group alignmnet is completed + self.start = Signal() + aligned = Signal() + phase = Signal() + + # Intermediate register for input + buffer = Signal(5) + + self.submodules.decoder = Decoder(lsb_first) + + # lsb_first should not be set set + # But technically if the same setting is reflected on the decoder + # It shouldn't be an issue + if lsb_first: + raise ValueError("lsb_first must not be set") + + # Update phase & synchronous elements + self.sync += [ + phase.eq(~phase), + If(~aligned & self.start, + aligned.eq(1), + phase.eq(0), + ), + If(phase, + buffer.eq(self.raw_input[:5]), + ).Else( + buffer.eq(self.raw_input[5:]) + ), + ] + + # Send appropriate input to decoder + self.comb += [ + If(phase, + self.decoder.input.eq(Cat(buffer, self.raw_input[5:])), + ).Else( + self.decoder.input.eq(Cat(buffer, self.raw_input[:5])), + ), + ] + + self.comb += [ + self.d.eq(self.decoder.d), + self.k.eq(self.decoder.k), + ] diff --git a/test_coder.py b/test_coder.py new file mode 100644 index 0000000..d2b2e2a --- /dev/null +++ b/test_coder.py @@ -0,0 +1,103 @@ +from migen import * +from multi_coders import MultiEncoder, CrossbarDecoder + + +class IdentityCoders(Module): + def __init__(self): + self.submodules.encoder = MultiEncoder(lsb_first=False) + decoders = [ CrossbarDecoder(lsb_first=False) for _ in range(2) ] + self.submodules += decoders + + # Interface fo input/output + self.d_in = [ Signal(8) for _ in range(2) ] + self.k_in = [ Signal() for _ in range(2) ] + self.d_out = [ Signal(8) for _ in range(2) ] + self.k_out = [ Signal() for _ in range(2) ] + + # Signal to start both encoders & decoders + self.encoder_start = Signal() + self.decoder_start = Signal() + self.comb += self.encoder.start.eq(self.encoder_start) + for decoder in decoders: + self.comb += decoder.start.eq(self.decoder_start) + + # Interconnect encoders and decoders + for encoder_output, decoder in zip(self.encoder.output, decoders): + self.sync += decoder.raw_input.eq(encoder_output) + + for d_in, k_in, encoder_d, encoder_k in \ + zip(self.d_in, self.k_in, self.encoder.d, self.encoder.k): + self.comb += [ + # Connect symbols to encoder + encoder_d.eq(d_in), + encoder_k.eq(k_in), + ] + + # Connect symbols from decoder + for d_out, k_out, decoder in zip(self.d_out, self.k_out, decoders): + self.comb += [ + d_out.eq(decoder.d), + k_out.eq(decoder.k), + ] + + +import random + + +def testbench(dut, transmission_delay=1): + data_size = 256 + list_of_data = [ + (random.randint(0, 0xFF), random.getrandbits(1)) \ + for _ in range(data_size) + ] + # Control characters + controls = [ + 0x1C, 0x3C, 0x5C, 0x7C, 0x9C, 0xBC, 0xDC, 0xFC, + 0xF7, 0xFB, 0xFD, 0xFE + ] + + # Correct control symbols + for idx, (data, control) in enumerate(list_of_data): + if control: + list_of_data[idx] = (controls[random.randint(0, len(controls) - 1)], 1) + # Decoder, Encoder, and the channel all introduces delay + delay_list = [ None ] * 5 + + send_list = list_of_data + delay_list + recv_list = delay_list + list_of_data + + yield dut.encoder_start.eq(1) + # Skip exactly 1 cycle. The channel has 1 cycle delay. + yield + yield dut.decoder_start.eq(1) + + for _ in range(transmission_delay): + yield + + for data_in, data_out in zip(send_list, recv_list): + if data_in is not None: + d_in, k_in = data_in + yield dut.d_in[0].eq(d_in) + yield dut.d_in[1].eq(d_in) + yield dut.k_in[0].eq(k_in) + yield dut.k_in[1].eq(k_in) + else: + yield dut.d_in[0].eq(0) + yield dut.d_in[1].eq(0) + yield dut.k_in[0].eq(0) + yield dut.k_in[1].eq(0) + + if data_out is not None: + d_out, k_out = data_out + assert (yield dut.d_out[0]) == d_out + assert (yield dut.d_out[1]) == d_out + assert (yield dut.k_out[0]) == k_out + assert (yield dut.k_out[1]) == k_out + yield + + for _ in range(10): + yield + +for delay in range(16): + dut = IdentityCoders() + run_simulation(dut, testbench(dut, delay), vcd_name="coders.vcd")