coredevice/urukul: port to NAC3

This commit is contained in:
Sebastien Bourdeauducq 2021-11-10 19:01:44 +08:00
parent 31955d0c7a
commit 93f24c9f94

View File

@ -1,15 +1,17 @@
from numpy import int32, int64
from artiq.language.core import kernel, delay, portable, at_mu, now_mu
from artiq.language.core import nac3, KernelInvariant, kernel, portable
from artiq.language.units import us, ms
from artiq.language.types import TInt32, TFloat, TBool
from artiq.coredevice import spi2 as spi
from artiq.coredevice.core import Core
from artiq.coredevice.spi2 import *
from artiq.coredevice.ttl import TTLOut
SPI_CONFIG = (0 * spi.SPI_OFFLINE | 0 * spi.SPI_END |
0 * spi.SPI_INPUT | 1 * spi.SPI_CS_POLARITY |
0 * spi.SPI_CLK_POLARITY | 0 * spi.SPI_CLK_PHASE |
0 * spi.SPI_LSB_FIRST | 0 * spi.SPI_HALF_DUPLEX)
SPI_CONFIG = (0 * SPI_OFFLINE | 0 * SPI_END |
0 * SPI_INPUT | 1 * SPI_CS_POLARITY |
0 * SPI_CLK_POLARITY | 0 * SPI_CLK_PHASE |
0 * SPI_LSB_FIRST | 0 * SPI_HALF_DUPLEX)
# SPI clock write and read dividers
SPIT_CFG_WR = 2
@ -54,8 +56,8 @@ CS_DDS_CH3 = 7
@portable
def urukul_cfg(rf_sw, led, profile, io_update, mask_nu,
clk_sel, sync_sel, rst, io_rst, clk_div):
def urukul_cfg(rf_sw: int32, led: int32, profile: int32, io_update: int32, mask_nu: int32,
clk_sel: int32, sync_sel: int32, rst: int32, io_rst: int32, clk_div: int32) -> int32:
"""Build Urukul CPLD configuration register"""
return ((rf_sw << CFG_RF_SW) |
(led << CFG_LED) |
@ -71,56 +73,45 @@ def urukul_cfg(rf_sw, led, profile, io_update, mask_nu,
@portable
def urukul_sta_rf_sw(sta):
def urukul_sta_rf_sw(sta: int32) -> int32:
"""Return the RF switch status from Urukul status register value."""
return (sta >> STA_RF_SW) & 0xf
@portable
def urukul_sta_smp_err(sta):
def urukul_sta_smp_err(sta: int32) -> int32:
"""Return the SMP_ERR status from Urukul status register value."""
return (sta >> STA_SMP_ERR) & 0xf
@portable
def urukul_sta_pll_lock(sta):
def urukul_sta_pll_lock(sta: int32) -> int32:
"""Return the PLL_LOCK status from Urukul status register value."""
return (sta >> STA_PLL_LOCK) & 0xf
@portable
def urukul_sta_ifc_mode(sta):
def urukul_sta_ifc_mode(sta: int32) -> int32:
"""Return the IFC_MODE status from Urukul status register value."""
return (sta >> STA_IFC_MODE) & 0xf
@portable
def urukul_sta_proto_rev(sta):
def urukul_sta_proto_rev(sta: int32) -> int32:
"""Return the PROTO_REV value from Urukul status register value."""
return (sta >> STA_PROTO_REV) & 0x7f
class _RegIOUpdate:
def __init__(self, cpld):
self.cpld = cpld
@kernel
def pulse(self, t: TFloat):
cfg = self.cpld.cfg_reg
self.cpld.cfg_write(cfg | (1 << CFG_IO_UPDATE))
delay(t)
self.cpld.cfg_write(cfg)
@nac3
class _DummySync:
def __init__(self, cpld):
self.cpld = cpld
@kernel
def set_mu(self, ftw: TInt32):
def set_mu(self, ftw: int32):
pass
@nac3
class CPLD:
"""Urukul CPLD SPI router and configuration interface.
@ -159,7 +150,16 @@ class CPLD:
front panel SMA with no clock connected), then the ``init()`` method of
the DDS channels can fail with the error message ``PLL lock timeout``.
"""
kernel_invariants = {"refclk", "bus", "core", "io_update", "clk_div"}
core: KernelInvariant[Core]
refclk: KernelInvariant[float]
bus: KernelInvariant[SPIMaster]
io_update: KernelInvariant[TTLOut]
clk_div: KernelInvariant[int32]
sync: KernelInvariant[_DummySync]
cfg_reg: int32
att_reg: int32
sync_div: int32
def __init__(self, dmgr, spi_device, io_update_device=None,
dds_reset_device=None, sync_device=None,
@ -176,13 +176,17 @@ class CPLD:
if io_update_device is not None:
self.io_update = dmgr.get(io_update_device)
else:
self.io_update = _RegIOUpdate(self)
self.io_update = _RegIOUpdate(self.core, self)
# NAC3TODO
raise NotImplementedError
if dds_reset_device is not None:
self.dds_reset = dmgr.get(dds_reset_device)
if sync_device is not None:
self.sync = dmgr.get(sync_device)
if sync_div is None:
sync_div = 2
# NAC3TODO
raise NotImplementedError
else:
self.sync = _DummySync(self)
assert sync_div is None
@ -196,7 +200,7 @@ class CPLD:
self.sync_div = sync_div
@kernel
def cfg_write(self, cfg: TInt32):
def cfg_write(self, cfg: int32):
"""Write to the configuration register.
See :func:`urukul_cfg` for possible flags.
@ -204,13 +208,13 @@ class CPLD:
:param cfg: 24 bit data to be written. Will be stored at
:attr:`cfg_reg`.
"""
self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END, 24,
self.bus.set_config_mu(SPI_CONFIG | SPI_END, 24,
SPIT_CFG_WR, CS_CFG)
self.bus.write(cfg << 8)
self.cfg_reg = cfg
@kernel
def sta_read(self) -> TInt32:
def sta_read(self) -> int32:
"""Read the status register.
Use any of the following functions to extract values:
@ -223,13 +227,13 @@ class CPLD:
:return: The status register value.
"""
self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END | spi.SPI_INPUT, 24,
self.bus.set_config_mu(SPI_CONFIG | SPI_END | SPI_INPUT, 24,
SPIT_CFG_RD, CS_CFG)
self.bus.write(self.cfg_reg << 8)
return self.bus.read()
@kernel
def init(self, blind: TBool = False):
def init(self, blind: bool = False):
"""Initialize and detect Urukul.
Resets the DDS I/O interface and verifies correct CPLD gateware
@ -246,13 +250,14 @@ class CPLD:
else:
proto_rev = urukul_sta_proto_rev(self.sta_read())
if proto_rev != STA_PROTO_REV_MATCH:
raise ValueError("Urukul proto_rev mismatch")
delay(100 * us) # reset, slack
# NAC3TODO raise ValueError("Urukul proto_rev mismatch")
pass
self.core.delay(100. * us) # reset, slack
self.cfg_write(cfg)
if self.sync_div:
at_mu(now_mu() & ~0xf) # align to RTIO/2
if self.sync_div != 0:
at_mu(now_mu() & ~int64(0xf)) # align to RTIO/2
self.set_sync_div(self.sync_div) # 125 MHz/2 = 1 GHz/16
delay(1 * ms) # DDS wake up
self.core.delay(1. * ms) # DDS wake up
@kernel
def io_rst(self):
@ -260,8 +265,8 @@ class CPLD:
self.cfg_write(self.cfg_reg | (1 << CFG_IO_RST))
self.cfg_write(self.cfg_reg & ~(1 << CFG_IO_RST))
@kernel
def cfg_sw(self, channel: TInt32, on: TBool):
# NAC3TODO @kernel
def cfg_sw(self, channel: int32, on: bool):
"""Configure the RF switches through the configuration register.
These values are logically OR-ed with the LVDS lines on EEM1.
@ -277,15 +282,15 @@ class CPLD:
self.cfg_write(c)
@kernel
def cfg_switches(self, state: TInt32):
def cfg_switches(self, state: int32):
"""Configure all four RF switches through the configuration register.
:param state: RF switch state as a 4 bit integer.
"""
self.cfg_write((self.cfg_reg & ~0xf) | state)
@portable(flags={"fast-math"})
def mu_to_att(self, att_mu: TInt32) -> TFloat:
@portable
def mu_to_att(self, att_mu: int32) -> float:
"""Convert a digital attenuation setting to dB.
:param att_mu: Digital attenuation setting.
@ -293,20 +298,21 @@ class CPLD:
"""
return (255 - (att_mu & 0xff)) / 8
@portable(flags={"fast-math"})
def att_to_mu(self, att: TFloat) -> TInt32:
@portable
def att_to_mu(self, att: float) -> int32:
"""Convert an attenuation setting in dB to machine units.
:param att: Attenuation setting in dB.
:return: Digital attenuation setting.
"""
code = int32(255) - int32(round(att * 8))
code = 255 - round(att * 8.)
if code < 0 or code > 255:
raise ValueError("Invalid urukul.CPLD attenuation!")
# NAC3TODO raise ValueError("Invalid urukul.CPLD attenuation!")
pass
return code
@kernel
def set_att_mu(self, channel: TInt32, att: TInt32):
# NAC3TODO @kernel
def set_att_mu(self, channel: int32, att: int32):
"""Set digital step attenuator in machine units.
This method will also write the attenuator settings of the three
@ -322,20 +328,20 @@ class CPLD:
self.set_all_att_mu(a)
@kernel
def set_all_att_mu(self, att_reg: TInt32):
def set_all_att_mu(self, att_reg: int32):
"""Set all four digital step attenuators (in machine units).
.. seealso:: :meth:`set_att_mu`
:param att_reg: Attenuator setting string (32 bit)
"""
self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END, 32,
self.bus.set_config_mu(SPI_CONFIG | SPI_END, 32,
SPIT_ATT_WR, CS_ATT)
self.bus.write(att_reg)
self.att_reg = att_reg
@kernel
def set_att(self, channel: TInt32, att: TFloat):
# NAC3TODO @kernel
def set_att(self, channel: int32, att: float):
"""Set digital step attenuator in SI units.
This method will write the attenuator settings of all four channels.
@ -350,7 +356,7 @@ class CPLD:
self.set_att_mu(channel, self.att_to_mu(att))
@kernel
def get_att_mu(self) -> TInt32:
def get_att_mu(self) -> int32:
"""Return the digital step attenuator settings in machine units.
The result is stored and will be used in future calls of
@ -360,18 +366,18 @@ class CPLD:
:return: 32 bit attenuator settings
"""
self.bus.set_config_mu(SPI_CONFIG | spi.SPI_INPUT, 32,
self.bus.set_config_mu(SPI_CONFIG | SPI_INPUT, 32,
SPIT_ATT_RD, CS_ATT)
self.bus.write(0) # shift in zeros, shift out current value
self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END, 32,
self.bus.set_config_mu(SPI_CONFIG | SPI_END, 32,
SPIT_ATT_WR, CS_ATT)
delay(10 * us)
self.core.delay(10. * us)
self.att_reg = self.bus.read()
self.bus.write(self.att_reg) # shift in current value again and latch
return self.att_reg
@kernel
def get_channel_att_mu(self, channel: TInt32) -> TInt32:
def get_channel_att_mu(self, channel: int32) -> int32:
"""Get digital step attenuator value for a channel in machine units.
The result is stored and will be used in future calls of
@ -386,7 +392,7 @@ class CPLD:
return int32((self.get_att_mu() >> (channel * 8)) & 0xff)
@kernel
def get_channel_att(self, channel: TInt32) -> TFloat:
def get_channel_att(self, channel: int32) -> float:
"""Get digital step attenuator value for a channel in SI units.
.. seealso:: :meth:`get_channel_att_mu`
@ -399,7 +405,7 @@ class CPLD:
return self.mu_to_att(self.get_channel_att_mu(channel))
@kernel
def set_sync_div(self, div: TInt32):
def set_sync_div(self, div: int32):
"""Set the SYNC_IN AD9910 pulse generator frequency
and align it to the current RTIO timestamp.
@ -412,11 +418,11 @@ class CPLD:
"""
ftw_max = 1 << 4
ftw = ftw_max // div
assert ftw * div == ftw_max
# NAC3TODO assert ftw * div == ftw_max
self.sync.set_mu(ftw)
@kernel
def set_profile(self, profile: TInt32):
# NAC3TODO @kernel
def set_profile(self, profile: int32):
"""Set the PROFILE pins.
The PROFILE pins are common to all four DDS channels.
@ -426,3 +432,18 @@ class CPLD:
cfg = self.cfg_reg & ~(7 << CFG_PROFILE)
cfg |= (profile & 7) << CFG_PROFILE
self.cfg_write(cfg)
class _RegIOUpdate:
core: KernelInvariant[Core]
cpld: KernelInvariant[CPLD]
def __init__(self, core, cpld):
self.core = core
self.cpld = cpld
@kernel
def pulse(self, t: float):
cfg = self.cpld.cfg_reg
self.cpld.cfg_write(cfg | (1 << CFG_IO_UPDATE))
delay(t)
self.cpld.cfg_write(cfg)