This commit is contained in:
occheung 2023-04-23 11:42:18 +08:00
commit 7f23de0ce8
17 changed files with 1483 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
build
__pycache__
test
*.vcd
*.cfg
*.txt

BIN
bscan_spi_xc7a100t.bit Normal file

Binary file not shown.

26
buffer.py Normal file
View File

@ -0,0 +1,26 @@
from migen import *
from migen.genlib.fifo import SyncFIFO
class Buffer(Module):
def __init__(self, width=8, depth=128):
self.din = Signal(width)
self.i_stb = Signal()
self.dout = Signal(width)
self.o_stb = Signal()
self.o_ack = Signal()
# Underlying data structure
self.submodules.fifo = SyncFIFO(width, depth)
self.comb += [
# TX path
self.o_stb.eq(self.fifo.readable),
self.dout.eq(self.fifo.dout),
self.fifo.re.eq(self.o_ack),
# RX path
self.fifo.we.eq(self.i_stb),
self.fifo.din.eq(self.din),
]

22
comm.py Normal file
View File

@ -0,0 +1,22 @@
import serial
def main():
comm = serial.Serial("/dev/ttyUSB3", 115200)
# comm.write(b"Hello World!")
# for _ in range(32):
while True:
byte = comm.read(2)
print(f'{byte[0]:0>8b}' + f'{byte[1]:0>8b}')
byte = comm.read(1)
print(f'{byte[0]:0>8b}')
# cached_byte = None
# while True:
# byte = comm.read(1)
# if byte != cached_byte:
# cached_byte = byte
# print(f'{byte[0]:0>8b}')
if __name__ == "__main__":
main()

55
default.nix Normal file
View File

@ -0,0 +1,55 @@
{ pkgs ? import <nixpkgs> {} }:
let
migen = pkgs.python3Packages.buildPythonPackage rec {
name = "migen";
src = pkgs.fetchFromGitHub {
owner = "m-labs";
repo = "migen";
rev = "7bc4eb1387b39159a74c1dbd1b820728e0bfbbaa";
sha256 = "039jk8y7f0vhr32svg3nd23i88c0bhws8ngxwk9bdznfxvhiy1h6";
fetchSubmodules = true;
};
propagatedBuildInputs = with pkgs.python3Packages; [ colorama ];
};
vivadoDeps = pkgs: with pkgs; [
libxcrypt
ncurses5
zlib
libuuid
xorg.libSM
xorg.libICE
xorg.libXrender
xorg.libX11
xorg.libXext
xorg.libXtst
xorg.libXi
freetype
fontconfig
];
vivadoEnv = pkgs.buildFHSUserEnv {
name = "vivado-env";
targetPkgs = vivadoDeps;
};
vivado = pkgs.buildFHSUserEnv {
name = "vivado";
targetPkgs = vivadoDeps;
profile = "set -e; source /opt/Xilinx/Vivado/2022.2/settings64.sh";
runScript = "vivado";
};
in pkgs.mkShell {
name = "UART-Testing";
buildInputs = [
migen
pkgs.python3Packages.pyserial
vivado
vivadoEnv
pkgs.python3Packages.numpy
];
}

29
eem_helpers.py Normal file
View File

@ -0,0 +1,29 @@
from migen import *
from migen.build.generic_platform import *
def _eem_signal(i):
n = "d{}".format(i)
if i == 0:
n += "_cc"
return n
def _eem_pin(eem, i, pol):
return "eem{}:{}_{}".format(eem, _eem_signal(i), pol)
def default_iostandard(eem):
return IOStandard("LVDS_25")
def diff_io(eem, iostandard=default_iostandard):
return [("dio{}".format(eem), i,
Subsignal("p", Pins(_eem_pin(eem, i, "p"))),
Subsignal("n", Pins(_eem_pin(eem, i, "n"))),
iostandard(eem))
for i in range(8)]
def generate_pads(platform, eem):
platform.add_extension(diff_io(eem))

41
io_loopback.py Normal file
View File

@ -0,0 +1,41 @@
from migen import *
class IOLoopBack(Module):
def __init__(self, pads):
self.o = Signal(4)
self.i = Signal(4)
self.t = Signal(4)
for i in range(4):
self.specials += Instance("IOBUFDS",
o_O=self.o[i],
io_IO=pads[i].p,
io_IOB=pads[i].n,
i_I=self.i[i],
# Always enable input buffer, so it is actually a loop back
i_T=self.t[i],
)
class SingleIOLoopback(Module):
def __init__(self, pad):
self.o = Signal()
self.i = Signal()
self.t = Signal()
self.specials += Instance(
# "IOBUFDS_DCIEN",
"IOBUFDS",
# p_DIFF_TERM="TRUE",
# p_IBUF_LOW_PWR="TRUE",
# p_USE_IBUFDISABLE="TRUE",
o_O=self.o,
io_IO=pad.p,
io_IOB=pad.n,
i_I=self.i,
# Always enable input buffer, so it is actually a loop back
i_T=self.t,
# i_IBUFDISABLE=~self.t,
# i_DCITERMDISABLE=~self.t,
)

125
kasli_crg.py Normal file
View File

