moninj: dds inj: extract shared code

detect urukul already init in more than one way
detect ad9912 channel already init
This commit is contained in:
mwojcik 2022-08-19 17:23:17 +08:00 committed by Sebastien Bourdeauducq
parent 3038639802
commit 5581ae15ca
1 changed files with 80 additions and 53 deletions

View File

@ -9,7 +9,7 @@ from sipyco.sync_struct import Subscriber
from artiq.coredevice.comm_moninj import * from artiq.coredevice.comm_moninj import *
from artiq.coredevice.ad9910 import _AD9910_REG_PROFILE0, _AD9910_REG_PROFILE7, _AD9910_REG_FTW from artiq.coredevice.ad9910 import _AD9910_REG_PROFILE0, _AD9910_REG_PROFILE7, _AD9910_REG_FTW
from artiq.coredevice.ad9912_reg import AD9912_POW1 from artiq.coredevice.ad9912_reg import AD9912_POW1, AD9912_SER_CONF
from artiq.gui.tools import LayoutWidget from artiq.gui.tools import LayoutWidget
from artiq.gui.flowlayout import FlowLayout from artiq.gui.flowlayout import FlowLayout
@ -549,33 +549,56 @@ class _DeviceManager:
scheduling["flush"]) scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid) logger.info("Submitted '%s', RID is %d", title, rid)
def dds_set_frequency(self, dds_channel, dds_model, freq): def _dds_faux_injection(self, dds_channel, dds_model, action, title, log_msg):
# create kernel and fill it in and send-by-content # create kernel and fill it in and send-by-content
# initialize CPLD (if applicable)
if dds_model.is_urukul: if dds_model.is_urukul:
# TODO: reimplement cache (simple "was init")
#
# urukuls need CPLD init and switch to on # urukuls need CPLD init and switch to on
# keep previous config if it was set already
cpld_dev = """self.setattr_device("core_cache") cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(dds_model.cpld) self.setattr_device("{}")""".format(dds_model.cpld)
cpld_init = """cfg = self.core_cache.get("_{cpld}_cfg")
if len(cfg) > 0: # `sta`/`rf_sw`` variables are guaranteed for urukuls
self.{cpld}.cfg_reg = cfg[0] # so {action} can use it
else: # if there's no RF enabled, CPLD may have not been initialized
#
# but if there is, it has
cpld_init = """delay(15*ms)
was_init = self.core_cache.get("_{cpld}_init")
sta = self.{cpld}.sta_read()
rf_sw = urukul_sta_rf_sw(sta)
if rf_sw == 0 and len(was_init) == 0:
delay(15*ms) delay(15*ms)
self.{cpld}.init() self.{cpld}.init()
self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg]) self.core_cache.put("_{cpld}_init", [1])
cfg = self.core_cache.get("_{cpld}_cfg")
""".format(cpld=dds_model.cpld) """.format(cpld=dds_model.cpld)
cfg_sw = """self.{}.cfg_sw(True)
cfg[0] = self.{}.cfg_reg
""".format(dds_channel, dds_model.cpld)
else: else:
cpld_dev = "" cpld_dev = ""
cpld_init = "" cpld_init = ""
cfg_sw = ""
# AD9912/9910: init channel (if uninitialized)
if dds_model.dds_type == "AD9912":
# 0xFF before init, 0x99 after
channel_init = """
delay(10*ms)
if self.{dds_channel}.read({cfgreg}, length=1) == 0xFF:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
elif dds_model.dds_type == "AD9910":
# TODO: verify AD9910 behavior (when we have hardware)
channel_init = "self.{dds_channel}.init()".format(dds_channel)
else:
channel_init = "self.{dds_channel}.init()".format(dds_channel)
dds_exp = textwrap.dedent(""" dds_exp = textwrap.dedent("""
from artiq.experiment import * from artiq.experiment import *
from artiq.coredevice.urukul import *
class SetDDS(EnvExperiment): class {title}(EnvExperiment):
def build(self): def build(self):
self.setattr_device("core") self.setattr_device("core")
self.setattr_device("{dds_channel}") self.setattr_device("{dds_channel}")
@ -585,53 +608,57 @@ class _DeviceManager:
def run(self): def run(self):
self.core.break_realtime() self.core.break_realtime()
{cpld_init} {cpld_init}
delay(5*ms) {channel_init}
self.{dds_channel}.init() delay(15*ms)
self.{dds_channel}.set({freq}) {action}
{cfg_sw} """.format(title=title, action=action,
""".format(dds_channel=dds_channel, freq=freq, dds_channel=dds_channel,
cpld_dev=cpld_dev, cpld_init=cpld_init, cpld_dev=cpld_dev, cpld_init=cpld_init,
cfg_sw=cfg_sw)) channel_init=channel_init))
asyncio.ensure_future( asyncio.ensure_future(
self._submit_by_content( self._submit_by_content(
dds_exp, dds_exp,
"SetDDS", title,
"Set DDS {} {}MHz".format(dds_channel, freq/1e6))) log_msg))
def dds_set_frequency(self, dds_channel, dds_model, freq):
if dds_model.is_urukul:
ch_no = "ch_no = self.{ch}.chip_select - 4"
else:
ch_no = "ch_no = self.{ch}.channel"
action = """
{ch_no}
self.{ch}.set({freq})
self.{cpld}.cfg_switches(rf_sw | 1 << ch_no)
""".format(freq=freq, cpld=dds_model.cpld,
ch=dds_channel, ch_no=ch_no.format(ch=dds_channel))
self._dds_faux_injection(
dds_channel,
dds_model,
action,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq/1e6))
def dds_channel_toggle(self, dds_channel, dds_model, sw=True): def dds_channel_toggle(self, dds_channel, dds_model, sw=True):
# urukul only # urukul only
toggle_exp = textwrap.dedent(""" if sw:
from artiq.experiment import * switch = "| 1 << ch_no"
else:
class ToggleDDS(EnvExperiment): switch = "& ~(1 << ch_no)"
def build(self): action = """
self.setattr_device("core") ch_no = self.{dds_channel}.chip_select - 4
self.setattr_device("{ch}") self.{cpld}.cfg_switches(rf_sw {switch})
self.setattr_device("core_cache") """.format(
self.setattr_device("{cpld}") dds_channel=dds_channel,
cpld=dds_model.cpld,
@kernel switch=switch
def run(self):
self.core.break_realtime()
cfg = self.core_cache.get("_{cpld}_cfg")
if len(cfg) > 0:
self.{cpld}.cfg_reg = cfg[0]
else:
delay(15*ms)
self.{cpld}.init()
self.core_cache.put("_{cpld}_cfg", [self.{cpld}.cfg_reg])
cfg = self.core_cache.get("_{cpld}_cfg")
delay(5*ms)
self.{ch}.init()
self.{ch}.cfg_sw({sw})
cfg[0] = self.{cpld}.cfg_reg
""".format(ch=dds_channel, cpld=dds_model.cpld, sw=sw))
asyncio.ensure_future(
self._submit_by_content(
toggle_exp,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
) )
self._dds_faux_injection(
dds_channel,
dds_model,
action,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
def setup_ttl_monitoring(self, enable, channel): def setup_ttl_monitoring(self, enable, channel):
if self.mi_connection is not None: if self.mi_connection is not None: