commit 7f23de0ce81670d06408ba238a94a9e93943a863 Author: occheung Date: Sun Apr 23 11:42:18 2023 +0800 init diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ff57468 --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +build +__pycache__ +test +*.vcd +*.cfg +*.txt \ No newline at end of file diff --git a/bscan_spi_xc7a100t.bit b/bscan_spi_xc7a100t.bit new file mode 100644 index 0000000..c456de3 Binary files /dev/null and b/bscan_spi_xc7a100t.bit differ diff --git a/buffer.py b/buffer.py new file mode 100644 index 0000000..046b569 --- /dev/null +++ b/buffer.py @@ -0,0 +1,26 @@ +from migen import * +from migen.genlib.fifo import SyncFIFO + + +class Buffer(Module): + def __init__(self, width=8, depth=128): + self.din = Signal(width) + self.i_stb = Signal() + + self.dout = Signal(width) + self.o_stb = Signal() + self.o_ack = Signal() + + # Underlying data structure + self.submodules.fifo = SyncFIFO(width, depth) + + self.comb += [ + # TX path + self.o_stb.eq(self.fifo.readable), + self.dout.eq(self.fifo.dout), + self.fifo.re.eq(self.o_ack), + + # RX path + self.fifo.we.eq(self.i_stb), + self.fifo.din.eq(self.din), + ] diff --git a/comm.py b/comm.py new file mode 100644 index 0000000..03f72e7 --- /dev/null +++ b/comm.py @@ -0,0 +1,22 @@ +import serial + + +def main(): + comm = serial.Serial("/dev/ttyUSB3", 115200) + # comm.write(b"Hello World!") + + # for _ in range(32): + while True: + byte = comm.read(2) + print(f'{byte[0]:0>8b}' + f'{byte[1]:0>8b}') + byte = comm.read(1) + print(f'{byte[0]:0>8b}') + # cached_byte = None + # while True: + # byte = comm.read(1) + # if byte != cached_byte: + # cached_byte = byte + # print(f'{byte[0]:0>8b}') + +if __name__ == "__main__": + main() diff --git a/default.nix b/default.nix new file mode 100644 index 0000000..8253755 --- /dev/null +++ b/default.nix @@ -0,0 +1,55 @@ +{ pkgs ? import {} }: + +let + migen = pkgs.python3Packages.buildPythonPackage rec { + name = "migen"; + + src = pkgs.fetchFromGitHub { + owner = "m-labs"; + repo = "migen"; + rev = "7bc4eb1387b39159a74c1dbd1b820728e0bfbbaa"; + sha256 = "039jk8y7f0vhr32svg3nd23i88c0bhws8ngxwk9bdznfxvhiy1h6"; + fetchSubmodules = true; + }; + + propagatedBuildInputs = with pkgs.python3Packages; [ colorama ]; + }; + + vivadoDeps = pkgs: with pkgs; [ + libxcrypt + ncurses5 + zlib + libuuid + xorg.libSM + xorg.libICE + xorg.libXrender + xorg.libX11 + xorg.libXext + xorg.libXtst + xorg.libXi + freetype + fontconfig + ]; + + vivadoEnv = pkgs.buildFHSUserEnv { + name = "vivado-env"; + targetPkgs = vivadoDeps; + }; + + vivado = pkgs.buildFHSUserEnv { + name = "vivado"; + targetPkgs = vivadoDeps; + profile = "set -e; source /opt/Xilinx/Vivado/2022.2/settings64.sh"; + runScript = "vivado"; + }; + +in pkgs.mkShell { + name = "UART-Testing"; + buildInputs = [ + migen + pkgs.python3Packages.pyserial + vivado + vivadoEnv + pkgs.python3Packages.numpy + ]; +} diff --git a/eem_helpers.py b/eem_helpers.py new file mode 100644 index 0000000..91b7bfb --- /dev/null +++ b/eem_helpers.py @@ -0,0 +1,29 @@ +from migen import * +from migen.build.generic_platform import * + + +def _eem_signal(i): + n = "d{}".format(i) + if i == 0: + n += "_cc" + return n + + +def _eem_pin(eem, i, pol): + return "eem{}:{}_{}".format(eem, _eem_signal(i), pol) + + +def default_iostandard(eem): + return IOStandard("LVDS_25") + + +def diff_io(eem, iostandard=default_iostandard): + return [("dio{}".format(eem), i, + Subsignal("p", Pins(_eem_pin(eem, i, "p"))), + Subsignal("n", Pins(_eem_pin(eem, i, "n"))), + iostandard(eem)) + for i in range(8)] + + +def generate_pads(platform, eem): + platform.add_extension(diff_io(eem)) diff --git a/io_loopback.py b/io_loopback.py new file mode 100644 index 0000000..bbb7df4 --- /dev/null +++ b/io_loopback.py @@ -0,0 +1,41 @@ +from migen import * + + +class IOLoopBack(Module): + def __init__(self, pads): + self.o = Signal(4) + self.i = Signal(4) + self.t = Signal(4) + + for i in range(4): + self.specials += Instance("IOBUFDS", + o_O=self.o[i], + io_IO=pads[i].p, + io_IOB=pads[i].n, + i_I=self.i[i], + # Always enable input buffer, so it is actually a loop back + i_T=self.t[i], + ) + + +class SingleIOLoopback(Module): + def __init__(self, pad): + self.o = Signal() + self.i = Signal() + self.t = Signal() + + self.specials += Instance( + # "IOBUFDS_DCIEN", + "IOBUFDS", + # p_DIFF_TERM="TRUE", + # p_IBUF_LOW_PWR="TRUE", + # p_USE_IBUFDISABLE="TRUE", + o_O=self.o, + io_IO=pad.p, + io_IOB=pad.n, + i_I=self.i, + # Always enable input buffer, so it is actually a loop back + i_T=self.t, + # i_IBUFDISABLE=~self.t, + # i_DCITERMDISABLE=~self.t, + ) diff --git a/kasli_crg.py b/kasli_crg.py new file mode 100644 index 0000000..50407bf --- /dev/null +++ b/kasli_crg.py @@ -0,0 +1,125 @@ +from migen import * +from migen.build.platforms.sinara import kasli +from migen.genlib.resetsync import AsyncResetSynchronizer + + +class KasliCRG(Module): + def __init__(self, platform): + self.platform = platform + + # Generated clock domains + self.clock_domains.cd_sys = ClockDomain() + self.clock_domains.cd_sys5x = ClockDomain() + self.clock_domains.cd_rx_sys = ClockDomain() + self.clock_domains.cd_rx_sys5x = ClockDomain() + self.clock_domains.cd_clk200 = ClockDomain() + + # Configure system clock using GTP ports + self.sys_clk_freq = 125e6 + clk125 = self.platform.request("clk125_gtp") + clk125_buf = Signal() + clk125_div2 = Signal() + + self.specials += Instance("IBUFDS_GTE2", + i_CEB=0, + i_I=clk125.p, i_IB=clk125.n, + o_O=clk125_buf, + o_ODIV2=clk125_div2) + + # MMCM to generate different frequencies + mmcm_fb = Signal() + mmcm_locked = Signal() + mmcm_sys = Signal() + mmcm_sys5x = Signal() + mmcm_rx_sys = Signal() + mmcm_rx_sys5x = Signal() + + # PLL to IDELAYCTRL clock + pll_locked = Signal() + pll_fb = Signal() + pll_clk200 = Signal() + + # Actual MMCM/PLL instances + self.specials += [ + Instance("MMCME2_BASE", + p_CLKIN1_PERIOD=16.0, + i_CLKIN1=clk125_div2, + + i_CLKFBIN=mmcm_fb, + o_CLKFBOUT=mmcm_fb, + o_LOCKED=mmcm_locked, + + # VCO @ 1.25GHz with MULT=20 + p_CLKFBOUT_MULT_F=20.0, p_DIVCLK_DIVIDE=1, + + # ~125MHz + p_CLKOUT0_DIVIDE_F=10.0, p_CLKOUT0_PHASE=0.0, o_CLKOUT0=mmcm_sys, + + # ~625MHz + p_CLKOUT1_DIVIDE=2, p_CLKOUT1_PHASE=0.0, o_CLKOUT1=mmcm_sys5x, + + # ~125MHz separated from sysclk, for RX + p_CLKOUT2_DIVIDE=10, p_CLKOUT2_PHASE=0.0, o_CLKOUT2=mmcm_rx_sys, + + # ~625MHz separated from sysclk, for RX + p_CLKOUT3_DIVIDE=2, p_CLKOUT3_PHASE=0.0, o_CLKOUT3=mmcm_rx_sys5x, + + # Leftovers... + # p_CLKOUT2_DIVIDE=2, p_CLKOUT2_PHASE=90.0, o_CLKOUT2=mmcm_sys4x_dqs, + ), + Instance("PLLE2_BASE", + p_CLKIN1_PERIOD=16.0, + i_CLKIN1=clk125_div2, + + i_CLKFBIN=pll_fb, + o_CLKFBOUT=pll_fb, + o_LOCKED=pll_locked, + + # VCO @ 1GHz + p_CLKFBOUT_MULT=16, p_DIVCLK_DIVIDE=1, + + # 200MHz for IDELAYCTRL + p_CLKOUT0_DIVIDE=5, p_CLKOUT0_PHASE=0.0, o_CLKOUT0=pll_clk200, + ), + ] + + self.specials += [ + Instance("BUFG", i_I=mmcm_sys, o_O=self.cd_sys.clk), + Instance("BUFG", i_I=mmcm_sys5x, o_O=self.cd_sys5x.clk), + Instance("BUFG", i_I=mmcm_rx_sys, o_O=self.cd_rx_sys.clk), + Instance("BUFG", i_I=mmcm_rx_sys5x, o_O=self.cd_rx_sys5x.clk), + Instance("BUFG", i_I=pll_clk200, o_O=self.cd_clk200.clk), + AsyncResetSynchronizer(self.cd_clk200, ~pll_locked), + ] + + reset_counter = Signal(4, reset=15) + ic_reset = Signal(reset=1) + self.sync.clk200 += \ + If(reset_counter != 0, + reset_counter.eq(reset_counter - 1) + ).Else( + ic_reset.eq(0) + ) + self.specials += Instance("IDELAYCTRL", i_REFCLK=ClockSignal("clk200"), i_RST=ic_reset) + + # Add clock costraints for all clock signals + platform.add_period_constraint(self.cd_sys.clk, 8.) + platform.add_period_constraint(self.cd_sys5x.clk, 1.6) + platform.add_period_constraint(self.cd_rx_sys.clk, 8.) + platform.add_period_constraint(self.cd_rx_sys5x.clk, 1.6) + platform.add_period_constraint(self.cd_clk200.clk, 5.) + + platform.add_platform_command( + "set_false_path -quiet " + "-through [get_pins -filter {{REF_PIN_NAME == OQ || REF_PIN_NAME == TQ}} " + "-of [get_cells -filter {{REF_NAME == OSERDESE2}}]] " + "-to [get_pins -filter {{REF_PIN_NAME == D}} " + "-of [get_cells -filter {{REF_NAME == ISERDESE2}}]]" + ) + platform.add_platform_command( + "set_false_path -quiet " + "-through [get_pins -filter {{REF_PIN_NAME == OQ || REF_PIN_NAME == TQ}} " + "-of [get_cells -filter {{REF_NAME == OSERDESE2}}]] " + "-to [get_pins -filter {{REF_PIN_NAME == DDLY}} " + "-of [get_cells -filter {{REF_NAME == ISERDESE2}}]]" + ) diff --git a/loopback.py b/loopback.py new file mode 100644 index 0000000..fc9a856 --- /dev/null +++ b/loopback.py @@ -0,0 +1,44 @@ +from migen import * +from migen.build.platforms.sinara import kasli +from migen.genlib.fifo import SyncFIFO +from uart import UART +from kasli_crg import KasliCRG + + +class UARTLoopBack(Module): + def __init__(self, sys_clk_freq): + self.uart_rx = Signal() + self.uart_tx = Signal() + + self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32)) + self.comb += [ + self.uart.phy_rx.eq(self.uart_rx), + self.uart_tx.eq(self.uart.phy_tx), + ] + + # Attach buffer between UART RX --> TX + # This constitutes the loopback channel + self.submodules.buffer = SyncFIFO(8, 64) + self.comb += [ + self.buffer.din.eq(self.uart.rx_data), + self.buffer.we.eq(self.uart.rx_stb), + self.uart.tx_data.eq(self.buffer.dout), + self.uart.tx_stb.eq(self.buffer.readable), + self.buffer.re.eq(self.uart.tx_ack), + ] + + +if __name__ == "__main__": + platform = kasli.Platform(hw_rev="v2.0") + crg = KasliCRG(platform) + top = UARTLoopBack(crg.sys_clk_freq) + + # Wire up UART core to the pads + uart_pads = platform.request("serial") + top.comb += [ + top.uart_rx.eq(uart_pads.rx), + uart_pads.tx.eq(top.uart_tx), + ] + + top.submodules += crg + platform.build(top) diff --git a/serdes.py b/serdes.py new file mode 100644 index 0000000..f6a55e1 --- /dev/null +++ b/serdes.py @@ -0,0 +1,122 @@ +from migen import * + + +class SerTX(Module): + def __init__(self): + self.txdata = Signal(20) + self.ser_out = Signal(4) + self.t_out = Signal(4) + # TODO: Create T pins + + # Transmitter PHY: 4-wire + for i in range(4): + # Serialize 5 bits into each channel + + # TX SERDES + self.specials += Instance("OSERDESE2", + p_DATA_RATE_OQ="SDR", p_DATA_RATE_TQ="BUF", + p_DATA_WIDTH=5, p_TRISTATE_WIDTH=1, + p_INIT_OQ=0b00000, + o_OQ=self.ser_out[i], o_TQ=self.t_out[i], + i_RST=ResetSignal(), + i_CLK=ClockSignal("sys5x"), + i_CLKDIV=ClockSignal(), + i_D1=self.txdata[i*5 + 0], + i_D2=self.txdata[i*5 + 1], + i_D3=self.txdata[i*5 + 2], + i_D4=self.txdata[i*5 + 3], + i_D5=self.txdata[i*5 + 4], + i_TCE=1, i_OCE=1, + # TODO: Hardcode t_in? Output disable is always unnecessary? + i_T1=0) + + +class DesRX(Module): + def __init__(self): + self.rxdata = Signal(20) + self.ser_in = Signal(4) + + for i in range(4): + # Deserialize 5 bits from each channel + # RX SERDES + self.specials += [ + Instance("ISERDESE2", + p_DATA_RATE="SDR", + p_DATA_WIDTH=5, + p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1, + o_Q1=self.rxdata[i*5 + 4], + o_Q2=self.rxdata[i*5 + 3], + o_Q3=self.rxdata[i*5 + 2], + o_Q4=self.rxdata[i*5 + 1], + o_Q5=self.rxdata[i*5 + 0], + i_D=self.ser_in[i], + i_CLK=ClockSignal("rx_sys5x"), + i_CLKB=~ClockSignal("rx_sys5x"), + i_CE1=1, + i_RST=ResetSignal("rx_sys"), + i_CLKDIV=ClockSignal("rx_sys")), + + # # Tunable delay + # Instance("IDELAYE2", + # p_DELAY_SRC="IDATAIN", p_SIGNAL_PATTERN="DATA", + # p_CINVCTRL_SEL="FALSE", p_HIGH_PERFORMANCE_MODE="TRUE", p_REFCLK_FREQUENCY=200.0, + # p_PIPE_SEL="FALSE", p_IDELAY_TYPE="VARIABLE", p_IDELAY_VALUE=0, + + # i_C=ClockSignal(), + # i_LD=self._dly_sel.storage[i//8] & self._rdly_dq_rst.re, + # i_CE=self._dly_sel.storage[i//8] & self._rdly_dq_inc.re, + # i_LDPIPEEN=0, i_INC=1, + + # i_IDATAIN=dq_i_nodelay, o_DATAOUT=dq_i_delayed + # ) + ] + + +# class DoubleDesRX(Module): +# def __init__(self): +# self.rxdata = Signal(20) + +# self.rx_first_edge = Signal() +# rx_raw = Array(Signal(20) for _ in range(2)) +# self.comb += [ +# rxdata.eq(rx_raw[self.rx_first_edge]) +# ] + +# # Receiver PHY: 4-wire +# for i in range(4): +# # Deserialize 5 bits from each channel +# # With 2x oversampling + +# # RX SERDES +# self.specials += Instance("ISERDESE2", p_DATA_RATE="DDR", +# p_DATA_WIDTH=10, +# p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1, +# p_SERDES_MODE="MASTER", +# o_Q1=rx_raw[1][i*5 + 4], o_Q2=rx_raw[0][i*5 + 4], +# o_Q3=rx_raw[1][i*5 + 3], o_Q4=rx_raw[0][i*5 + 3], +# o_Q5=rx_raw[1][i*5 + 2], o_Q6=rx_raw[0][i*5 + 2], +# o_Q7=rx_raw[1][i*5 + 1], o_Q8=rx_raw[0][i*5 + 1], +# i_D=self.ser_in[i], +# # We are using 5x for SDR +# i_CLK=ClockSignal("sys5x"), +# i_CLKB=~ClockSignal("sys5x"), +# i_CE1=1, +# i_RST=ResetSignal(), +# i_CLKDIV=ClockSignal(), +# o_SHIFTOUT1=serdes_link1, +# o_SHIFTOUT2=serdes_link2) + +# self.specials += Instance("ISERDESE2", p_DATA_RATE="DDR", +# p_DATA_WIDTH=10, +# p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1, +# p_SERDES_MODE="SLAVE", +# o_Q1=rx_raw[1][i*5], o_Q2=rx_raw[0][i*5], +# i_D=self.ser_in[i], +# # We are using 5x for SDR +# i_CLK=ClockSignal("sys5x"), +# i_CLKB=~ClockSignal("sys5x"), +# i_CE1=1, +# i_RST=ResetSignal(), +# i_CLKDIV=ClockSignal(), +# i_SHIFTIN1=serdes_link1, +# i_SHIFTIN2=serdes_link2) diff --git a/serdes_loopback.py b/serdes_loopback.py new file mode 100644 index 0000000..de54de4 --- /dev/null +++ b/serdes_loopback.py @@ -0,0 +1,102 @@ +from migen import * +from serdes import * +from migen.genlib.fifo import SyncFIFO +from migen.build.platforms.sinara import kasli +from migen.genlib.misc import WaitTimer +from kasli_crg import KasliCRG +from eem_helpers import generate_pads +from uart import UART +from io_loopback import IOLoopBack + + +SEPARATOR = Constant(0b0101) + +class SerDesLoopBack(Module): + def __init__(self, io_pads, sys_clk_freq): + self.uart_rx = Signal() + self.uart_tx = Signal() + + self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32)) + self.comb += [ + self.uart.phy_rx.eq(self.uart_rx), + self.uart_tx.eq(self.uart.phy_tx), + ] + + self.submodules.tx = SerTX() + self.submodules.rx = DesRX() + + # The actual channel + self.submodules.channel = IOLoopBack(io_pads) + + # # Additional timer: Only permit UART transmission when timer is not up + # self.submodules.wait_timer = WaitTimer(10) + + # Memoize the previous rxdata + self.rxdata_r = Signal(8) + + # Attach FIFO to UART TX, send rate is too slow w.r.t sysclk + self.submodules.tx_fifo = SyncFIFO(8, 64) + + self.comb += [ + # RX path: From UART to channel + # self.rx_buffer.din.eq(self.uart.rx_data), + # self.rx_buffer.we.eq(self.uart.rx_stb), + self.tx.txdata[:8].eq(Mux(self.uart.rx_stb, self.uart.rx_data, 0)), + self.tx.txdata[8:12].eq(Mux(SEPARATOR, self.uart.rx_data, 0)), + self.tx.txdata[12:].eq(Mux(self.uart.rx_stb, self.uart.rx_data, 0)), + + # Loopback channel + self.channel.i.eq(self.tx.ser_out), + self.rx.ser_in.eq(self.channel.o), + self.channel.t.eq(self.tx.t_out), + # self.rx.ser_in.eq(self.tx.ser_out), + # self.rx.ser_in[3].eq(self.tx.ser_out[3]), + + # TX path + # self.uart.tx_data.eq(self.tx_buffer.dout), + # self.uart.tx_stb.eq(self.tx_buffer.readable), + # self.tx_buffer.re.eq(self.uart.tx_ack), + self.uart.tx_data.eq(self.tx_fifo.dout), + self.uart.tx_stb.eq(self.tx_fifo.readable), + self.tx_fifo.re.eq(self.uart.tx_ack), + ] + + # Timer control + self.sync += [ + # Send data to FIFO if not repeated + If(self.rxdata_r != self.rx.rxdata[:8], + self.rxdata_r.eq(self.rx.rxdata), + self.tx_fifo.din.eq(self.rx.rxdata), + self.tx_fifo.we.eq(1) + ).Else( + self.tx_fifo.we.eq(0) + ) + ] + + # self.sync.sys5x += [ + # self.rx.ser_in.eq(self.tx.ser_out), + # ] + + +if __name__ == "__main__": + platform = kasli.Platform(hw_rev="v2.0") + + # Generate pads for the I/O blocks + eem = 0 + generate_pads(platform, eem) + pads = [ + platform.request("dio{}".format(eem), i) for i in range(4) + ] + + crg = KasliCRG(platform) + top = SerDesLoopBack(pads, crg.sys_clk_freq) + + # Wire up UART core to the pads + uart_pads = platform.request("serial") + top.comb += [ + top.uart_rx.eq(uart_pads.rx), + uart_pads.tx.eq(top.uart_tx), + ] + + top.submodules += crg + platform.build(top) diff --git a/single_serdes_loopback.py b/single_serdes_loopback.py new file mode 100644 index 0000000..f38bb2e --- /dev/null +++ b/single_serdes_loopback.py @@ -0,0 +1,164 @@ +from migen import * +from sync_serdes import * +from migen.genlib.fifo import SyncFIFO +from migen.build.platforms.sinara import kasli +from migen.genlib.misc import WaitTimer +from kasli_crg import KasliCRG +from eem_helpers import generate_pads +from uart import UART +from io_loopback import SingleIOLoopback + + +SEPARATOR = Constant(0b0101) + +class SingleSerDesLoopBack(Module): + def __init__(self, io_pad, sys_clk_freq): + self.uart_rx = Signal() + self.uart_tx = Signal() + + self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32)) + self.comb += [ + self.uart.phy_rx.eq(self.uart_rx), + self.uart_tx.eq(self.uart.phy_tx), + ] + + self.submodules.tx = SingleLineTX() + self.submodules.rx = SingleLineRX() + self.submodules.phase_reader = PhaseReader() + # self.submodules.delay_optimizer = DelayOptimizer() + + # The actual channel + self.submodules.channel = SingleIOLoopback(io_pad) + + # Attach FIFO to UART TX, send rate is too slow w.r.t sysclk + self.submodules.tx_fifo = SyncFIFO(8, 64) + + self.comb += [ + # Repetitively send 0b00100 + self.tx.txdata.eq(0b00100), + + # Loopback channel + self.channel.i.eq(self.tx.ser_out), + self.rx.ser_in_no_dly.eq(self.channel.o), + self.channel.t.eq(self.tx.t_out), + + # TX path + self.uart.tx_data.eq(self.tx_fifo.dout), + self.uart.tx_stb.eq(self.tx_fifo.readable), + self.tx_fifo.re.eq(self.uart.tx_ack), + ] + + # Route deserializer to phase_reader & the delay tap optimizer + self.comb += [ + # Start the reader initially + self.phase_reader.start.eq(1), + # Delay tap optimizer will start after the reader is done + # self.delay_optimizer.start.eq(0), + + # RXDATA for both reader and optimzer + self.phase_reader.loopback_rxdata.eq(self.rx.rxdata), + # TODO: Reconnet + # self.delay_optimizer.loopback_rxdata.eq(self.rx.rxdata), + + # Delay tap value + self.phase_reader.delay_tap.eq(self.rx.cnt_out), + # TODO: Reconnet + # self.delay_optimizer.delay_tap.eq(self.rx.cnt_out), + + # Increment control enable, such that phase_reader can + # increment tap value after delay measurement + # Re-assign the incremnet control to the optimizer after the optimizer has started + # If(self.delay_optimizer.start, + # self.rx.ce.eq(self.delay_optimizer.inc_en), + # ).Else( + # self.rx.ce.eq(self.phase_reader.inc_en), + # ) + self.rx.ce.eq(self.phase_reader.inc_en), + ] + + # Show measured result on UART + delay_tap = Signal(6) + + fsm = FSM(reset_state="WAIT_DONE") + self.submodules += fsm + + fsm.act("WAIT_DONE", + If(self.phase_reader.done, + NextState("WRITE_UPPER"), + ), + ) + + fsm.act("WRITE_UPPER", + # Exist state if all results are sent + If(delay_tap == 32, + NextState("TERMINATE"), + ).Elif(self.tx_fifo.writable, + self.tx_fifo.we.eq(1), + self.tx_fifo.din.eq(self.phase_reader.data_result[delay_tap][8:]), + NextState("WRITE_LOWER"), + ), + ) + + fsm.act("WRITE_LOWER", + self.tx_fifo.we.eq(1), + self.tx_fifo.din.eq(self.phase_reader.data_result[delay_tap][:8]), + NextValue(delay_tap, delay_tap + 1), + NextState("WRITE_UPPER"), + ) + + # fsm.act("FIND_OPT_DELAY", + # self.delay_optimizer.start.eq(1), + # self.rx.ce.eq(self.delay_optimizer.inc_en), + # If(self.delay_optimizer.done, + # NextState("WRITE_OPT"), + # ).Else( + # NextState("FIND_OPT_DELAY") + # ) + # ) + + # fsm.act("WRITE_OPT", + # self.tx_fifo.we.eq(1), + # self.tx_fifo.din.eq(self.delay_optimizer.opt_delay_tap), + # NextState("TERMINATE") + # ) + + fsm.act("TERMINATE", + NextState("TERMINATE"), + ) + + # # Output control + # self.sync += [ + # # Send data to FIFO if not repeated + # If(self.rxdata_r[:5] != self.rx.rxdata, + # self.rxdata_r.eq(self.rx.rxdata), + # self.tx_fifo.din.eq(self.rx.rxdata), + # self.tx_fifo.we.eq(1) + # ).Else( + # self.tx_fifo.we.eq(0) + # ) + # ] + + +if __name__ == "__main__": + platform = kasli.Platform(hw_rev="v2.0") + + # Generate pads for the I/O blocks + eem = 3 + generate_pads(platform, eem) + # pads = [ + # platform.request("dio{}".format(eem), i) for i in range(4) + # ] + pad = platform.request("dio{}".format(eem), 0) + + crg = KasliCRG(platform) + top = SingleSerDesLoopBack(pad, crg.sys_clk_freq) + + # Wire up UART core to the pads + uart_pads = platform.request("serial") + top.comb += [ + top.uart_rx.eq(uart_pads.rx), + uart_pads.tx.eq(top.uart_tx), + ] + + top.submodules += crg + platform.build(top) diff --git a/sync_serdes.py b/sync_serdes.py new file mode 100644 index 0000000..7245026 --- /dev/null +++ b/sync_serdes.py @@ -0,0 +1,387 @@ +from migen import * +from migen.genlib.misc import WaitTimer +from util import PriorityEncoderMSB + + +class SingleLineTX(Module): + def __init__(self): + self.txdata = Signal(5) + self.ser_out = Signal() + self.t_out = Signal() + + # TX SERDES + self.specials += Instance("OSERDESE2", + p_DATA_RATE_OQ="SDR", p_DATA_RATE_TQ="BUF", + p_DATA_WIDTH=5, p_TRISTATE_WIDTH=1, + p_INIT_OQ=0b00000, + o_OQ=self.ser_out, o_TQ=self.t_out, + i_RST=ResetSignal(), + i_CLK=ClockSignal("sys5x"), + i_CLKDIV=ClockSignal(), + i_D1=self.txdata[0], + i_D2=self.txdata[1], + i_D3=self.txdata[2], + i_D4=self.txdata[3], + i_D5=self.txdata[4], + i_TCE=1, i_OCE=1, + # TODO: Hardcode t_in? Output disable is always unnecessary? + i_T1=0) + + +class SingleLineRX(Module): + def __init__(self): + self.rxdata = Signal(10) + self.ser_in_no_dly = Signal() + self.ce = Signal() + self.cnt_out = Signal(5) + self.opt_delay = Signal(5) + + ser_in = Signal() + shifts = Signal(2) + + self.specials += [ + # Master deserializer + Instance("ISERDESE2", + p_DATA_RATE="DDR", + p_DATA_WIDTH=10, + p_INTERFACE_TYPE="NETWORKING", + p_NUM_CE=1, + p_SERDES_MODE="MASTER", + p_IOBDELAY="IFD", + o_Q1=self.rxdata[9], + o_Q2=self.rxdata[8], + o_Q3=self.rxdata[7], + o_Q4=self.rxdata[6], + o_Q5=self.rxdata[5], + o_Q6=self.rxdata[4], + o_Q7=self.rxdata[3], + o_Q8=self.rxdata[2], + o_SHIFTOUT1=shifts[0], + o_SHIFTOUT2=shifts[1], + i_DDLY=ser_in, + i_BITSLIP=0, + i_CLK=ClockSignal("rx_sys5x"), + i_CLKB=~ClockSignal("rx_sys5x"), + i_CE1=1, + i_RST=ResetSignal("rx_sys"), + i_CLKDIV=ClockSignal("rx_sys")), + + # Slave deserializer + Instance("ISERDESE2", + p_DATA_RATE="DDR", + p_DATA_WIDTH=10, + p_INTERFACE_TYPE="NETWORKING", + p_NUM_CE=1, + p_SERDES_MODE="SLAVE", + p_IOBDELAY="IFD", + o_Q3=self.rxdata[1], + o_Q4=self.rxdata[0], + # i_DDLY=ser_in, + i_BITSLIP=0, + i_CLK=ClockSignal("rx_sys5x"), + i_CLKB=~ClockSignal("rx_sys5x"), + i_CE1=1, + i_RST=ResetSignal("rx_sys"), + i_CLKDIV=ClockSignal("rx_sys"), + i_SHIFTIN1=shifts[0], + i_SHIFTIN2=shifts[1]), + + # Tunable delay + Instance("IDELAYE2", + p_DELAY_SRC="IDATAIN", + p_SIGNAL_PATTERN="DATA", + p_CINVCTRL_SEL="FALSE", + p_HIGH_PERFORMANCE_MODE="TRUE", + # REFCLK refers to the clock source of IDELAYCTRL + p_REFCLK_FREQUENCY=200.0, + p_PIPE_SEL="FALSE", + p_IDELAY_TYPE="VARIABLE", + p_IDELAY_VALUE=0, + + i_C=ClockSignal("rx_sys"), + # i_LD=self._dly_sel.storage[i//8] & self._rdly_dq_rst.re, + # i_CE=self._dly_sel.storage[i//8] & self._rdly_dq_inc.re, + i_LD=0, + i_CE=self.ce, # TODO: Port output + i_LDPIPEEN=0, + i_INC=1, # Always increment + + # Allow aligner to check delay tap value + o_CNTVALUEOUT=self.cnt_out, + + i_IDATAIN=self.ser_in_no_dly, o_DATAOUT=ser_in + ), + + # IDELAYCTRL is with the clocking + ] + + +class BitSlipReader(Module): + def __init__(self): + # IN + self.loopback_rxdata = Signal(10) + self.start = Signal() + + # Wait for stabilization after bitslip + self.submodules.stab_timer = WaitTimer(511) + + # OUT + self.done = Signal() + self.bitslip = Signal() + self.data_result = Array(Signal(10) for _ in range(5)) + + self.slip_count = Signal(3) + + fsm = FSM(reset_state="WAIT_START") + self.submodules += fsm + + fsm.act("WAIT_START", + If(self.start, + NextState("WAIT_TIMER"), + ).Else( + NextState("WAIT_START"), + ) + ) + + fsm.act("WAIT_TIMER", + self.stab_timer.wait.eq(1), + If(self.stab_timer.done, + NextState("SAMPLE"), + ) + ) + + fsm.act("SAMPLE", + # Wait is reset now + # Explicit assignment is unnecessary, as combinatorial statement + # falls back to he default value when not driven + + # Keep result alive until reset + NextValue(self.data_result[self.slip_count], self.loopback_rxdata), + NextValue(self.slip_count, self.slip_count + 1), + NextState("HIGH_BITSLIP_FIRST"), + ) + + # Pulsing BITSLIP alternate between 1 right shift and 3 left shifts + # We are trying to figure out which 2-bits are the slave copying from + # Hence, we only want shifts by 2. Pulsing twice does exactly that. + fsm.act("HIGH_BITSLIP_FIRST", + self.bitslip.eq(1), + NextState("LOW_BITSLIP"), + ) + + fsm.act("LOW_BITSLIP", + # bitslip signal is auto-reset + NextState("HIGH_BITSLIP_SECOND"), + ) + + fsm.act("HIGH_BITSLIP_SECOND", + self.bitslip.eq(1), + If(self.slip_count == 5, + NextState("TERMINATE"), + ).Else( + NextState("WAIT_TIMER"), + ) + ) + + fsm.act("TERMINATE", + self.done.eq(1), + NextState("TERMINATE"), + ) + + + +class PhaseReader(Module): + def __init__(self): + # Drive IDELAYE2 CE pin to increment delay + # The signal should only last for 1 cycle + self.inc_en = Signal() + self.loopback_rxdata = Signal(10) + self.delay_tap = Signal(5) + + # Pull up to start the phase reader + self.start = Signal() + + self.data_result = Array(Signal(10) for _ in range(32)) + self.done = Signal() + + # Wait for stabilization after increment + self.submodules.stab_timer = WaitTimer(511) + + fsm = FSM(reset_state="WAIT_START") + self.submodules += fsm + + fsm.act("WAIT_START", + If(self.start, + NextState("WAIT_TIMER"), + ).Else( + NextState("WAIT_START"), + ) + ) + + fsm.act("WAIT_TIMER", + self.stab_timer.wait.eq(1), + If(self.stab_timer.done, + NextState("SAMPLE"), + ) + ) + + fsm.act("SAMPLE", + # Wait is reset now + # Explicit assignment is unnecessary, as combinatorial statement + # falls back to he default value when not driven + + # Keep result alive until reset + NextValue(self.data_result[self.delay_tap], self.loopback_rxdata), + NextState("HIGH_CE"), + ) + + fsm.act("HIGH_CE", + self.inc_en.eq(1), + NextState("LOW_CE"), + ) + + fsm.act("LOW_CE", + # TAP OUT is available 1 cycle after the pulse + # Explicit signal reset is unnecessary, as signal assigned by + # combinatorial logic in FSM is after leaving the setting block + NextState("READ_TAP"), + ) + + fsm.act("READ_TAP", + If(self.delay_tap != 0, + NextState("WAIT_TIMER"), + ).Else( + NextState("PROBE_FIN"), + ) + ) + + fsm.act("PROBE_FIN", + self.done.eq(1), + NextState("PROBE_FIN"), + ) + + +class DelayOptimizer(Module): + def __init__(self): + # IN + # Signals from the channel + self.loopback_rxdata = Signal(10) + self.delay_tap = Signal(5) + + # IN + # Signal to start the calculation + self.start = Signal() + + # OUT + # Signal for controlling the channel delay tap + self.inc_en = Signal() + + # OUT + # The optimal delay + self.opt_delay_tap = Signal(5) + + # OUT + # Optimal delay is calculated + self.done = Signal() + + # Priority encoder for finding the pulse location + self.submodules.pulse_encoder = PriorityEncoderMSB(10) + + # Wait for stabilization after increment + self.submodules.stab_timer = WaitTimer(511) + + # Intermediate signals + self.expected_pulse = Signal(max=9) + self.min_delay = Signal(5) + self.max_offset = Signal(5) + + # Translate rxdata into array to allow indexing + self.rxdata_array = Array(Signal() for _ in range(10)) + self.comb += [ self.rxdata_array[i].eq(self.loopback_rxdata[i]) for i in range(10) ] + + fsm = FSM(reset_state="WAIT_START") + self.submodules += fsm + + fsm.act("WAIT_START", + If(self.start, + NextState("WAIT_ZERO"), + ).Else( + NextState("WAIT_START"), + ) + ) + + fsm.act("WAIT_ZERO", + self.stab_timer.wait.eq(1), + If(self.stab_timer.done, + NextState("SAMPLE_ZERO"), + ) + ) + + fsm.act("SAMPLE_ZERO", + # Oversampling should guarantee the detection + # However, priority encoder itself does not wraparound + # So, we need to avoid passing wrapped around pulse signal into + # the priority encoder. + If(self.loopback_rxdata[0] & self.loopback_rxdata[-1], + NextValue(self.expected_pulse, 1), + ).Else( + self.pulse_encoder.i.eq(self.loopback_rxdata), + If(self.pulse_encoder.o == 9, + NextValue(self.expected_pulse, 0), + ).Else( + NextValue(self.expected_pulse, self.pulse_encoder.o + 1), + ) + ), + # Goto the next delay tap and wait for the pulse. + NextState("INC_PULSE_DELAY_IN"), + ) + + fsm.act("WAIT_PULSE_IN", + self.stab_timer.wait.eq(1), + If(self.stab_timer.done, + NextState("SAMPLE_PULSE_IN"), + ) + ) + + fsm.act("SAMPLE_PULSE_IN", + If(self.rxdata_array[self.expected_pulse], + NextValue(self.min_delay, self.delay_tap), + NextState("INC_PULSE_DELAY_OUT"), + ).Else( + NextState("INC_PULSE_DELAY_IN"), + ) + ) + + fsm.act("INC_PULSE_DELAY_IN", + # This signal is automatically deasserted after this state + self.inc_en.eq(1), + NextState("WAIT_PULSE_IN"), + ) + + fsm.act("WAIT_PULSE_OUT", + self.stab_timer.wait.eq(1), + If(self.stab_timer.done, + NextState("SAMPLE_PULSE_OUT"), + ) + ) + + fsm.act("SAMPLE_PULSE_OUT", + If(~self.rxdata_array[self.expected_pulse], + NextValue(self.opt_delay_tap, self.min_delay + (self.max_offset >> 1)), + NextState("TERMINATE"), + ).Else( + NextValue(self.max_offset, self.max_offset + 1), + NextState("INC_PULSE_DELAY_OUT"), + ) + ) + + fsm.act("INC_PULSE_DELAY_OUT", + # This signal is automatically deasserted after this state + self.inc_en.eq(1), + NextState("WAIT_PULSE_OUT"), + ) + + fsm.act("TERMINATE", + self.done.eq(1), + NextState("TERMINATE"), + ) diff --git a/test_aligner.py b/test_aligner.py new file mode 100644 index 0000000..8393756 --- /dev/null +++ b/test_aligner.py @@ -0,0 +1,198 @@ +from migen import * +from sync_serdes import PhaseReader, DelayOptimizer, BitSlipReader +import random + + +def reader_testbench(dut, rxdata_list): + yield dut.delay_tap.eq(0) + yield dut.start.eq(1) + assert (yield dut.stab_timer.wait) == 0 + + for i in range(32): + yield dut.loopback_rxdata.eq(rxdata_list[i]) + yield + yield + assert (yield dut.stab_timer.wait) == 1 + + # Keep yielding until the DUT gives CE signal + while (yield dut.inc_en) == 0: + yield + + # Check that inc_en is deassrted after 1 clock cycle + yield + assert (yield dut.inc_en) == 0 + + # Load a new tap value + yield dut.delay_tap.eq(i + 1) + yield + # Nothing to check in the READ_TAP state + yield + + assert(yield dut.done) == 1 + + for i in range(32): + signal = yield dut.data_result[i] + expected = rxdata_list[i] + assert signal == expected + + for i in range(200): + assert (yield dut.inc_en) == 0 + yield + + # Untouched delay: Record should be invariant + for i in range(32): + signal = yield dut.data_result[i] + expected = rxdata_list[i] + assert signal == expected + + +def optimal_delay_testbench(dut, pulse_list, cycles, pulse_index, min_delay, max_offset, opt_delay_tap): + # Start the module + yield dut.delay_tap.eq(0) + yield dut.start.eq(1) + assert (yield dut.stab_timer.wait) == 0 + + for i in range(cycles): + # Pass in a new rxdata for sampling + # The stab_timer should start waiting after + yield dut.loopback_rxdata.eq(pulse_list[i]) + yield + yield + assert (yield dut.stab_timer.wait) == 1 + + # Eventually, the wait will end + # Either it triggers an increment or a finished signal + # And we will get the expected pulse location + # inc_en is pulsed after this is found + if i == (cycles - 1): + while (yield dut.done) == 0: + yield + break + else: + while (yield dut.inc_en) == 0: + yield + + # Then we increment the rxdata index + yield dut.delay_tap.eq(i + 1) + yield + + # Fast-forward to the result + # while (yield dut.done) == 0: + # yield + + assert (yield dut.done) == 1 + assert (yield dut.expected_pulse) == pulse_index + assert (yield dut.min_delay) == min_delay + assert (yield dut.max_offset) == max_offset + assert (yield dut.opt_delay_tap) == opt_delay_tap + + for _ in range(100): + yield + + # Invariant test: Everything is frozen after done + assert (yield dut.done) == 1 + assert (yield dut.expected_pulse) == pulse_index + assert (yield dut.min_delay) == min_delay + assert (yield dut.max_offset) == max_offset + assert (yield dut.opt_delay_tap) == opt_delay_tap + + +def bitslip_reader_tb(dut, rxdata_list): + # Start the module + yield dut.start.eq(1) + assert (yield dut.stab_timer.wait) == 0 + + for i in range(5): + yield dut.loopback_rxdata.eq(rxdata_list[i]) + yield + yield + assert (yield dut.stab_timer.wait) == 1 + + # Keep yielding until the DUT gives BITSLIP signal + while (yield dut.bitslip) == 0: + yield + + # There will be 2 BITSLIP pulses + # Both BITSLIP pulses should last for 1 cycle + assert (yield dut.bitslip) == 1 + yield + assert (yield dut.bitslip) == 0 + yield + assert (yield dut.bitslip) == 1 + yield + assert (yield dut.bitslip) == 0 + + assert (yield dut.done) == 1 + # The result in the module should contain all rxdata + for i, rxdata in enumerate(rxdata_list): + assert (yield dut.data_result[i]) == rxdata + + yield + yield + + +# # Random testing for delay reader +# for _ in range(32): +# rxdata_list = [ random.getrandbits(10) for _ in range(32) ] +# dut = PhaseReader() +# run_simulation(dut, reader_testbench(dut, rxdata_list), vcd_name="phase_reader.vcd") + +# # Random testing for optimal delay calculation +# # Generate a delay list +# start = random.randint(0, 9) +# start_length = random.randint(1, 10) +# offset = random.randint(4, 5) + +# current_index = start +# remaining_length = start_length +# single_pulse_list = [] + +# expected_index = (current_index + 1) % 10 +# expected_length = 10 + +# for tap in range(32 + offset): +# single_pulse_list.append(1 << current_index) +# remaining_length -= 1 + +# if remaining_length == 0: +# current_index = (current_index + 1) % 10 +# remaining_length = 10 + +# pulse_list = list(single_pulse_list) +# for i in range(offset, 32): +# pulse_list[i] |= single_pulse_list[i - offset] + +# found_start_edge = False +# max_offset = 0 + +# # Calculate min_delay +# for i, pulse in enumerate(pulse_list): +# if (pulse & (1 << expected_index)) != 0: +# if not found_start_edge: +# min_delay = i +# found_start_edge = True +# else: +# max_offset += 1 +# if (pulse & (1 << expected_index)) == 0 and found_start_edge: +# cycles = i + 1 +# break + +# print(min_delay) +# print(max_offset) +# print(cycles) +# opt_delay = int(min_delay + (max_offset / 2)) +# print(opt_delay) + +# # Simulate +# dut = DelayOptimizer() +# run_simulation(dut, optimal_delay_testbench( +# dut, pulse_list, cycles, expected_index, +# min_delay, max_offset, opt_delay), +# vcd_name="delay_opt.vcd" +# ) + +# Random test for bitslip reader +for _ in range(32): + rxdata_list = [ random.getrandbits(10) for _ in range(5) ] + dut = BitSlipReader() + run_simulation(dut, bitslip_reader_tb(dut, rxdata_list), vcd_name="bitslip_reader.vcd") diff --git a/test_buffer.py b/test_buffer.py new file mode 100644 index 0000000..2144968 --- /dev/null +++ b/test_buffer.py @@ -0,0 +1,46 @@ +from migen import * +from buffer import Buffer + + +BUFFER_DEPTH=8 +full_test_bytes = [0xDE, 0xAD, 0xBE, 0xEF, 0xBA, 0xD0, 0xCA, 0xFE, 0x15, 0x77] + + +def testbench(dut, length=10): + # dut = Buffer(8, BUFFER_DEPTH) + # full_test_bytes = [0xDE, 0xAD, 0xBE, 0xEF, 0xBA, 0xD0, 0xCA, 0xFE, 0x15, 0x77] + + test_data = full_test_bytes[:min(BUFFER_DEPTH, length)] + + # Initial condition, stb low + yield dut.i_stb.eq(0) + yield + + # Append bytes one-by-one + for number in test_data: + yield dut.din.eq(number) + yield dut.i_stb.eq(1) + yield + + # Deassert input strobe to buffers + yield dut.i_stb.eq(0) + yield + + # Receive bytes on-by-one + for number in test_data: + assert (yield dut.o_stb) == 1 + assert (yield dut.dout) == number + yield dut.o_ack.eq(1) + yield + yield dut.o_ack.eq(0) + yield + yield + + yield + yield + assert (yield dut.o_stb) == 0 + + +for buffer_len in range(len(full_test_bytes)): + dut = Buffer(8, BUFFER_DEPTH) + run_simulation(dut, testbench(dut, length=buffer_len), vcd_name="buffer.vcd") diff --git a/uart.py b/uart.py new file mode 100644 index 0000000..ddde0c9 --- /dev/null +++ b/uart.py @@ -0,0 +1,105 @@ +from migen import * +from migen.genlib.cdc import MultiReg + + +class UART(Module): + def __init__(self, tuning_word): + self.phy_rx = Signal() + self.phy_tx = Signal() + + self.rx_data = Signal(8) + self.rx_stb = Signal() + + self.tx_data = Signal(8) + self.tx_stb = Signal() + self.tx_ack = Signal() + + # # # + + # + # RX + # + + uart_clk_rxen = Signal() + phase_accumulator_rx = Signal(32) + + rx = Signal() + self.specials += MultiReg(self.phy_rx, rx) + rx_r = Signal() + rx_reg = Signal(8) + rx_bitcount = Signal(4) + rx_busy = Signal() + rx_done = self.rx_stb + rx_data = self.rx_data + self.sync += [ + rx_done.eq(0), + rx_r.eq(rx), + If(~rx_busy, + If(~rx & rx_r, # look for start bit + rx_busy.eq(1), + rx_bitcount.eq(0), + ) + ).Else( + If(uart_clk_rxen, + rx_bitcount.eq(rx_bitcount + 1), + If(rx_bitcount == 0, + If(rx, # verify start bit + rx_busy.eq(0) + ) + ).Elif(rx_bitcount == 9, + rx_busy.eq(0), + If(rx, # verify stop bit + rx_data.eq(rx_reg), + rx_done.eq(1) + ) + ).Else( + rx_reg.eq(Cat(rx_reg[1:], rx)) + ) + ) + ) + ] + self.sync += \ + If(rx_busy, + Cat(phase_accumulator_rx, uart_clk_rxen).eq(phase_accumulator_rx + tuning_word) + ).Else( + Cat(phase_accumulator_rx, uart_clk_rxen).eq(2**31) + ) + + # + # TX + # + uart_clk_txen = Signal() + phase_accumulator_tx = Signal(32) + + self.phy_tx.reset = 1 + + tx_reg = Signal(8) + tx_bitcount = Signal(4) + tx_busy = Signal() + self.sync += [ + self.tx_ack.eq(0), + If(self.tx_stb & ~tx_busy & ~self.tx_ack, + tx_reg.eq(self.tx_data), + tx_bitcount.eq(0), + tx_busy.eq(1), + self.phy_tx.eq(0) + ).Elif(uart_clk_txen & tx_busy, + tx_bitcount.eq(tx_bitcount + 1), + If(tx_bitcount == 8, + self.phy_tx.eq(1) + ).Elif(tx_bitcount == 9, + self.phy_tx.eq(1), + tx_busy.eq(0), + self.tx_ack.eq(1), + ).Else( + self.phy_tx.eq(tx_reg[0]), + tx_reg.eq(Cat(tx_reg[1:], 0)) + ) + ) + ] + self.sync += \ + If(tx_busy, + Cat(phase_accumulator_tx, uart_clk_txen).eq(phase_accumulator_tx + tuning_word) + ).Else( + Cat(phase_accumulator_tx, uart_clk_txen).eq(0) + ) diff --git a/util.py b/util.py new file mode 100644 index 0000000..27283b2 --- /dev/null +++ b/util.py @@ -0,0 +1,11 @@ +from migen import * + + +class PriorityEncoderMSB(Module): + def __init__(self, width): + self.i = Signal(width) # one-hot, msb has priority + self.o = Signal(max=max(2, width)) # binary + self.n = Signal() # none + for j in range(width): # first has priority + self.comb += If(self.i[j], self.o.eq(j)) + self.comb += self.n.eq(self.i == 0)