@ -0,0 +1,125 @@
from migen import *
from migen.build.platforms.sinara import kasli
from migen.genlib.resetsync import AsyncResetSynchronizer
class KasliCRG(Module):
def __init__(self, platform):
self.platform = platform
# Generated clock domains
self.clock_domains.cd_sys = ClockDomain()
self.clock_domains.cd_sys5x = ClockDomain()
self.clock_domains.cd_rx_sys = ClockDomain()
self.clock_domains.cd_rx_sys5x = ClockDomain()
self.clock_domains.cd_clk200 = ClockDomain()
# Configure system clock using GTP ports
self.sys_clk_freq = 125e6
clk125 = self.platform.request("clk125_gtp")
clk125_buf = Signal()
clk125_div2 = Signal()
self.specials += Instance("IBUFDS_GTE2",
i_CEB=0,
i_I=clk125.p, i_IB=clk125.n,
o_O=clk125_buf,
o_ODIV2=clk125_div2)
# MMCM to generate different frequencies
mmcm_fb = Signal()
mmcm_locked = Signal()
mmcm_sys = Signal()
mmcm_sys5x = Signal()
mmcm_rx_sys = Signal()
mmcm_rx_sys5x = Signal()
# PLL to IDELAYCTRL clock
pll_locked = Signal()
pll_fb = Signal()
pll_clk200 = Signal()
# Actual MMCM/PLL instances
self.specials += [
Instance("MMCME2_BASE",
p_CLKIN1_PERIOD=16.0,
i_CLKIN1=clk125_div2,
i_CLKFBIN=mmcm_fb,
o_CLKFBOUT=mmcm_fb,
o_LOCKED=mmcm_locked,
# VCO @ 1.25GHz with MULT=20
p_CLKFBOUT_MULT_F=20.0, p_DIVCLK_DIVIDE=1,
# ~125MHz
p_CLKOUT0_DIVIDE_F=10.0, p_CLKOUT0_PHASE=0.0, o_CLKOUT0=mmcm_sys,
# ~625MHz
p_CLKOUT1_DIVIDE=2, p_CLKOUT1_PHASE=0.0, o_CLKOUT1=mmcm_sys5x,
# ~125MHz separated from sysclk, for RX
p_CLKOUT2_DIVIDE=10, p_CLKOUT2_PHASE=0.0, o_CLKOUT2=mmcm_rx_sys,
# ~625MHz separated from sysclk, for RX
p_CLKOUT3_DIVIDE=2, p_CLKOUT3_PHASE=0.0, o_CLKOUT3=mmcm_rx_sys5x,
# Leftovers...
# p_CLKOUT2_DIVIDE=2, p_CLKOUT2_PHASE=90.0, o_CLKOUT2=mmcm_sys4x_dqs,
),
Instance("PLLE2_BASE",
p_CLKIN1_PERIOD=16.0,
i_CLKIN1=clk125_div2,
i_CLKFBIN=pll_fb,
o_CLKFBOUT=pll_fb,
o_LOCKED=pll_locked,
# VCO @ 1GHz
p_CLKFBOUT_MULT=16, p_DIVCLK_DIVIDE=1,
# 200MHz for IDELAYCTRL
p_CLKOUT0_DIVIDE=5, p_CLKOUT0_PHASE=0.0, o_CLKOUT0=pll_clk200,
),
]
self.specials += [
Instance("BUFG", i_I=mmcm_sys, o_O=self.cd_sys.clk),
Instance("BUFG", i_I=mmcm_sys5x, o_O=self.cd_sys5x.clk),
Instance("BUFG", i_I=mmcm_rx_sys, o_O=self.cd_rx_sys.clk),
Instance("BUFG", i_I=mmcm_rx_sys5x, o_O=self.cd_rx_sys5x.clk),
Instance("BUFG", i_I=pll_clk200, o_O=self.cd_clk200.clk),
AsyncResetSynchronizer(self.cd_clk200, ~pll_locked),
]
reset_counter = Signal(4, reset=15)
ic_reset = Signal(reset=1)
self.sync.clk200 += \
If(reset_counter != 0,
reset_counter.eq(reset_counter - 1)
).Else(
ic_reset.eq(0)
)
self.specials += Instance("IDELAYCTRL", i_REFCLK=ClockSignal("clk200"), i_RST=ic_reset)
# Add clock costraints for all clock signals
platform.add_period_constraint(self.cd_sys.clk, 8.)
platform.add_period_constraint(self.cd_sys5x.clk, 1.6)
platform.add_period_constraint(self.cd_rx_sys.clk, 8.)
platform.add_period_constraint(self.cd_rx_sys5x.clk, 1.6)
platform.add_period_constraint(self.cd_clk200.clk, 5.)
platform.add_platform_command(
"set_false_path -quiet "
"-through [get_pins -filter {{REF_PIN_NAME == OQ || REF_PIN_NAME == TQ}} "
"-of [get_cells -filter {{REF_NAME == OSERDESE2}}]] "
"-to [get_pins -filter {{REF_PIN_NAME == D}} "
"-of [get_cells -filter {{REF_NAME == ISERDESE2}}]]"
)
platform.add_platform_command(
"set_false_path -quiet "
"-through [get_pins -filter {{REF_PIN_NAME == OQ || REF_PIN_NAME == TQ}} "
"-of [get_cells -filter {{REF_NAME == OSERDESE2}}]] "
"-to [get_pins -filter {{REF_PIN_NAME == DDLY}} "
"-of [get_cells -filter {{REF_NAME == ISERDESE2}}]]"
)

44
loopback.py Normal file
View File

@ -0,0 +1,44 @@
from migen import *
from migen.build.platforms.sinara import kasli
from migen.genlib.fifo import SyncFIFO
from uart import UART
from kasli_crg import KasliCRG
class UARTLoopBack(Module):
def __init__(self, sys_clk_freq):
self.uart_rx = Signal()
self.uart_tx = Signal()
self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32))
self.comb += [
self.uart.phy_rx.eq(self.uart_rx),
self.uart_tx.eq(self.uart.phy_tx),
]
# Attach buffer between UART RX --> TX
# This constitutes the loopback channel
self.submodules.buffer = SyncFIFO(8, 64)
self.comb += [
self.buffer.din.eq(self.uart.rx_data),
self.buffer.we.eq(self.uart.rx_stb),
self.uart.tx_data.eq(self.buffer.dout),
self.uart.tx_stb.eq(self.buffer.readable),
self.buffer.re.eq(self.uart.tx_ack),
]
if __name__ == "__main__":
platform = kasli.Platform(hw_rev="v2.0")
crg = KasliCRG(platform)
top = UARTLoopBack(crg.sys_clk_freq)
# Wire up UART core to the pads
uart_pads = platform.request("serial")
top.comb += [
top.uart_rx.eq(uart_pads.rx),
uart_pads.tx.eq(top.uart_tx),
]
top.submodules += crg
platform.build(top)

122
serdes.py Normal file
View File

