forked from M-Labs/artiq
1
0
Fork 0

moninj: merge _DDSModel and _DDSHandler

moninj_router_design
Simon Renblad 2024-04-16 13:04:52 +08:00
parent 585d8a4d5b
commit 3b8f6edd52
1 changed files with 50 additions and 55 deletions

View File

@ -152,37 +152,6 @@ class _TTLHandler:
return self.channel
class _DDSModel:
def __init__(self, dds_type, ref_clk, cpld=None, pll=1, clk_div=0):
self.cpld = cpld
self.cur_frequency = 0
self.cur_reg = 0
self.dds_type = dds_type
self.is_urukul = dds_type in ["AD9910", "AD9912"]
if dds_type == "AD9914":
self.ftw_per_hz = 2**32 / ref_clk
else:
if dds_type == "AD9910":
max_freq = 1 << 32
clk_mult = [4, 1, 2, 4]
elif dds_type == "AD9912": # AD9912
max_freq = 1 << 48
clk_mult = [1, 1, 2, 4]
else:
raise NotImplementedError
sysclk = ref_clk / clk_mult[clk_div] * pll
self.ftw_per_hz = 1 / sysclk * max_freq
def monitor_update(self, probe, value):
if self.dds_type == "AD9912":
value = value << 16
self.cur_frequency = self._ftw_to_freq(value)
def _ftw_to_freq(self, ftw):
return ftw / self.ftw_per_hz
class _DDSWidget(QtWidgets.QFrame):
apply_changes = QtCore.pyqtSignal()
off_clicked = QtCore.pyqtSignal()
@ -310,27 +279,53 @@ class _DDSWidget(QtWidgets.QFrame):
class _DDSHandler:
def __init__(self, dm, title, bus_channel=0, channel=0, dds_model=None):
def __init__(self, dm, title, bus_channel, channel, dds_type,
ref_clk, cpld=None, pll=1, clk_div=0):
self.dm = dm
self.bus_channel = bus_channel
self.channel = channel
self.cur_frequency = 0
self.dds_name = title
self.dds_model = dds_model
self.widget = _DDSWidget(title, dds_model.is_urukul)
self.cpld = cpld
self.cur_frequency = 0
self.cur_reg = 0
self.dds_type = dds_type
self.is_urukul = dds_type in ["AD9910", "AD9912"]
if dds_type == "AD9914":
self.ftw_per_hz = 2**32 / ref_clk
else:
if dds_type == "AD9910":
max_freq = 1 << 32
clk_mult = [4, 1, 2, 4]
elif dds_type == "AD9912": # AD9912
max_freq = 1 << 48
clk_mult = [1, 1, 2, 4]
else:
raise NotImplementedError
sysclk = ref_clk / clk_mult[clk_div] * pll
self.ftw_per_hz = 1 / sysclk * max_freq
self.widget = _DDSWidget(title, self.is_urukul)
self.widget.apply_changes.connect(self.apply_changes)
self.widget.off_clicked.connect(self.off_clicked)
self.widget.set_clicked.connect(self.set_clicked)
self.refresh_display()
def monitor_update(self, probe, value):
if self.dds_type == "AD9912":
value = value << 16
self.cur_frequency = self._ftw_to_freq(value)
def _ftw_to_freq(self, ftw):
return ftw / self.ftw_per_hz
def refresh_display(self):
self.cur_frequency = self.dds_model.cur_frequency
self.widget.set_value(self.cur_frequency / 1e6)
def apply_changes(self):
self.widget.set_page(0)
frequency = self.widget.get_value() * 1e6
self.dm.dds_set_frequency(self.dds_name, self.dds_model, frequency)
self.dm.dds_set_frequency(self.dds_name, frequency)
def set_clicked(self):
self.widget.set_page(1)
@ -338,7 +333,7 @@ class _DDSHandler:
self.widget.start_edit()
def off_clicked(self):
self.dm.dds_channel_toggle(self.dds_name, self.dds_model, sw=False)
self.dm.dds_channel_toggle(self.dds_name, sw=False)
def sort_key(self):
return (self.bus_channel, self.channel)
@ -413,9 +408,8 @@ def setup_from_ddb(ddb):
bus_channel = v["arguments"]["bus_channel"]
channel = v["arguments"]["channel"]
dds_sysclk = v["arguments"]["sysclk"]
model = _DDSModel(v["class"], dds_sysclk)
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, model))
(k, bus_channel, channel, v["class"], dds_sysclk))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad9910" and v["class"] == "AD9910") or \
(v["module"] == "artiq.coredevice.ad9912" and v["class"] == "AD9912"):
@ -428,9 +422,9 @@ def setup_from_ddb(ddb):
pll = v["arguments"]["pll_n"]
refclk = ddb[dds_cpld]["arguments"]["refclk"]
clk_div = v["arguments"].get("clk_div", 0)
model = _DDSModel(v["class"], refclk, dds_cpld, pll, clk_div)
handler = _HandlerDesc(k, comment, _DDSHandler,
(k, bus_channel, channel, model))
(k, bus_channel, channel, v["class"], refclk,
dds_cpld, pll, clk_div))
description.add(handler)
elif (v["module"] == "artiq.coredevice.ad53xx" and v["class"] == "AD53xx") or \
(v["module"] == "artiq.coredevice.zotino" and v["class"] == "Zotino"):
@ -572,14 +566,15 @@ class _DeviceManager:
scheduling["flush"])
logger.info("Submitted '%s', RID is %d", title, rid)
def _dds_faux_injection(self, dds_channel, dds_model, action, title, log_msg):
def _dds_faux_injection(self, dds_channel, action, title, log_msg):
handler = self.handlers_by_uid[dds_channel]
# create kernel and fill it in and send-by-content
# initialize CPLD (if applicable)
if dds_model.is_urukul:
if handler.is_urukul:
# urukuls need CPLD init and switch to on
cpld_dev = """self.setattr_device("core_cache")
self.setattr_device("{}")""".format(dds_model.cpld)
self.setattr_device("{}")""".format(handler.cpld)
# `sta`/`rf_sw`` variables are guaranteed for urukuls
# so {action} can use it
@ -593,20 +588,20 @@ class _DeviceManager:
delay(15*ms)
self.{cpld}.init()
self.core_cache.put("_{cpld}_init", [1])
""".format(cpld=dds_model.cpld)
""".format(cpld=handler.cpld)
else:
cpld_dev = ""
cpld_init = ""
# AD9912/9910: init channel (if uninitialized)
if dds_model.dds_type == "AD9912":
if handler.dds_type == "AD9912":
# 0xFF before init, 0x99 after
channel_init = """
if self.{dds_channel}.read({cfgreg}, length=1) == 0xFF:
delay(10*ms)
self.{dds_channel}.init()
""".format(dds_channel=dds_channel, cfgreg=AD9912_SER_CONF)
elif dds_model.dds_type == "AD9910":
elif handler.dds_type == "AD9910":
# -1 before init, 2 after
channel_init = """
if self.{dds_channel}.read32({cfgreg}) == -1:
@ -644,22 +639,23 @@ class _DeviceManager:
title,
log_msg))
def dds_set_frequency(self, dds_channel, dds_model, freq):
def dds_set_frequency(self, dds_channel, freq):
handler = self.handlers_by_uid[dds_channel]
action = "self.{ch}.set({freq})".format(
freq=freq, ch=dds_channel)
if dds_model.is_urukul:
if handler.is_urukul:
action += """
ch_no = self.{ch}.chip_select - 4
self.{cpld}.cfg_switches(rf_sw | 1 << ch_no)
""".format(ch=dds_channel, cpld=dds_model.cpld)
""".format(ch=dds_channel, cpld=handler.cpld)
self._dds_faux_injection(
dds_channel,
dds_model,
action,
"SetDDS",
"Set DDS {} {}MHz".format(dds_channel, freq / 1e6))
def dds_channel_toggle(self, dds_channel, dds_model, sw=True):
def dds_channel_toggle(self, dds_channel, sw=True):
handler = self.handlers_by_uid[dds_channel]
# urukul only
if sw:
switch = "| 1 << ch_no"
@ -670,12 +666,11 @@ class _DeviceManager:
self.{cpld}.cfg_switches(rf_sw {switch})
""".format(
dds_channel=dds_channel,
cpld=dds_model.cpld,
cpld=handler.cpld,
switch=switch
)
self._dds_faux_injection(
dds_channel,
dds_model,
action,
"ToggleDDS",
"Toggle DDS {} {}".format(dds_channel, "on" if sw else "off"))
@ -707,7 +702,7 @@ class _DeviceManager:
handler.refresh_display()
elif (channel, probe) in self.dds_handlers:
handler = self.dds_handlers[(channel, probe)]
handler.dds_model.monitor_update(probe, value)
handler.monitor_update(probe, value)
handler.refresh_display()
elif (channel, probe) in self.dac_handlers:
handler = self.dac_handlers[(channel, probe)]