diff --git a/artiq/coredevice/ad9910.py b/artiq/coredevice/ad9910.py index c1e828cf7..f2beedba0 100644 --- a/artiq/coredevice/ad9910.py +++ b/artiq/coredevice/ad9910.py @@ -3,15 +3,15 @@ from numpy import int32, int64 from artiq.language.core import ( kernel, delay, portable, delay_mu, now_mu, at_mu) from artiq.language.units import us, ms -from artiq.language.types import * +from artiq.language.types import TBool, TInt32, TInt64, TFloat, TList, TTuple from artiq.coredevice import spi2 as spi from artiq.coredevice import urukul + # Work around ARTIQ-Python import machinery urukul_sta_pll_lock = urukul.urukul_sta_pll_lock urukul_sta_smp_err = urukul.urukul_sta_smp_err - __all__ = [ "AD9910", "PHASE_MODE_CONTINUOUS", "PHASE_MODE_ABSOLUTE", "PHASE_MODE_TRACKING", @@ -20,7 +20,6 @@ __all__ = [ "RAM_MODE_CONT_BIDIR_RAMP", "RAM_MODE_CONT_RAMPUP", ] - _PHASE_MODE_DEFAULT = -1 PHASE_MODE_CONTINUOUS = 0 PHASE_MODE_ABSOLUTE = 1 @@ -124,15 +123,15 @@ class AD9910: To stabilize the SYNC_IN delay tuning, run :meth:`tune_sync_delay` once and set this to the delay tap number returned (default: -1 to signal no synchronization and no tuning during :meth:`init`). - Can be a string of the form "eeprom_device:byte_offset" to read the value - from a I2C EEPROM; in which case, `io_update_delay` must be set to the - same string value. + Can be a string of the form "eeprom_device:byte_offset" to read the + value from a I2C EEPROM; in which case, `io_update_delay` must be set + to the same string value. :param io_update_delay: IO_UPDATE pulse alignment delay. To align IO_UPDATE to SYNC_CLK, run :meth:`tune_io_update_delay` and set this to the delay tap number returned. - Can be a string of the form "eeprom_device:byte_offset" to read the value - from a I2C EEPROM; in which case, `sync_delay_seed` must be set to the - same string value. + Can be a string of the form "eeprom_device:byte_offset" to read the + value from a I2C EEPROM; in which case, `sync_delay_seed` must be set + to the same string value. """ kernel_invariants = {"chip_select", "cpld", "core", "bus", "ftw_per_hz", "sysclk_per_mu"} @@ -148,38 +147,41 @@ class AD9910: if sw_device: self.sw = dmgr.get(sw_device) self.kernel_invariants.add("sw") - clk = self.cpld.refclk/[4, 1, 2, 4][self.cpld.clk_div] + clk = self.cpld.refclk / [4, 1, 2, 4][self.cpld.clk_div] self.pll_en = pll_en self.pll_n = pll_n self.pll_vco = pll_vco self.pll_cp = pll_cp if pll_en: - sysclk = clk*pll_n + sysclk = clk * pll_n assert clk <= 60e6 assert 12 <= pll_n <= 127 assert 0 <= pll_vco <= 5 vco_min, vco_max = [(370, 510), (420, 590), (500, 700), (600, 880), (700, 950), (820, 1150)][pll_vco] - assert vco_min <= sysclk/1e6 <= vco_max + assert vco_min <= sysclk / 1e6 <= vco_max assert 0 <= pll_cp <= 7 else: sysclk = clk assert sysclk <= 1e9 - self.ftw_per_hz = (1 << 32)/sysclk - self.sysclk_per_mu = int(round(sysclk*self.core.ref_period)) + self.ftw_per_hz = (1 << 32) / sysclk + self.sysclk_per_mu = int(round(sysclk * self.core.ref_period)) self.sysclk = sysclk - if isinstance(sync_delay_seed, str) or isinstance(io_update_delay, str): + if isinstance(sync_delay_seed, str) or isinstance(io_update_delay, + str): if sync_delay_seed != io_update_delay: - raise ValueError("When using EEPROM, sync_delay_seed must be equal to io_update_delay") + raise ValueError("When using EEPROM, sync_delay_seed must be " + "equal to io_update_delay") self.sync_data = SyncDataEeprom(dmgr, self.core, sync_delay_seed) else: - self.sync_data = SyncDataUser(self.core, sync_delay_seed, io_update_delay) + self.sync_data = SyncDataUser(self.core, sync_delay_seed, + io_update_delay) self.phase_mode = PHASE_MODE_CONTINUOUS @kernel - def set_phase_mode(self, phase_mode): + def set_phase_mode(self, phase_mode: TInt32): r"""Set the default phase mode. for future calls to :meth:`set` and @@ -224,7 +226,7 @@ class AD9910: self.phase_mode = phase_mode @kernel - def write16(self, addr, data): + def write16(self, addr: TInt32, data: TInt32): """Write to 16 bit register. :param addr: Register address @@ -235,7 +237,7 @@ class AD9910: self.bus.write((addr << 24) | (data << 8)) @kernel - def write32(self, addr, data): + def write32(self, addr: TInt32, data: TInt32): """Write to 32 bit register. :param addr: Register address @@ -249,7 +251,7 @@ class AD9910: self.bus.write(data) @kernel - def read16(self, addr): + def read16(self, addr: TInt32) -> TInt32: """Read from 16 bit register. :param addr: Register address @@ -264,7 +266,7 @@ class AD9910: return self.bus.read() @kernel - def read32(self, addr): + def read32(self, addr: TInt32) -> TInt32: """Read from 32 bit register. :param addr: Register address @@ -279,7 +281,7 @@ class AD9910: return self.bus.read() @kernel - def read64(self, addr): + def read64(self, addr: TInt32) -> TInt64: """Read from 64 bit register. :param addr: Register address @@ -302,7 +304,7 @@ class AD9910: return (int64(hi) << 32) | lo @kernel - def write64(self, addr, data_high, data_low): + def write64(self, addr: TInt32, data_high: TInt32, data_low: TInt32): """Write to 64 bit register. :param addr: Register address @@ -320,14 +322,14 @@ class AD9910: self.bus.write(data_low) @kernel - def write_ram(self, data): + def write_ram(self, data: TList(int32)): """Write data to RAM. The profile to write to and the step, start, and end address need to be configured before and separately using :meth:`set_profile_ram` and the parent CPLD `set_profile`. - :param data List(int32): Data to be written to RAM. + :param data: Data to be written to RAM. """ self.bus.set_config_mu(urukul.SPI_CONFIG, 8, urukul.SPIT_DDS_WR, self.chip_select) @@ -341,14 +343,14 @@ class AD9910: self.bus.write(data[len(data) - 1]) @kernel - def read_ram(self, data): + def read_ram(self, data: TList(int32)): """Read data from RAM. The profile to read from and the step, start, and end address need to be configured before and separately using :meth:`set_profile_ram` and the parent CPLD `set_profile`. - :param data List(int32): List to be filled with data read from RAM. + :param data: List to be filled with data read from RAM. """ self.bus.set_config_mu(urukul.SPI_CONFIG, 8, urukul.SPIT_DDS_WR, self.chip_select) @@ -370,10 +372,12 @@ class AD9910: data[(n - preload) + i] = self.bus.read() @kernel - def set_cfr1(self, power_down=0b0000, phase_autoclear=0, - drg_load_lrr=0, drg_autoclear=0, - internal_profile=0, ram_destination=0, ram_enable=0, - manual_osk_external=0, osk_enable=0, select_auto_osk=0): + def set_cfr1(self, power_down: TInt32 = 0b0000, + phase_autoclear: TInt32 = 0, + drg_load_lrr: TInt32 = 0, drg_autoclear: TInt32 = 0, + internal_profile: TInt32 = 0, ram_destination: TInt32 = 0, + ram_enable: TInt32 = 0, manual_osk_external: TInt32 = 0, + osk_enable: TInt32 = 0, select_auto_osk: TInt32 = 0): """Set CFR1. See the AD9910 datasheet for parameter meanings. This method does not pulse IO_UPDATE. @@ -405,7 +409,7 @@ class AD9910: 2) # SDIO input only, MSB first @kernel - def init(self, blind=False): + def init(self, blind: TBool = False): """Initialize and configure the DDS. Sets up SPI mode, confirms chip presence, powers down unused blocks, @@ -418,70 +422,69 @@ class AD9910: if self.sync_data.sync_delay_seed >= 0 and not self.cpld.sync_div: raise ValueError("parent cpld does not drive SYNC") if self.sync_data.sync_delay_seed >= 0: - if self.sysclk_per_mu != self.sysclk*self.core.ref_period: + if self.sysclk_per_mu != self.sysclk * self.core.ref_period: raise ValueError("incorrect clock ratio for synchronization") - delay(50*ms) # slack + delay(50 * ms) # slack # Set SPI mode self.set_cfr1() - self.cpld.io_update.pulse(1*us) - delay(1*ms) + self.cpld.io_update.pulse(1 * us) + delay(1 * ms) if not blind: # Use the AUX DAC setting to identify and confirm presence aux_dac = self.read32(_AD9910_REG_AUX_DAC) if aux_dac & 0xff != 0x7f: raise ValueError("Urukul AD9910 AUX_DAC mismatch") - delay(50*us) # slack + delay(50 * us) # slack # Configure PLL settings and bring up PLL # enable amplitude scale from profiles # read effective FTW # sync timing validation disable (enabled later) self.write32(_AD9910_REG_CFR2, 0x01010020) - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) cfr3 = (0x0807c000 | (self.pll_vco << 24) | (self.pll_cp << 19) | (self.pll_en << 8) | (self.pll_n << 1)) self.write32(_AD9910_REG_CFR3, cfr3 | 0x400) # PFD reset - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) if self.pll_en: self.write32(_AD9910_REG_CFR3, cfr3) - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) if blind: - delay(100*ms) + delay(100 * ms) else: # Wait for PLL lock, up to 100 ms for i in range(100): sta = self.cpld.sta_read() lock = urukul_sta_pll_lock(sta) - delay(1*ms) + delay(1 * ms) if lock & (1 << self.chip_select - 4): break if i >= 100 - 1: raise ValueError("PLL lock timeout") - delay(10*us) # slack + delay(10 * us) # slack if self.sync_data.sync_delay_seed >= 0: self.tune_sync_delay(self.sync_data.sync_delay_seed) - delay(1*ms) + delay(1 * ms) @kernel - def power_down(self, bits=0b1111): + def power_down(self, bits: TInt32 = 0b1111): """Power down DDS. :param bits: Power down bits, see datasheet """ self.set_cfr1(power_down=bits) - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) - # KLUDGE: ref_time_mu default argument is explicitly marked int64() to - # avoid silent truncation of explicitly passed timestamps. (Compiler bug?) @kernel - def set_mu(self, ftw, pow_=0, asf=0x3fff, phase_mode=_PHASE_MODE_DEFAULT, - ref_time_mu=int64(-1), profile=0): + def set_mu(self, ftw: TInt32, pow_: TInt32 = 0, asf: TInt32 = 0x3fff, + phase_mode: TInt32 = _PHASE_MODE_DEFAULT, + ref_time_mu: TInt64 = int64(-1), profile: TInt32 = 0): """Set profile 0 data in machine units. This uses machine units (FTW, POW, ASF). The frequency tuning word width is 32, the phase offset word width is 16, and the amplitude - scale factor width is 12. + scale factor width is 14. After the SPI transfer, the shared IO update pin is pulsed to activate the data. @@ -518,7 +521,7 @@ class AD9910: # Also no need to use IO_UPDATE time as this # is equivalent to an output pipeline latency. dt = int32(now_mu()) - int32(ref_time_mu) - pow_ += dt*ftw*self.sysclk_per_mu >> 16 + pow_ += dt * ftw * self.sysclk_per_mu >> 16 self.write64(_AD9910_REG_PROFILE0 + profile, (asf << 16) | (pow_ & 0xffff), ftw) delay_mu(int64(self.sync_data.io_update_delay)) @@ -530,8 +533,9 @@ class AD9910: return pow_ @kernel - def set_profile_ram(self, start, end, step=1, profile=0, nodwell_high=0, - zero_crossing=0, mode=1): + def set_profile_ram(self, start: TInt32, end: TInt32, step: TInt32 = 1, + profile: TInt32 = 0, nodwell_high: TInt32 = 0, + zero_crossing: TInt32 = 0, mode: TInt32 = 1): """Set the RAM profile settings. :param start: Profile start address in RAM. @@ -555,57 +559,60 @@ class AD9910: self.write64(_AD9910_REG_PROFILE0 + profile, hi, lo) @kernel - def set_ftw(self, ftw): - """Set the value stored to the AD9910's frequency tuning word (FTW) register. + def set_ftw(self, ftw: TInt32): + """Set the value stored to the AD9910's frequency tuning word (FTW) + register. :param ftw: Frequency tuning word to be stored, range: 0 to 0xffffffff. """ self.write32(_AD9910_REG_FTW, ftw) @kernel - def set_asf(self, asf): - """Set the value stored to the AD9910's amplitude scale factor (ASF) register. + def set_asf(self, asf: TInt32): + """Set the value stored to the AD9910's amplitude scale factor (ASF) + register. :param asf: Amplitude scale factor to be stored, range: 0 to 0x3fff. """ self.write32(_AD9910_REG_ASF, asf << 2) @kernel - def set_pow(self, pow_): - """Set the value stored to the AD9910's phase offset word (POW) register. + def set_pow(self, pow_: TInt32): + """Set the value stored to the AD9910's phase offset word (POW) + register. :param pow_: Phase offset word to be stored, range: 0 to 0xffff. """ self.write16(_AD9910_REG_POW, pow_) @portable(flags={"fast-math"}) - def frequency_to_ftw(self, frequency) -> TInt32: + def frequency_to_ftw(self, frequency: TFloat) -> TInt32: """Return the 32-bit frequency tuning word corresponding to the given frequency. """ - return int32(round(self.ftw_per_hz*frequency)) + return int32(round(self.ftw_per_hz * frequency)) @portable(flags={"fast-math"}) - def ftw_to_frequency(self, ftw): + def ftw_to_frequency(self, ftw: TInt32) -> TFloat: """Return the frequency corresponding to the given frequency tuning word. """ return ftw / self.ftw_per_hz @portable(flags={"fast-math"}) - def turns_to_pow(self, turns) -> TInt32: + def turns_to_pow(self, turns: TFloat) -> TInt32: """Return the 16-bit phase offset word corresponding to the given phase in turns.""" - return int32(round(turns*0x10000)) & int32(0xffff) + return int32(round(turns * 0x10000)) & int32(0xffff) @portable(flags={"fast-math"}) - def pow_to_turns(self, pow_): + def pow_to_turns(self, pow_: TInt32) -> TFloat: """Return the phase in turns corresponding to a given phase offset word.""" - return pow_/0x10000 + return pow_ / 0x10000 @portable(flags={"fast-math"}) - def amplitude_to_asf(self, amplitude) -> TInt32: + def amplitude_to_asf(self, amplitude: TFloat) -> TInt32: """Return 14-bit amplitude scale factor corresponding to given fractional amplitude.""" code = int32(round(amplitude * 0x3fff)) @@ -614,13 +621,13 @@ class AD9910: return code @portable(flags={"fast-math"}) - def asf_to_amplitude(self, asf): + def asf_to_amplitude(self, asf: TInt32) -> TFloat: """Return amplitude as a fraction of full scale corresponding to given amplitude scale factor.""" return asf / float(0x3fff) @portable(flags={"fast-math"}) - def frequency_to_ram(self, frequency, ram): + def frequency_to_ram(self, frequency: TList(TFloat), ram: TList(TInt32)): """Convert frequency values to RAM profile data. To be used with :const:`RAM_DEST_FTW`. @@ -633,7 +640,7 @@ class AD9910: ram[i] = self.frequency_to_ftw(frequency[i]) @portable(flags={"fast-math"}) - def turns_to_ram(self, turns, ram): + def turns_to_ram(self, turns: TList(TFloat), ram: TList(TInt32)): """Convert phase values to RAM profile data. To be used with :const:`RAM_DEST_POW`. @@ -646,7 +653,7 @@ class AD9910: ram[i] = self.turns_to_pow(turns[i]) << 16 @portable(flags={"fast-math"}) - def amplitude_to_ram(self, amplitude, ram): + def amplitude_to_ram(self, amplitude: TList(TFloat), ram: TList(TInt32)): """Convert amplitude values to RAM profile data. To be used with :const:`RAM_DEST_ASF`. @@ -659,7 +666,8 @@ class AD9910: ram[i] = self.amplitude_to_asf(amplitude[i]) << 18 @portable(flags={"fast-math"}) - def turns_amplitude_to_ram(self, turns, amplitude, ram): + def turns_amplitude_to_ram(self, turns: TList(TFloat), + amplitude: TList(TFloat), ram: TList(TInt32)): """Convert phase and amplitude values to RAM profile data. To be used with :const:`RAM_DEST_POWASF`. @@ -674,32 +682,36 @@ class AD9910: self.amplitude_to_asf(amplitude[i]) << 2) @kernel - def set_frequency(self, frequency): - """Set the value stored to the AD9910's frequency tuning word (FTW) register. + def set_frequency(self, frequency: TFloat): + """Set the value stored to the AD9910's frequency tuning word (FTW) + register. :param frequency: frequency to be stored, in Hz. """ - return self.set_ftw(self.frequency_to_ftw(frequency)) + self.set_ftw(self.frequency_to_ftw(frequency)) @kernel - def set_amplitude(self, amplitude): - """Set the value stored to the AD9910's amplitude scale factor (ASF) register. + def set_amplitude(self, amplitude: TFloat): + """Set the value stored to the AD9910's amplitude scale factor (ASF) + register. :param amplitude: amplitude to be stored, in units of full scale. """ - return self.set_asf(self.amplitude_to_asf(amplitude)) + self.set_asf(self.amplitude_to_asf(amplitude)) @kernel - def set_phase(self, turns): - """Set the value stored to the AD9910's phase offset word (POW) register. + def set_phase(self, turns: TFloat): + """Set the value stored to the AD9910's phase offset word (POW) + register. :param turns: phase offset to be stored, in turns. """ - return self.set_pow(self.turns_to_pow(turns)) + self.set_pow(self.turns_to_pow(turns)) @kernel - def set(self, frequency, phase=0.0, amplitude=1.0, - phase_mode=_PHASE_MODE_DEFAULT, ref_time_mu=int64(-1), profile=0): + def set(self, frequency: TFloat, phase: TFloat = 0.0, + amplitude: TFloat = 1.0, phase_mode: TInt32 = _PHASE_MODE_DEFAULT, + ref_time_mu: TInt64 = int64(-1), profile: TInt32 = 0): """Set profile 0 data in SI units. .. seealso:: :meth:`set_mu` @@ -718,7 +730,7 @@ class AD9910: profile)) @kernel - def set_att_mu(self, att): + def set_att_mu(self, att: TInt32): """Set digital step attenuator in machine units. This method will write the attenuator settings of all four channels. @@ -730,7 +742,7 @@ class AD9910: self.cpld.set_att_mu(self.chip_select - 4, att) @kernel - def set_att(self, att): + def set_att(self, att: TFloat): """Set digital step attenuator in SI units. This method will write the attenuator settings of all four channels. @@ -742,7 +754,7 @@ class AD9910: self.cpld.set_att(self.chip_select - 4, att) @kernel - def cfg_sw(self, state): + def cfg_sw(self, state: TInt32): """Set CPLD CFG RF switch state. The RF switch is controlled by the logical or of the CPLD configuration shift register RF switch bit and the SW TTL line (if used). @@ -752,7 +764,7 @@ class AD9910: self.cpld.cfg_sw(self.chip_select - 4, state) @kernel - def set_sync(self, in_delay, window): + def set_sync(self, in_delay: TInt32, window: TInt32): """Set the relevant parameters in the multi device synchronization register. See the AD9910 datasheet for details. The SYNC clock generator preset value is set to zero, and the SYNC_OUT generator is @@ -782,12 +794,13 @@ class AD9910: Also modifies CFR2. """ self.write32(_AD9910_REG_CFR2, 0x01010020) # clear SMP_ERR - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) self.write32(_AD9910_REG_CFR2, 0x01010000) # enable SMP_ERR - self.cpld.io_update.pulse(1*us) + self.cpld.io_update.pulse(1 * us) @kernel - def tune_sync_delay(self, search_seed=15): + def tune_sync_delay(self, + search_seed: TInt32 = 15) -> TTuple([TInt32, TInt32]): """Find a stable SYNC_IN delay. This method first locates a valid SYNC_IN delay at zero validation @@ -812,7 +825,7 @@ class AD9910: margin = 1 # 1*75ps setup and hold for window in range(16): next_seed = -1 - for in_delay in range(search_span - 2*window): + for in_delay in range(search_span - 2 * window): # alternate search direction around search_seed if in_delay & 1: in_delay = -in_delay @@ -822,9 +835,9 @@ class AD9910: self.set_sync(in_delay, window) self.clear_smp_err() # integrate SMP_ERR statistics for a few hundred cycles - delay(100*us) + delay(100 * us) err = urukul_sta_smp_err(self.cpld.sta_read()) - delay(100*us) # slack + delay(100 * us) # slack if not (err >> (self.chip_select - 4)) & 1: next_seed = in_delay break @@ -836,14 +849,15 @@ class AD9910: window = max(min_window, window - 1 - margin) self.set_sync(search_seed, window) self.clear_smp_err() - delay(100*us) # slack + delay(100 * us) # slack return search_seed, window else: break raise ValueError("no valid window/delay") @kernel - def measure_io_update_alignment(self, delay_start, delay_stop): + def measure_io_update_alignment(self, delay_start: TInt64, + delay_stop: TInt64) -> TInt32: """Use the digital ramp generator to locate the alignment between IO_UPDATE and SYNC_CLK. @@ -878,14 +892,14 @@ class AD9910: at_mu(t + 0x1000 + delay_stop) self.cpld.io_update.pulse_mu(16 - delay_stop) # realign ftw = self.read32(_AD9910_REG_FTW) # read out effective FTW - delay(100*us) # slack + delay(100 * us) # slack # disable DRG self.write32(_AD9910_REG_CFR2, 0x01010000) self.cpld.io_update.pulse_mu(8) return ftw & 1 @kernel - def tune_io_update_delay(self): + def tune_io_update_delay(self) -> TInt32: """Find a stable IO_UPDATE delay alignment. Scan through increasing IO_UPDATE delays until a delay is found that @@ -922,5 +936,5 @@ class AD9910: "no clear IO_UPDATE-SYNC_CLK alignment edge found") else: # the good delay is period//2 after the edge - return (i + 1 + period//2) & (period - 1) + return (i + 1 + period // 2) & (period - 1) raise ValueError("no IO_UPDATE-SYNC_CLK alignment edge found") diff --git a/artiq/coredevice/ad9912.py b/artiq/coredevice/ad9912.py index fa88f2003..b2987c3a3 100644 --- a/artiq/coredevice/ad9912.py +++ b/artiq/coredevice/ad9912.py @@ -1,6 +1,6 @@ from numpy import int32, int64 -from artiq.language.types import TInt32, TInt64 +from artiq.language.types import TInt32, TInt64, TFloat, TTuple from artiq.language.core import kernel, delay, portable from artiq.language.units import ms, us, ns from artiq.coredevice.ad9912_reg import * @@ -39,12 +39,12 @@ class AD9912: self.sw = dmgr.get(sw_device) self.kernel_invariants.add("sw") self.pll_n = pll_n - sysclk = self.cpld.refclk/[1, 1, 2, 4][self.cpld.clk_div]*pll_n + sysclk = self.cpld.refclk / [1, 1, 2, 4][self.cpld.clk_div] * pll_n assert sysclk <= 1e9 - self.ftw_per_hz = 1/sysclk*(int64(1) << 48) + self.ftw_per_hz = 1 / sysclk * (int64(1) << 48) @kernel - def write(self, addr, data, length): + def write(self, addr: TInt32, data: TInt32, length: TInt32): """Variable length write to a register. Up to 4 bytes. @@ -55,14 +55,14 @@ class AD9912: assert length > 0 assert length <= 4 self.bus.set_config_mu(urukul.SPI_CONFIG, 16, - urukul.SPIT_DDS_WR, self.chip_select) + urukul.SPIT_DDS_WR, self.chip_select) self.bus.write((addr | ((length - 1) << 13)) << 16) - self.bus.set_config_mu(urukul.SPI_CONFIG | spi.SPI_END, length*8, - urukul.SPIT_DDS_WR, self.chip_select) - self.bus.write(data << (32 - length*8)) + self.bus.set_config_mu(urukul.SPI_CONFIG | spi.SPI_END, length * 8, + urukul.SPIT_DDS_WR, self.chip_select) + self.bus.write(data << (32 - length * 8)) @kernel - def read(self, addr, length): + def read(self, addr: TInt32, length: TInt32) -> TInt32: """Variable length read from a register. Up to 4 bytes. @@ -73,15 +73,15 @@ class AD9912: assert length > 0 assert length <= 4 self.bus.set_config_mu(urukul.SPI_CONFIG, 16, - urukul.SPIT_DDS_WR, self.chip_select) + urukul.SPIT_DDS_WR, self.chip_select) self.bus.write((addr | ((length - 1) << 13) | 0x8000) << 16) self.bus.set_config_mu(urukul.SPI_CONFIG | spi.SPI_END - | spi.SPI_INPUT, length*8, - urukul.SPIT_DDS_RD, self.chip_select) + | spi.SPI_INPUT, length * 8, + urukul.SPIT_DDS_RD, self.chip_select) self.bus.write(0) data = self.bus.read() if length < 4: - data &= (1 << (length*8)) - 1 + data &= (1 << (length * 8)) - 1 return data @kernel @@ -94,24 +94,24 @@ class AD9912: """ # SPI mode self.write(AD9912_SER_CONF, 0x99, length=1) - self.cpld.io_update.pulse(2*us) + self.cpld.io_update.pulse(2 * us) # Verify chip ID and presence prodid = self.read(AD9912_PRODIDH, length=2) if (prodid != 0x1982) and (prodid != 0x1902): raise ValueError("Urukul AD9912 product id mismatch") - delay(50*us) + delay(50 * us) # HSTL power down, CMOS power down self.write(AD9912_PWRCNTRL1, 0x80, length=1) - self.cpld.io_update.pulse(2*us) - self.write(AD9912_N_DIV, self.pll_n//2 - 2, length=1) - self.cpld.io_update.pulse(2*us) + self.cpld.io_update.pulse(2 * us) + self.write(AD9912_N_DIV, self.pll_n // 2 - 2, length=1) + self.cpld.io_update.pulse(2 * us) # I_cp = 375 µA, VCO high range self.write(AD9912_PLLCFG, 0b00000101, length=1) - self.cpld.io_update.pulse(2*us) - delay(1*ms) + self.cpld.io_update.pulse(2 * us) + delay(1 * ms) @kernel - def set_att_mu(self, att): + def set_att_mu(self, att: TInt32): """Set digital step attenuator in machine units. This method will write the attenuator settings of all four channels. @@ -123,7 +123,7 @@ class AD9912: self.cpld.set_att_mu(self.chip_select - 4, att) @kernel - def set_att(self, att): + def set_att(self, att: TFloat): """Set digital step attenuator in SI units. This method will write the attenuator settings of all four channels. @@ -135,62 +135,73 @@ class AD9912: self.cpld.set_att(self.chip_select - 4, att) @kernel - def set_mu(self, ftw, pow): + def set_mu(self, ftw: TInt64, pow_: TInt32): """Set profile 0 data in machine units. After the SPI transfer, the shared IO update pin is pulsed to activate the data. :param ftw: Frequency tuning word: 48 bit unsigned. - :param pow: Phase tuning word: 16 bit unsigned. + :param pow_: Phase tuning word: 16 bit unsigned. """ # streaming transfer of FTW and POW self.bus.set_config_mu(urukul.SPI_CONFIG, 16, - urukul.SPIT_DDS_WR, self.chip_select) + urukul.SPIT_DDS_WR, self.chip_select) self.bus.write((AD9912_POW1 << 16) | (3 << 29)) self.bus.set_config_mu(urukul.SPI_CONFIG, 32, - urukul.SPIT_DDS_WR, self.chip_select) - self.bus.write((pow << 16) | (int32(ftw >> 32) & 0xffff)) + urukul.SPIT_DDS_WR, self.chip_select) + self.bus.write((pow_ << 16) | (int32(ftw >> 32) & 0xffff)) self.bus.set_config_mu(urukul.SPI_CONFIG | spi.SPI_END, 32, - urukul.SPIT_DDS_WR, self.chip_select) + urukul.SPIT_DDS_WR, self.chip_select) self.bus.write(int32(ftw)) - self.cpld.io_update.pulse(10*ns) + self.cpld.io_update.pulse(10 * ns) @portable(flags={"fast-math"}) - def frequency_to_ftw(self, frequency) -> TInt64: + def frequency_to_ftw(self, frequency: TFloat) -> TInt64: """Returns the 48-bit frequency tuning word corresponding to the given frequency. """ - return int64(round(self.ftw_per_hz*frequency)) & ((int64(1) << 48) - 1) + return int64(round(self.ftw_per_hz * frequency)) & ( + (int64(1) << 48) - 1) @portable(flags={"fast-math"}) - def ftw_to_frequency(self, ftw): + def ftw_to_frequency(self, ftw: TInt64) -> TFloat: """Returns the frequency corresponding to the given frequency tuning word. """ - return ftw/self.ftw_per_hz + return ftw / self.ftw_per_hz @portable(flags={"fast-math"}) - def turns_to_pow(self, phase) -> TInt32: + def turns_to_pow(self, phase: TFloat) -> TInt32: """Returns the 16-bit phase offset word corresponding to the given phase. """ - return int32(round((1 << 14)*phase)) & 0xffff + return int32(round((1 << 14) * phase)) & 0xffff + + @portable(flags={"fast-math"}) + def pow_to_turns(self, pow_: TInt32) -> TFloat: + """Return the phase in turns corresponding to a given phase offset + word. + + :param pow_: Phase offset word. + :return: Phase in turns. + """ + return pow_ / (1 << 14) @kernel - def set(self, frequency, phase=0.0): + def set(self, frequency: TFloat, phase: TFloat = 0.0): """Set profile 0 data in SI units. .. seealso:: :meth:`set_mu` - :param ftw: Frequency in Hz - :param pow: Phase tuning word in turns + :param frequency: Frequency in Hz + :param phase: Phase tuning word in turns """ self.set_mu(self.frequency_to_ftw(frequency), - self.turns_to_pow(phase)) + self.turns_to_pow(phase)) @kernel - def cfg_sw(self, state): + def cfg_sw(self, state: TInt32): """Set CPLD CFG RF switch state. The RF switch is controlled by the logical or of the CPLD configuration shift register RF switch bit and the SW TTL line (if used). diff --git a/artiq/coredevice/urukul.py b/artiq/coredevice/urukul.py index cbc3edbfc..dd19728b4 100644 --- a/artiq/coredevice/urukul.py +++ b/artiq/coredevice/urukul.py @@ -1,15 +1,15 @@ +from numpy import int32, int64 + from artiq.language.core import kernel, delay, portable, at_mu, now_mu from artiq.language.units import us, ms - -from numpy import int32, int64 +from artiq.language.types import TInt32, TFloat, TBool from artiq.coredevice import spi2 as spi - -SPI_CONFIG = (0*spi.SPI_OFFLINE | 0*spi.SPI_END | - 0*spi.SPI_INPUT | 1*spi.SPI_CS_POLARITY | - 0*spi.SPI_CLK_POLARITY | 0*spi.SPI_CLK_PHASE | - 0*spi.SPI_LSB_FIRST | 0*spi.SPI_HALF_DUPLEX) +SPI_CONFIG = (0 * spi.SPI_OFFLINE | 0 * spi.SPI_END | + 0 * spi.SPI_INPUT | 1 * spi.SPI_CS_POLARITY | + 0 * spi.SPI_CLK_POLARITY | 0 * spi.SPI_CLK_PHASE | + 0 * spi.SPI_LSB_FIRST | 0 * spi.SPI_HALF_DUPLEX) # SPI clock write and read dividers SPIT_CFG_WR = 2 @@ -105,7 +105,7 @@ class _RegIOUpdate: self.cpld = cpld @kernel - def pulse(self, t): + def pulse(self, t: TFloat): cfg = self.cpld.cfg_reg self.cpld.cfg_write(cfg | (1 << CFG_IO_UPDATE)) delay(t) @@ -117,7 +117,7 @@ class _DummySync: self.cpld = cpld @kernel - def set_mu(self, ftw): + def set_mu(self, ftw: TInt32): pass @@ -196,12 +196,12 @@ class CPLD: self.sync_div = sync_div @kernel - def cfg_write(self, cfg): + def cfg_write(self, cfg: TInt32): """Write to the configuration register. See :func:`urukul_cfg` for possible flags. - :param data: 24 bit data to be written. Will be stored at + :param cfg: 24 bit data to be written. Will be stored at :attr:`cfg_reg`. """ self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END, 24, @@ -210,7 +210,7 @@ class CPLD: self.cfg_reg = cfg @kernel - def sta_read(self): + def sta_read(self) -> TInt32: """Read the status register. Use any of the following functions to extract values: @@ -229,7 +229,7 @@ class CPLD: return self.bus.read() @kernel - def init(self, blind=False): + def init(self, blind: TBool = False): """Initialize and detect Urukul. Resets the DDS I/O interface and verifies correct CPLD gateware @@ -247,12 +247,12 @@ class CPLD: proto_rev = urukul_sta_proto_rev(self.sta_read()) if proto_rev != STA_PROTO_REV_MATCH: raise ValueError("Urukul proto_rev mismatch") - delay(100*us) # reset, slack + delay(100 * us) # reset, slack self.cfg_write(cfg) if self.sync_div: at_mu(now_mu() & ~0xf) # align to RTIO/2 self.set_sync_div(self.sync_div) # 125 MHz/2 = 1 GHz/16 - delay(1*ms) # DDS wake up + delay(1 * ms) # DDS wake up @kernel def io_rst(self): @@ -261,7 +261,7 @@ class CPLD: self.cfg_write(self.cfg_reg & ~(1 << CFG_IO_RST)) @kernel - def cfg_sw(self, channel, on): + def cfg_sw(self, channel: TInt32, on: TBool): """Configure the RF switches through the configuration register. These values are logically OR-ed with the LVDS lines on EEM1. @@ -277,7 +277,7 @@ class CPLD: self.cfg_write(c) @kernel - def cfg_switches(self, state): + def cfg_switches(self, state: TInt32): """Configure all four RF switches through the configuration register. :param state: RF switch state as a 4 bit integer. @@ -285,11 +285,12 @@ class CPLD: self.cfg_write((self.cfg_reg & ~0xf) | state) @kernel - def set_att_mu(self, channel, att): + def set_att_mu(self, channel: TInt32, att: TInt32): """Set digital step attenuator in machine units. - This method will also write the attenuator settings of the three other channels. Use - :meth:`get_att_mu` to retrieve the hardware state set in previous experiments. + This method will also write the attenuator settings of the three + other channels. Use :meth:`get_att_mu` to retrieve the hardware + state set in previous experiments. :param channel: Attenuator channel (0-3). :param att: 8-bit digital attenuation setting: @@ -300,7 +301,7 @@ class CPLD: self.set_all_att_mu(a) @kernel - def set_all_att_mu(self, att_reg): + def set_all_att_mu(self, att_reg: TInt32): """Set all four digital step attenuators (in machine units). .. seealso:: :meth:`set_att_mu` @@ -313,7 +314,7 @@ class CPLD: self.att_reg = att_reg @kernel - def set_att(self, channel, att): + def set_att(self, channel: TInt32, att: TFloat): """Set digital step attenuator in SI units. This method will write the attenuator settings of all four channels. @@ -325,16 +326,16 @@ class CPLD: attenuation. Minimum attenuation is 0*dB, maximum attenuation is 31.5*dB. """ - code = 255 - int32(round(att*8)) - if code < 0 or code > 255: - raise ValueError("Invalid urukul.CPLD attenuation!") - self.set_att_mu(channel, code) + self.set_att_mu(channel, self.att_to_mu(att)) @kernel - def get_att_mu(self): + def get_att_mu(self) -> TInt32: """Return the digital step attenuator settings in machine units. - The result is stored and will be used in future calls of :meth:`set_att_mu`. + The result is stored and will be used in future calls of + :meth:`set_att_mu` and :meth:`set_att`. + + .. seealso:: :meth:`get_channel_att_mu` :return: 32 bit attenuator settings """ @@ -343,13 +344,13 @@ class CPLD: self.bus.write(0) # shift in zeros, shift out current value self.bus.set_config_mu(SPI_CONFIG | spi.SPI_END, 32, SPIT_ATT_WR, CS_ATT) - delay(10*us) + delay(10 * us) self.att_reg = self.bus.read() self.bus.write(self.att_reg) # shift in current value again and latch return self.att_reg @kernel - def set_sync_div(self, div): + def set_sync_div(self, div: TInt32): """Set the SYNC_IN AD9910 pulse generator frequency and align it to the current RTIO timestamp. @@ -361,12 +362,12 @@ class CPLD: Minimum division ratio is 2. Maximum division ratio is 16. """ ftw_max = 1 << 4 - ftw = ftw_max//div - assert ftw*div == ftw_max + ftw = ftw_max // div + assert ftw * div == ftw_max self.sync.set_mu(ftw) @kernel - def set_profile(self, profile): + def set_profile(self, profile: TInt32): """Set the PROFILE pins. The PROFILE pins are common to all four DDS channels.