@ -0,0 +1,122 @@
from migen import *
class SerTX(Module):
def __init__(self):
self.txdata = Signal(20)
self.ser_out = Signal(4)
self.t_out = Signal(4)
# TODO: Create T pins
# Transmitter PHY: 4-wire
for i in range(4):
# Serialize 5 bits into each channel
# TX SERDES
self.specials += Instance("OSERDESE2",
p_DATA_RATE_OQ="SDR", p_DATA_RATE_TQ="BUF",
p_DATA_WIDTH=5, p_TRISTATE_WIDTH=1,
p_INIT_OQ=0b00000,
o_OQ=self.ser_out[i], o_TQ=self.t_out[i],
i_RST=ResetSignal(),
i_CLK=ClockSignal("sys5x"),
i_CLKDIV=ClockSignal(),
i_D1=self.txdata[i*5 + 0],
i_D2=self.txdata[i*5 + 1],
i_D3=self.txdata[i*5 + 2],
i_D4=self.txdata[i*5 + 3],
i_D5=self.txdata[i*5 + 4],
i_TCE=1, i_OCE=1,
# TODO: Hardcode t_in? Output disable is always unnecessary?
i_T1=0)
class DesRX(Module):
def __init__(self):
self.rxdata = Signal(20)
self.ser_in = Signal(4)
for i in range(4):
# Deserialize 5 bits from each channel
# RX SERDES
self.specials += [
Instance("ISERDESE2",
p_DATA_RATE="SDR",
p_DATA_WIDTH=5,
p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1,
o_Q1=self.rxdata[i*5 + 4],
o_Q2=self.rxdata[i*5 + 3],
o_Q3=self.rxdata[i*5 + 2],
o_Q4=self.rxdata[i*5 + 1],
o_Q5=self.rxdata[i*5 + 0],
i_D=self.ser_in[i],
i_CLK=ClockSignal("rx_sys5x"),
i_CLKB=~ClockSignal("rx_sys5x"),
i_CE1=1,
i_RST=ResetSignal("rx_sys"),
i_CLKDIV=ClockSignal("rx_sys")),
# # Tunable delay
# Instance("IDELAYE2",
# p_DELAY_SRC="IDATAIN", p_SIGNAL_PATTERN="DATA",
# p_CINVCTRL_SEL="FALSE", p_HIGH_PERFORMANCE_MODE="TRUE", p_REFCLK_FREQUENCY=200.0,
# p_PIPE_SEL="FALSE", p_IDELAY_TYPE="VARIABLE", p_IDELAY_VALUE=0,
# i_C=ClockSignal(),
# i_LD=self._dly_sel.storage[i//8] & self._rdly_dq_rst.re,
# i_CE=self._dly_sel.storage[i//8] & self._rdly_dq_inc.re,
# i_LDPIPEEN=0, i_INC=1,
# i_IDATAIN=dq_i_nodelay, o_DATAOUT=dq_i_delayed
# )
]
# class DoubleDesRX(Module):
# def __init__(self):
# self.rxdata = Signal(20)
# self.rx_first_edge = Signal()
# rx_raw = Array(Signal(20) for _ in range(2))
# self.comb += [
# rxdata.eq(rx_raw[self.rx_first_edge])
# ]
# # Receiver PHY: 4-wire
# for i in range(4):
# # Deserialize 5 bits from each channel
# # With 2x oversampling
# # RX SERDES
# self.specials += Instance("ISERDESE2", p_DATA_RATE="DDR",
# p_DATA_WIDTH=10,
# p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1,
# p_SERDES_MODE="MASTER",
# o_Q1=rx_raw[1][i*5 + 4], o_Q2=rx_raw[0][i*5 + 4],
# o_Q3=rx_raw[1][i*5 + 3], o_Q4=rx_raw[0][i*5 + 3],
# o_Q5=rx_raw[1][i*5 + 2], o_Q6=rx_raw[0][i*5 + 2],
# o_Q7=rx_raw[1][i*5 + 1], o_Q8=rx_raw[0][i*5 + 1],
# i_D=self.ser_in[i],
# # We are using 5x for SDR
# i_CLK=ClockSignal("sys5x"),
# i_CLKB=~ClockSignal("sys5x"),
# i_CE1=1,
# i_RST=ResetSignal(),
# i_CLKDIV=ClockSignal(),
# o_SHIFTOUT1=serdes_link1,
# o_SHIFTOUT2=serdes_link2)
# self.specials += Instance("ISERDESE2", p_DATA_RATE="DDR",
# p_DATA_WIDTH=10,
# p_INTERFACE_TYPE="NETWORKING", p_NUM_CE=1,
# p_SERDES_MODE="SLAVE",
# o_Q1=rx_raw[1][i*5], o_Q2=rx_raw[0][i*5],
# i_D=self.ser_in[i],
# # We are using 5x for SDR
# i_CLK=ClockSignal("sys5x"),
# i_CLKB=~ClockSignal("sys5x"),
# i_CE1=1,
# i_RST=ResetSignal(),
# i_CLKDIV=ClockSignal(),
# i_SHIFTIN1=serdes_link1,
# i_SHIFTIN2=serdes_link2)

102
serdes_loopback.py Normal file
View File

