fastino: port to NAC3

This commit is contained in:
Sebastien Bourdeauducq 2022-02-28 13:34:55 +08:00
parent 64a0c4b29a
commit a407007e0b
3 changed files with 51 additions and 30 deletions

View File

@ -3,13 +3,23 @@ streaming DAC.
"""
from numpy import int32, int64
from artiq.language.core import kernel, portable, delay, delay_mu
from artiq.language.core import nac3, kernel, portable, KernelInvariant
from artiq.coredevice.rtio import (rtio_output, rtio_output_wide,
rtio_input_data)
from artiq.language.units import ns
from artiq.language.types import TInt32, TList
from artiq.coredevice.core import Core
# NAC3TODO work around https://git.m-labs.hk/M-Labs/nac3/issues/189
@nac3
class ValueError(Exception):
pass
@nac3
class NotImplementedError(Exception):
pass
@nac3
class Fastino:
"""Fastino 32-channel, 16-bit, 2.5 MS/s per channel streaming DAC
@ -41,7 +51,11 @@ class Fastino:
:param log2_width: Width of DAC channel group (logarithm base 2).
Value must match the corresponding value in the RTIO PHY (gateware).
"""
kernel_invariants = {"core", "channel", "width", "t_frame"}
core: KernelInvariant[Core]
channel: KernelInvariant[int32]
width: KernelInvariant[int32]
t_frame: KernelInvariant[int64]
def __init__(self, dmgr, channel, core_device="core", log2_width=0):
self.channel = channel << 8
@ -69,15 +83,15 @@ class Fastino:
Note: On Fastino gateware before v0.2 this may lead to 0 voltage being emitted
transiently.
"""
self.set_cfg(reset=0, afe_power_down=0, dac_clr=0, clr_err=1)
self.set_cfg(reset=False, afe_power_down=False, dac_clr=False, clr_err=True)
delay_mu(self.t_frame)
self.set_cfg(reset=0, afe_power_down=0, dac_clr=0, clr_err=0)
self.set_cfg(reset=False, afe_power_down=False, dac_clr=False, clr_err=False)
delay_mu(self.t_frame)
self.set_continuous(0)
delay_mu(self.t_frame)
self.stage_cic(1)
delay_mu(self.t_frame)
self.apply_cic(0xffffffff)
self.apply_cic(int32(int64(0xffffffff)))
delay_mu(self.t_frame)
self.set_leds(0)
delay_mu(self.t_frame)
@ -85,7 +99,7 @@ class Fastino:
delay_mu(self.t_frame)
@kernel
def write(self, addr, data):
def write(self, addr: int32, data: int32):
"""Write data to a Fastino register.
:param addr: Address to write to.
@ -94,7 +108,7 @@ class Fastino:
rtio_output(self.channel | addr, data)
@kernel
def read(self, addr):
def read(self, addr: int32):
"""Read from Fastino register.
TODO: untested
@ -102,12 +116,12 @@ class Fastino:
:param addr: Address to read from.
:return: The data read.
"""
raise NotImplementedError
raise NotImplementedError()
# rtio_output(self.channel | addr | 0x80)
# return rtio_input_data(self.channel >> 8)
@kernel
def set_dac_mu(self, dac, data):
def set_dac_mu(self, dac: int32, data: int32):
"""Write DAC data in machine units.
:param dac: DAC channel to write to (0-31).
@ -117,7 +131,7 @@ class Fastino:
self.write(dac, data)
@kernel
def set_group_mu(self, dac: TInt32, data: TList(TInt32)):
def set_group_mu(self, dac: int32, data: list[int32]):
"""Write a group of DAC channels in machine units.
:param dac: First channel in DAC channel group (0-31). The `log2_width`
@ -127,24 +141,24 @@ class Fastino:
If the list length is less than group size, the remaining
DAC channels within the group are cleared to 0 (machine units).
"""
if dac & (self.width - 1):
if dac & (self.width - 1) != 0:
raise ValueError("Group index LSBs must be zero")
rtio_output_wide(self.channel | dac, data)
@portable
def voltage_to_mu(self, voltage):
def voltage_to_mu(self, voltage: float) -> int32:
"""Convert SI Volts to DAC machine units.
:param voltage: Voltage in SI Volts.
:return: DAC data word in machine units, 16 bit integer.
"""
data = int32(round((0x8000/10.)*voltage)) + int32(0x8000)
data = int32(round((float(0x8000)/10.)*voltage)) + int32(0x8000)
if data < 0 or data > 0xffff:
raise ValueError("DAC voltage out of bounds")
return data
@portable
def voltage_group_to_mu(self, voltage, data):
def voltage_group_to_mu(self, voltage: list[float], data: list[int32]):
"""Convert SI Volts to packed DAC channel group machine units.
:param voltage: List of SI Volt voltages.
@ -153,12 +167,12 @@ class Fastino:
"""
for i in range(len(voltage)):
v = self.voltage_to_mu(voltage[i])
if i & 1:
if i & 1 != 0:
v = data[i // 2] | (v << 16)
data[i // 2] = int32(v)
@kernel
def set_dac(self, dac, voltage):
def set_dac(self, dac: int32, voltage: float):
"""Set DAC data to given voltage.
:param dac: DAC channel (0-31).
@ -167,18 +181,18 @@ class Fastino:
self.write(dac, self.voltage_to_mu(voltage))
@kernel
def set_group(self, dac, voltage):
def set_group(self, dac: int32, voltage: list[float]):
"""Set DAC group data to given voltage.
:param dac: DAC channel (0-31).
:param voltage: Desired output voltage.
"""
data = [int32(0)] * (len(voltage) // 2)
data = [int32(0) for _ in range(len(voltage) // 2)]
self.voltage_group_to_mu(voltage, data)
self.set_group_mu(dac, data)
@kernel
def update(self, update):
def update(self, update: int32):
"""Schedule channels for update.
:param update: Bit mask of channels to update (32 bit).
@ -186,7 +200,7 @@ class Fastino:
self.write(0x20, update)
@kernel
def set_hold(self, hold):
def set_hold(self, hold: int32):
"""Set channels to manual update.
:param hold: Bit mask of channels to hold (32 bit).
@ -194,7 +208,7 @@ class Fastino:
self.write(0x21, hold)
@kernel
def set_cfg(self, reset=0, afe_power_down=0, dac_clr=0, clr_err=0):
def set_cfg(self, reset: bool = False, afe_power_down: bool = False, dac_clr: bool = False, clr_err: bool = False):
"""Set configuration bits.
:param reset: Reset SPI PLL and SPI clock domain.
@ -205,11 +219,11 @@ class Fastino:
This clears the sticky red error LED. Must be cleared to enable
error counting.
"""
self.write(0x22, (reset << 0) | (afe_power_down << 1) |
(dac_clr << 2) | (clr_err << 3))
self.write(0x22, (int32(reset) << 0) | (int32(afe_power_down) << 1) |
(int32(dac_clr) << 2) | (int32(clr_err) << 3))
@kernel
def set_leds(self, leds):
def set_leds(self, leds: int32):
"""Set the green user-defined LEDs
:param leds: LED status, 8 bit integer each bit corresponding to one
@ -218,14 +232,14 @@ class Fastino:
self.write(0x23, leds)
@kernel
def set_continuous(self, channel_mask):
def set_continuous(self, channel_mask: int32):
"""Enable continuous DAC updates on channels regardless of new data
being submitted.
"""
self.write(0x25, channel_mask)
@kernel
def stage_cic_mu(self, rate_mantissa, rate_exponent, gain_exponent):
def stage_cic_mu(self, rate_mantissa: int32, rate_exponent: int32, gain_exponent: int32):
"""Stage machine unit CIC interpolator configuration.
"""
if rate_mantissa < 0 or rate_mantissa >= 1 << 6:
@ -238,7 +252,7 @@ class Fastino:
self.write(0x26, config)
@kernel
def stage_cic(self, rate) -> TInt32:
def stage_cic(self, rate: int32) -> int32:
"""Compute and stage interpolator configuration.
This method approximates the desired interpolation rate using a 10 bit
@ -269,12 +283,12 @@ class Fastino:
while gain > 1 << gain_exponent:
gain_exponent += 1
gain_exponent += order*rate_exponent
assert gain_exponent <= order*16
# NAC3TODO assert gain_exponent <= order*16
self.stage_cic_mu(rate_mantissa - 1, rate_exponent, gain_exponent)
return rate_mantissa << rate_exponent
@kernel
def apply_cic(self, channel_mask):
def apply_cic(self, channel_mask: int32):
"""Apply the staged interpolator configuration on the specified channels.
Each Fastino channel starting with gateware v0.2 includes a fourth order

View File

@ -36,6 +36,10 @@
{
"type": "grabber",
"ports": [6]
},
{
"type": "fastino",
"ports": [7]
}
]
}

View File

@ -9,6 +9,7 @@ from artiq.coredevice.ad9912 import AD9912
from artiq.coredevice.sampler import Sampler
from artiq.coredevice.edge_counter import EdgeCounter
from artiq.coredevice.grabber import Grabber
from artiq.coredevice.fastino import Fastino
@nac3
@ -23,6 +24,7 @@ class NAC3Devices(EnvExperiment):
sampler0: KernelInvariant[Sampler]
ttl0_counter: KernelInvariant[EdgeCounter]
grabber0: KernelInvariant[Grabber]
fastino0: KernelInvariant[Fastino]
def build(self):
self.setattr_device("core")
@ -35,6 +37,7 @@ class NAC3Devices(EnvExperiment):
self.setattr_device("sampler0")
self.setattr_device("ttl0_counter")
self.setattr_device("grabber0")
self.setattr_device("fastino0")
@kernel
def run(self):