@ -0,0 +1,102 @@
from migen import *
from serdes import *
from migen.genlib.fifo import SyncFIFO
from migen.build.platforms.sinara import kasli
from migen.genlib.misc import WaitTimer
from kasli_crg import KasliCRG
from eem_helpers import generate_pads
from uart import UART
from io_loopback import IOLoopBack
SEPARATOR = Constant(0b0101)
class SerDesLoopBack(Module):
def __init__(self, io_pads, sys_clk_freq):
self.uart_rx = Signal()
self.uart_tx = Signal()
self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32))
self.comb += [
self.uart.phy_rx.eq(self.uart_rx),
self.uart_tx.eq(self.uart.phy_tx),
]
self.submodules.tx = SerTX()
self.submodules.rx = DesRX()
# The actual channel
self.submodules.channel = IOLoopBack(io_pads)
# # Additional timer: Only permit UART transmission when timer is not up
# self.submodules.wait_timer = WaitTimer(10)
# Memoize the previous rxdata
self.rxdata_r = Signal(8)
# Attach FIFO to UART TX, send rate is too slow w.r.t sysclk
self.submodules.tx_fifo = SyncFIFO(8, 64)
self.comb += [
# RX path: From UART to channel
# self.rx_buffer.din.eq(self.uart.rx_data),
# self.rx_buffer.we.eq(self.uart.rx_stb),
self.tx.txdata[:8].eq(Mux(self.uart.rx_stb, self.uart.rx_data, 0)),
self.tx.txdata[8:12].eq(Mux(SEPARATOR, self.uart.rx_data, 0)),
self.tx.txdata[12:].eq(Mux(self.uart.rx_stb, self.uart.rx_data, 0)),
# Loopback channel
self.channel.i.eq(self.tx.ser_out),
self.rx.ser_in.eq(self.channel.o),
self.channel.t.eq(self.tx.t_out),
# self.rx.ser_in.eq(self.tx.ser_out),
# self.rx.ser_in[3].eq(self.tx.ser_out[3]),
# TX path
# self.uart.tx_data.eq(self.tx_buffer.dout),
# self.uart.tx_stb.eq(self.tx_buffer.readable),
# self.tx_buffer.re.eq(self.uart.tx_ack),
self.uart.tx_data.eq(self.tx_fifo.dout),
self.uart.tx_stb.eq(self.tx_fifo.readable),
self.tx_fifo.re.eq(self.uart.tx_ack),
]
# Timer control
self.sync += [
# Send data to FIFO if not repeated
If(self.rxdata_r != self.rx.rxdata[:8],
self.rxdata_r.eq(self.rx.rxdata),
self.tx_fifo.din.eq(self.rx.rxdata),
self.tx_fifo.we.eq(1)
).Else(
self.tx_fifo.we.eq(0)
)
]
# self.sync.sys5x += [
# self.rx.ser_in.eq(self.tx.ser_out),
# ]
if __name__ == "__main__":
platform = kasli.Platform(hw_rev="v2.0")
# Generate pads for the I/O blocks
eem = 0
generate_pads(platform, eem)
pads = [
platform.request("dio{}".format(eem), i) for i in range(4)
]
crg = KasliCRG(platform)
top = SerDesLoopBack(pads, crg.sys_clk_freq)
# Wire up UART core to the pads
uart_pads = platform.request("serial")
top.comb += [
top.uart_rx.eq(uart_pads.rx),
uart_pads.tx.eq(top.uart_tx),
]
top.submodules += crg
platform.build(top)

164
single_serdes_loopback.py Normal file
View File

@ -0,0 +1,164 @@
from migen import *
from sync_serdes import *
from migen.genlib.fifo import SyncFIFO
from migen.build.platforms.sinara import kasli
from migen.genlib.misc import WaitTimer
from kasli_crg import KasliCRG
from eem_helpers import generate_pads
from uart import UART
from io_loopback import SingleIOLoopback
SEPARATOR = Constant(0b0101)
class SingleSerDesLoopBack(Module):
def __init__(self, io_pad, sys_clk_freq):
self.uart_rx = Signal()
self.uart_tx = Signal()
self.submodules.uart = UART(round((115200/sys_clk_freq)*2**32))
self.comb += [
self.uart.phy_rx.eq(self.uart_rx),
self.uart_tx.eq(self.uart.phy_tx),
]
self.submodules.tx = SingleLineTX()
self.submodules.rx = SingleLineRX()
self.submodules.phase_reader = PhaseReader()
# self.submodules.delay_optimizer = DelayOptimizer()
# The actual channel
self.submodules.channel = SingleIOLoopback(io_pad)
# Attach FIFO to UART TX, send rate is too slow w.r.t sysclk
self.submodules.tx_fifo = SyncFIFO(8, 64)
self.comb += [
# Repetitively send 0b00100
self.tx.txdata.eq(0b00100),
# Loopback channel
self.channel.i.eq(self.tx.ser_out),
self.rx.ser_in_no_dly.eq(self.channel.o),
self.channel.t.eq(self.tx.t_out),
# TX path
self.uart.tx_data.eq(self.tx_fifo.dout),
self.uart.tx_stb.eq(self.tx_fifo.readable),
self.tx_fifo.re.eq(self.uart.tx_ack),
]
# Route deserializer to phase_reader & the delay tap optimizer
self.comb += [
# Start the reader initially
self.phase_reader.start.eq(1),
# Delay tap optimizer will start after the reader is done
# self.delay_optimizer.start.eq(0),
# RXDATA for both reader and optimzer
self.phase_reader.loopback_rxdata.eq(self.rx.rxdata),
# TODO: Reconnet
# self.delay_optimizer.loopback_rxdata.eq(self.rx.rxdata),
# Delay tap value
self.phase_reader.delay_tap.eq(self.rx.cnt_out),
# TODO: Reconnet
# self.delay_optimizer.delay_tap.eq(self.rx.cnt_out),
# Increment control enable, such that phase_reader can
# increment tap value after delay measurement
# Re-assign the incremnet control to the optimizer after the optimizer has started
# If(self.delay_optimizer.start,
# self.rx.ce.eq(self.delay_optimizer.inc_en),
# ).Else(
# self.rx.ce.eq(self.phase_reader.inc_en),
# )
self.rx.ce.eq(self.phase_reader.inc_en),
]
# Show measured result on UART
delay_tap = Signal(6)
fsm = FSM(reset_state="WAIT_DONE")
self.submodules += fsm
fsm.act("WAIT_DONE",
If(self.phase_reader.done,
NextState("WRITE_UPPER"),
),
)
fsm.act("WRITE_UPPER",
# Exist state if all results are sent
If(delay_tap == 32,
NextState("TERMINATE"),
).Elif(self.tx_fifo.writable,
self.tx_fifo.we.eq(1),
self.tx_fifo.din.eq(self.phase_reader.data_result[delay_tap][8:]),
NextState("WRITE_LOWER"),
),
)
fsm.act("WRITE_LOWER",
self.tx_fifo.we.eq(1),
self.tx_fifo.din.eq(self.phase_reader.data_result[delay_tap][:8]),
NextValue(delay_tap, delay_tap + 1),
NextState("WRITE_UPPER"),
)
# fsm.act("FIND_OPT_DELAY",
# self.delay_optimizer.start.eq(1),
# self.rx.ce.eq(self.delay_optimizer.inc_en),
# If(self.delay_optimizer.done,
# NextState("WRITE_OPT"),
# ).Else(
# NextState("FIND_OPT_DELAY")
# )
# )
# fsm.act("WRITE_OPT",
# self.tx_fifo.we.eq(1),
# self.tx_fifo.din.eq(self.delay_optimizer.opt_delay_tap),
# NextState("TERMINATE")
# )
fsm.act("TERMINATE",
NextState("TERMINATE"),
)
# # Output control
# self.sync += [
# # Send data to FIFO if not repeated
# If(self.rxdata_r[:5] != self.rx.rxdata,
# self.rxdata_r.eq(self.rx.rxdata),
# self.tx_fifo.din.eq(self.rx.rxdata),
# self.tx_fifo.we.eq(1)
# ).Else(
# self.tx_fifo.we.eq(0)
# )
# ]
if __name__ == "__main__":
platform = kasli.Platform(hw_rev="v2.0")
# Generate pads for the I/O blocks
eem = 3
generate_pads(platform, eem)
# pads = [
# platform.request("dio{}".format(eem), i) for i in range(4)
# ]
pad = platform.request("dio{}".format(eem), 0)
crg = KasliCRG(platform)
top = SingleSerDesLoopBack(pad, crg.sys_clk_freq)
# Wire up UART core to the pads
uart_pads = platform.request("serial")
top.comb += [
top.uart_rx.eq(uart_pads.rx),
uart_pads.tx.eq(top.uart_tx),
]
top.submodules += crg
platform.build(top)

387
sync_serdes.py Normal file
View File

@ -0,0 +1,387 @@
from migen import *
from migen.genlib.misc import WaitTimer
from util import PriorityEncoderMSB
class SingleLineTX(Module):
def __init__(self):
self.txdata = Signal(5)
self.ser_out = Signal()
self.t_out = Signal()
# TX SERDES
self.specials += Instance("OSERDESE2",
p_DATA_RATE_OQ="SDR", p_DATA_RATE_TQ="BUF",
p_DATA_WIDTH=5, p_TRISTATE_WIDTH=1,
p_INIT_OQ=0b00000,
o_OQ=self.ser_out, o_TQ=self.t_out,
i_RST=ResetSignal(),
i_CLK=ClockSignal("sys5x"),
i_CLKDIV=ClockSignal(),
i_D1=self.txdata[0],
i_D2=self.txdata[1],
i_D3=self.txdata[2],
i_D4=self.txdata[3],
i_D5=self.txdata[4],
i_TCE=1, i_OCE=1,
# TODO: Hardcode t_in? Output disable is always unnecessary?
i_T1=0)
class SingleLineRX(Module):
def __init__(self):
self.rxdata = Signal(10)
self.ser_in_no_dly = Signal()
self.ce = Signal()
self.cnt_out = Signal(5)
self.opt_delay = Signal(5)
ser_in = Signal()
shifts = Signal(2)
self.specials += [
# Master deserializer
Instance("ISERDESE2",
p_DATA_RATE="DDR",
p_DATA_WIDTH=10,
p_INTERFACE_TYPE="NETWORKING",
p_NUM_CE=1,
p_SERDES_MODE="MASTER",
p_IOBDELAY="IFD",
o_Q1=self.rxdata[9],
o_Q2=self.rxdata[8],
o_Q3=self.rxdata[7],
o_Q4=self.rxdata[6],
o_Q5=self.rxdata[5],
o_Q6=self.rxdata[4],
o_Q7=self.rxdata[3],
o_Q8=self.rxdata[2],
o_SHIFTOUT1=shifts[0],
o_SHIFTOUT2=shifts[1],
i_DDLY=ser_in,
i_BITSLIP=0,
i_CLK=ClockSignal("rx_sys5x"),
i_CLKB=~ClockSignal("rx_sys5x"),
i_CE1=1,
i_RST=ResetSignal("rx_sys"),
i_CLKDIV=ClockSignal("rx_sys")),
# Slave deserializer
Instance("ISERDESE2",
p_DATA_RATE="DDR",
p_DATA_WIDTH=10,
p_INTERFACE_TYPE="NETWORKING",
p_NUM_CE=1,
p_SERDES_MODE="SLAVE",
p_IOBDELAY="IFD",
o_Q3=self.rxdata[1],
o_Q4=self.rxdata[0],
# i_DDLY=ser_in,
i_BITSLIP=0,
i_CLK=ClockSignal("rx_sys5x"),
i_CLKB=~ClockSignal("rx_sys5x"),
i_CE1=1,
i_RST=ResetSignal("rx_sys"),
i_CLKDIV=ClockSignal("rx_sys"),
i_SHIFTIN1=shifts[0],
i_SHIFTIN2=shifts[1]),
# Tunable delay
Instance("IDELAYE2",
p_DELAY_SRC="IDATAIN",
p_SIGNAL_PATTERN="DATA",
p_CINVCTRL_SEL="FALSE",
p_HIGH_PERFORMANCE_MODE="TRUE",
# REFCLK refers to the clock source of IDELAYCTRL
p_REFCLK_FREQUENCY=200.0,
p_PIPE_SEL="FALSE",
p_IDELAY_TYPE="VARIABLE",
p_IDELAY_VALUE=0,
i_C=ClockSignal("rx_sys"),
# i_LD=self._dly_sel.storage[i//8] & self._rdly_dq_rst.re,
# i_CE=self._dly_sel.storage[i//8] & self._rdly_dq_inc.re,
i_LD=0,
i_CE=self.ce, # TODO: Port output
i_LDPIPEEN=0,
i_INC=1, # Always increment
# Allow aligner to check delay tap value
o_CNTVALUEOUT=self.cnt_out,
i_IDATAIN=self.ser_in_no_dly, o_DATAOUT=ser_in
),
# IDELAYCTRL is with the clocking
]
class BitSlipReader(Module):
def __init__(self):
# IN
self.loopback_rxdata = Signal(10)
self.start = Signal()
# Wait for stabilization after bitslip
self.submodules.stab_timer = WaitTimer(511)
# OUT
self.done = Signal()
self.bitslip = Signal()
self.data_result = Array(Signal(10) for _ in range(5))
self.slip_count = Signal(3)
fsm = FSM(reset_state="WAIT_START")
self.submodules += fsm
fsm.act("WAIT_START",
If(self.start,
NextState("WAIT_TIMER"),
).Else(
NextState("WAIT_START"),
)
)
fsm.act("WAIT_TIMER",
self.stab_timer.wait.eq(1),
If(self.stab_timer.done,
NextState("SAMPLE"),
)
)
fsm.act("SAMPLE",
# Wait is reset now
# Explicit assignment is unnecessary, as combinatorial statement
# falls back to he default value when not driven
# Keep result alive until reset
NextValue(self.data_result[self.slip_count], self.loopback_rxdata),
NextValue(self.slip_count, self.slip_count + 1),
NextState("HIGH_BITSLIP_FIRST"),
)
# Pulsing BITSLIP alternate between 1 right shift and 3 left shifts
# We are trying to figure out which 2-bits are the slave copying from
# Hence, we only want shifts by 2. Pulsing twice does exactly that.
fsm.act("HIGH_BITSLIP_FIRST",
self.bitslip.eq(1),
NextState("LOW_BITSLIP"),
)
fsm.act("LOW_BITSLIP",
# bitslip signal is auto-reset
NextState("HIGH_BITSLIP_SECOND"),
)
fsm.act("HIGH_BITSLIP_SECOND",
self.bitslip.eq(1),
If(self.slip_count == 5,
NextState("TERMINATE"),
).Else(
NextState("WAIT_TIMER"),
)
)
fsm.act("TERMINATE",
self.done.eq(1),
NextState("TERMINATE"),
)
class PhaseReader(Module):
def __init__(self):
# Drive IDELAYE2 CE pin to increment delay
# The signal should only last for 1 cycle
self.inc_en = Signal()
self.loopback_rxdata = Signal(10)
self.delay_tap = Signal(5)
# Pull up to start the phase reader
self.start = Signal()
self.data_result = Array(Signal(10) for _ in range(32))
self.done = Signal()
# Wait for stabilization after increment
self.submodules.stab_timer = WaitTimer(511)
fsm = FSM(reset_state="WAIT_START")
self.submodules += fsm
fsm.act("WAIT_START",
If(self.start,
NextState("WAIT_TIMER"),
).Else(
NextState("WAIT_START"),
)
)
fsm.act("WAIT_TIMER",
self.stab_timer.wait.eq(1),
If(self.stab_timer.done,
NextState("SAMPLE"),
)
)
fsm.act("SAMPLE",
# Wait is reset now
# Explicit assignment is unnecessary, as combinatorial statement
# falls back to he default value when not driven
# Keep result alive until reset
NextValue(self.data_result[self.delay_tap], self.loopback_rxdata),
NextState("HIGH_CE"),
)
fsm.act("HIGH_CE",
self.inc_en.eq(1),
NextState("LOW_CE"),
)
fsm.act("LOW_CE",
# TAP OUT is available 1 cycle after the pulse
# Explicit signal reset is unnecessary, as signal assigned by
# combinatorial logic in FSM is after leaving the setting block
NextState("READ_TAP"),
)
fsm.act("READ_TAP",
If(self.delay_tap != 0,
NextState("WAIT_TIMER"),
).Else(
NextState("PROBE_FIN"),
)
)
fsm.act("PROBE_FIN",
self.done.eq(1),
NextState("PROBE_FIN"),
)
class DelayOptimizer(Module):
def __init__(self):
# IN
# Signals from the channel
self.loopback_rxdata = Signal(10)
self.delay_tap = Signal(5)
# IN
# Signal to start the calculation
self.start = Signal()
# OUT
# Signal for controlling the channel delay tap
self.inc_en = Signal()
# OUT
# The optimal delay
self.opt_delay_tap = Signal(5)
# OUT
# Optimal delay is calculated
self.done = Signal()
# Priority encoder for finding the pulse location
self.submodules.pulse_encoder = PriorityEncoderMSB(10)
# Wait for stabilization after increment
self.submodules.stab_timer = WaitTimer(511)
# Intermediate signals
self.expected_pulse = Signal(max=9)
self.min_delay = Signal(5)
self.max_offset = Signal(5)
# Translate rxdata into array to allow indexing
self.rxdata_array = Array(Signal() for _ in range(10))
self.comb += [ self.rxdata_array[i].eq(self.loopback_rxdata[i]) for i in range(10) ]
fsm = FSM(reset_state="WAIT_START")
self.submodules += fsm
fsm.act("WAIT_START",
If(self.start,
NextState("WAIT_ZERO"),
).Else(
NextState("WAIT_START"),
)
)
fsm.act("WAIT_ZERO",
self.stab_timer.wait.eq(1),
If(self.stab_timer.done,
NextState("SAMPLE_ZERO"),
)
)
fsm.act("SAMPLE_ZERO",
# Oversampling should guarantee the detection
# However, priority encoder itself does not wraparound
# So, we need to avoid passing wrapped around pulse signal into
# the priority encoder.
If(self.loopback_rxdata[0] & self.loopback_rxdata[-1],
NextValue(self.expected_pulse, 1),
).Else(
self.pulse_encoder.i.eq(self.loopback_rxdata),
If(self.pulse_encoder.o == 9,
NextValue(self.expected_pulse, 0),
).Else(
NextValue(self.expected_pulse, self.pulse_encoder.o + 1),
)
),
# Goto the next delay tap and wait for the pulse.
NextState("INC_PULSE_DELAY_IN"),
)
fsm.act("WAIT_PULSE_IN",
self.stab_timer.wait.eq(1),
If(self.stab_timer.done,
NextState("SAMPLE_PULSE_IN"),
)
)
fsm.act("SAMPLE_PULSE_IN",
If(self.rxdata_array[self.expected_pulse],
NextValue(self.min_delay, self.delay_tap),
NextState("INC_PULSE_DELAY_OUT"),
).Else(
NextState("INC_PULSE_DELAY_IN"),
)
)
fsm.act("INC_PULSE_DELAY_IN",
# This signal is automatically deasserted after this state
self.inc_en.eq(1),
NextState("WAIT_PULSE_IN"),
)
fsm.act("WAIT_PULSE_OUT",
self.stab_timer.wait.eq(1),
If(self.stab_timer.done,
NextState("SAMPLE_PULSE_OUT"),
)
)
fsm.act("SAMPLE_PULSE_OUT",
If(~self.rxdata_array[self.expected_pulse],
NextValue(self.opt_delay_tap, self.min_delay + (self.max_offset >> 1)),
NextState("TERMINATE"),
).Else(
NextValue(self.max_offset, self.max_offset + 1),
NextState("INC_PULSE_DELAY_OUT"),
)
)
fsm.act("INC_PULSE_DELAY_OUT",
# This signal is automatically deasserted after this state
self.inc_en.eq(1),
NextState("WAIT_PULSE_OUT"),
)
fsm.act("TERMINATE",
self.done.eq(1),
NextState("TERMINATE"),
)

198
test_aligner.py Normal file
View File

@ -0,0 +1,198 @@
from migen import *
from sync_serdes import PhaseReader, DelayOptimizer, BitSlipReader
import random
def reader_testbench(dut, rxdata_list):
yield dut.delay_tap.eq(0)
yield dut.start.eq(1)
assert (yield dut.stab_timer.wait) == 0
for i in range(32):
yield dut.loopback_rxdata.eq(rxdata_list[i])
yield
yield
assert (yield dut.stab_timer.wait) == 1
# Keep yielding until the DUT gives CE signal
while (yield dut.inc_en) == 0:
yield
# Check that inc_en is deassrted after 1 clock cycle
yield
assert (yield dut.inc_en) == 0
# Load a new tap value
yield dut.delay_tap.eq(i + 1)
yield
# Nothing to check in the READ_TAP state
yield
assert(yield dut.done) == 1
for i in range(32):
signal = yield dut.data_result[i]
expected = rxdata_list[i]
assert signal == expected
for i in range(200):
assert (yield dut.inc_en) == 0
yield
# Untouched delay: Record should be invariant
for i in range(32):
signal = yield dut.data_result[i]
expected = rxdata_list[i]
assert signal == expected
def optimal_delay_testbench(dut, pulse_list, cycles, pulse_index, min_delay, max_offset, opt_delay_tap):
# Start the module
yield dut.delay_tap.eq(0)
yield dut.start.eq(1)
assert (yield dut.stab_timer.wait) == 0
for i in range(cycles):
# Pass in a new rxdata for sampling
# The stab_timer should start waiting after
yield dut.loopback_rxdata.eq(pulse_list[i])
yield
yield
assert (yield dut.stab_timer.wait) == 1
# Eventually, the wait will end
# Either it triggers an increment or a finished signal
# And we will get the expected pulse location
# inc_en is pulsed after this is found
if i == (cycles - 1):
while (yield dut.done) == 0:
yield
break
else:
while (yield dut.inc_en) == 0:
yield
# Then we increment the rxdata index
yield dut.delay_tap.eq(i + 1)
yield
# Fast-forward to the result
# while (yield dut.done) == 0:
# yield
assert (yield dut.done) == 1
assert (yield dut.expected_pulse) == pulse_index
assert (yield dut.min_delay) == min_delay
assert (yield dut.max_offset) == max_offset
assert (yield dut.opt_delay_tap) == opt_delay_tap
for _ in range(100):
yield
# Invariant test: Everything is frozen after done
assert (yield dut.done) == 1
assert (yield dut.expected_pulse) == pulse_index
assert (yield dut.min_delay) == min_delay
assert (yield dut.max_offset) == max_offset
assert (yield dut.opt_delay_tap) == opt_delay_tap
def bitslip_reader_tb(dut, rxdata_list):
# Start the module
yield dut.start.eq(1)
assert (yield dut.stab_timer.wait) == 0
for i in range(5):
yield dut.loopback_rxdata.eq(rxdata_list[i])
yield
yield
assert (yield dut.stab_timer.wait) == 1
# Keep yielding until the DUT gives BITSLIP signal
while (yield dut.bitslip) == 0:
yield
# There will be 2 BITSLIP pulses
# Both BITSLIP pulses should last for 1 cycle
assert (yield dut.bitslip) == 1
yield
assert (yield dut.bitslip) == 0
yield
assert (yield dut.bitslip) == 1
yield
assert (yield dut.bitslip) == 0
assert (yield dut.done) == 1
# The result in the module should contain all rxdata
for i, rxdata in enumerate(rxdata_list):
assert (yield dut.data_result[i]) == rxdata
yield
yield
# # Random testing for delay reader
# for _ in range(32):
# rxdata_list = [ random.getrandbits(10) for _ in range(32) ]
# dut = PhaseReader()
# run_simulation(dut, reader_testbench(dut, rxdata_list), vcd_name="phase_reader.vcd")
# # Random testing for optimal delay calculation
# # Generate a delay list
# start = random.randint(0, 9)
# start_length = random.randint(1, 10)
# offset = random.randint(4, 5)
# current_index = start
# remaining_length = start_length
# single_pulse_list = []
# expected_index = (current_index + 1) % 10
# expected_length = 10
# for tap in range(32 + offset):
# single_pulse_list.append(1 << current_index)
# remaining_length -= 1
# if remaining_length == 0:
# current_index = (current_index + 1) % 10
# remaining_length = 10
# pulse_list = list(single_pulse_list)
# for i in range(offset, 32):
# pulse_list[i] |= single_pulse_list[i - offset]
# found_start_edge = False
# max_offset = 0
# # Calculate min_delay
# for i, pulse in enumerate(pulse_list):
# if (pulse & (1 << expected_index)) != 0:
# if not found_start_edge:
# min_delay = i
# found_start_edge = True
# else:
# max_offset += 1
# if (pulse & (1 << expected_index)) == 0 and found_start_edge:
# cycles = i + 1
# break
# print(min_delay)
# print(max_offset)
# print(cycles)
# opt_delay = int(min_delay + (max_offset / 2))
# print(opt_delay)
# # Simulate
# dut = DelayOptimizer()
# run_simulation(dut, optimal_delay_testbench(
# dut, pulse_list, cycles, expected_index,
# min_delay, max_offset, opt_delay),
# vcd_name="delay_opt.vcd"
# )
# Random test for bitslip reader
for _ in range(32):
rxdata_list = [ random.getrandbits(10) for _ in range(5) ]
dut = BitSlipReader()
run_simulation(dut, bitslip_reader_tb(dut, rxdata_list), vcd_name="bitslip_reader.vcd")

46
test_buffer.py Normal file
View File

@ -0,0 +1,46 @@
from migen import *
from buffer import Buffer
BUFFER_DEPTH=8
full_test_bytes = [0xDE, 0xAD, 0xBE, 0xEF, 0xBA, 0xD0, 0xCA, 0xFE, 0x15, 0x77]
def testbench(dut, length=10):
# dut = Buffer(8, BUFFER_DEPTH)
# full_test_bytes = [0xDE, 0xAD, 0xBE, 0xEF, 0xBA, 0xD0, 0xCA, 0xFE, 0x15, 0x77]
test_data = full_test_bytes[:min(BUFFER_DEPTH, length)]
# Initial condition, stb low
yield dut.i_stb.eq(0)
yield
# Append bytes one-by-one
for number in test_data:
yield dut.din.eq(number)
yield dut.i_stb.eq(1)
yield
# Deassert input strobe to buffers
yield dut.i_stb.eq(0)
yield
# Receive bytes on-by-one
for number in test_data:
assert (yield dut.o_stb) == 1
assert (yield dut.dout) == number
yield dut.o_ack.eq(1)
yield
yield dut.o_ack.eq(0)
yield
yield
yield
yield
assert (yield dut.o_stb) == 0
for buffer_len in range(len(full_test_bytes)):
dut = Buffer(8, BUFFER_DEPTH)
run_simulation(dut, testbench(dut, length=buffer_len), vcd_name="buffer.vcd")

105
uart.py Normal file
View File

@ -0,0 +1,105 @@
from migen import *
from migen.genlib.cdc import MultiReg
class UART(Module):
def __init__(self, tuning_word):
self.phy_rx = Signal()
self.phy_tx = Signal()
self.rx_data = Signal(8)
self.rx_stb = Signal()
self.tx_data = Signal(8)
self.tx_stb = Signal()
self.tx_ack = Signal()
# # #
#
# RX
#
uart_clk_rxen = Signal()
phase_accumulator_rx = Signal(32)
rx = Signal()
self.specials += MultiReg(self.phy_rx, rx)
rx_r = Signal()
rx_reg = Signal(8)
rx_bitcount = Signal(4)
rx_busy = Signal()
rx_done = self.rx_stb
rx_data = self.rx_data
self.sync += [
rx_done.eq(0),
rx_r.eq(rx),
If(~rx_busy,
If(~rx & rx_r, # look for start bit
rx_busy.eq(1),
rx_bitcount.eq(0),
)
).Else(
If(uart_clk_rxen,
rx_bitcount.eq(rx_bitcount + 1),
If(rx_bitcount == 0,
If(rx, # verify start bit
rx_busy.eq(0)
)
).Elif(rx_bitcount == 9,
rx_busy.eq(0),
If(rx, # verify stop bit
rx_data.eq(rx_reg),
rx_done.eq(1)
)
).Else(
rx_reg.eq(Cat(rx_reg[1:], rx))
)
)
)
]
self.sync += \
If(rx_busy,
Cat(phase_accumulator_rx, uart_clk_rxen).eq(phase_accumulator_rx + tuning_word)
).Else(
Cat(phase_accumulator_rx, uart_clk_rxen).eq(2**31)
)
#
# TX
#
uart_clk_txen = Signal()
phase_accumulator_tx = Signal(32)
self.phy_tx.reset = 1
tx_reg = Signal(8)
tx_bitcount = Signal(4)
tx_busy = Signal()
self.sync += [
self.tx_ack.eq(0),
If(self.tx_stb & ~tx_busy & ~self.tx_ack,
tx_reg.eq(self.tx_data),
tx_bitcount.eq(0),
tx_busy.eq(1),
self.phy_tx.eq(0)
).Elif(uart_clk_txen & tx_busy,
tx_bitcount.eq(tx_bitcount + 1),
If(tx_bitcount == 8,
self.phy_tx.eq(1)
).Elif(tx_bitcount == 9,
self.phy_tx.eq(1),
tx_busy.eq(0),
self.tx_ack.eq(1),
).Else(
self.phy_tx.eq(tx_reg[0]),
tx_reg.eq(Cat(tx_reg[1:], 0))
)
)
]
self.sync += \
If(tx_busy,
Cat(phase_accumulator_tx, uart_clk_txen).eq(phase_accumulator_tx + tuning_word)
).Else(
Cat(phase_accumulator_tx, uart_clk_txen).eq(0)
)

11
util.py Normal file
View File

@ -0,0 +1,11 @@
from migen import *
class PriorityEncoderMSB(Module):
def __init__(self, width):
self.i = Signal(width) # one-hot, msb has priority
self.o = Signal(max=max(2, width)) # binary
self.n = Signal() # none
for j in range(width): # first has priority
self.comb += If(self.i[j], self.o.eq(j))
self.comb += self.n.eq(self.i == 0)