diff --git a/artiq/frontend/artiq_sinara_tester.py b/artiq/frontend/artiq_sinara_tester.py index a4f999029..81e7370f9 100755 --- a/artiq/frontend/artiq_sinara_tester.py +++ b/artiq/frontend/artiq_sinara_tester.py @@ -6,8 +6,21 @@ import os import select import sys +from numpy import int32 + from artiq.experiment import * +from artiq.coredevice.core import Core +from artiq.coredevice.ttl import TTLOut, TTLInOut +from artiq.coredevice.urukul import CPLD as UrukulCPLD from artiq.coredevice.ad9910 import AD9910, SyncDataEeprom +from artiq.coredevice.mirny import Mirny, Almazny +from artiq.coredevice.adf5356 import ADF5356 +from artiq.coredevice.sampler import Sampler +from artiq.coredevice.zotino import Zotino +from artiq.coredevice.fastino import Fastino +from artiq.coredevice.phaser import Phaser +from artiq.coredevice.grabber import Grabber +from artiq.coredevice.suservo import SUServo, Channel as SUServoChannel from artiq.master.databases import DeviceDB from artiq.master.worker_db import DeviceManager @@ -27,7 +40,8 @@ def chunker(seq, size): yield res -def is_enter_pressed() -> TBool: +@rpc +def is_enter_pressed() -> bool: if os.name == "nt": if msvcrt.kbhit() and msvcrt.getch() == b"\r": return True @@ -41,7 +55,10 @@ def is_enter_pressed() -> TBool: return False +@nac3 class SinaraTester(EnvExperiment): + core: KernelInvariant[Core] + def build(self): self.setattr_device("core") @@ -149,7 +166,7 @@ class SinaraTester(EnvExperiment): self.suschannels = sorted(self.suschannels.items(), key=lambda x: x[1].channel) @kernel - def test_led(self, led): + def test_led(self, led: TTLOut): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -157,8 +174,8 @@ class SinaraTester(EnvExperiment): while self.core.get_rtio_counter_mu() < t: pass for i in range(3): - led.pulse(100*ms) - delay(100*ms) + led.pulse(100.*ms) + self.core.delay(100.*ms) def test_leds(self): print("*** Testing LEDs.") @@ -169,7 +186,7 @@ class SinaraTester(EnvExperiment): self.test_led(led_dev) @kernel - def test_ttl_out_chunk(self, ttl_chunk): + def test_ttl_out_chunk(self, ttl_chunk: list[TTLOut]): while not is_enter_pressed(): self.core.break_realtime() for _ in range(50000): @@ -177,9 +194,9 @@ class SinaraTester(EnvExperiment): for ttl in ttl_chunk: i += 1 for _ in range(i): - ttl.pulse(1*us) - delay(1*us) - delay(10*us) + ttl.pulse(1.*us) + self.core.delay(1.*us) + self.core.delay(10.*us) def test_ttl_outs(self): print("*** Testing TTL outputs.") @@ -193,16 +210,16 @@ class SinaraTester(EnvExperiment): self.test_ttl_out_chunk([dev for name, dev in ttl_chunk]) @kernel - def test_ttl_in(self, ttl_out, ttl_in): + def test_ttl_in(self, ttl_out: TTLOut, ttl_in: TTLInOut) -> bool: n = 42 self.core.break_realtime() with parallel: - ttl_in.gate_rising(1*ms) + ttl_in.gate_rising(1.*ms) with sequential: - delay(50*us) + self.core.delay(50.*us) for _ in range(n): - ttl_out.pulse(2*us) - delay(2*us) + ttl_out.pulse(2.*us) + self.core.delay(2.*us) return ttl_in.count(now_mu()) == n def test_ttl_ins(self): @@ -227,23 +244,25 @@ class SinaraTester(EnvExperiment): print("FAILED") @kernel - def init_urukul(self, cpld): + def init_urukul(self, cpld: UrukulCPLD): self.core.break_realtime() cpld.init() @kernel - def test_urukul_att(self, cpld): + def test_urukul_att(self, cpld: UrukulCPLD): self.core.break_realtime() for i in range(32): test_word = 1 << i cpld.set_all_att_mu(test_word) readback_word = cpld.get_att_mu() if readback_word != test_word: - print(readback_word, test_word) - raise ValueError + # NAC3TODO print(readback_word, test_word) + # NAC3TODO raise ValueError + pass + # NAC3TODO: AD9912 support @kernel - def calibrate_urukul(self, channel): + def calibrate_urukul(self, channel: AD9910) -> tuple[int32, int32]: self.core.break_realtime() channel.init() self.core.break_realtime() @@ -253,7 +272,7 @@ class SinaraTester(EnvExperiment): return sync_delay_seed, io_update_delay @kernel - def setup_urukul(self, channel, frequency): + def setup_urukul(self, channel: AD9910, frequency: float): self.core.break_realtime() channel.init() channel.set(frequency*MHz) @@ -261,12 +280,12 @@ class SinaraTester(EnvExperiment): channel.set_att(6.) @kernel - def cfg_sw_off_urukul(self, channel): + def cfg_sw_off_urukul(self, channel: AD9910): self.core.break_realtime() channel.cfg_sw(False) @kernel - def rf_switch_wave(self, channels): + def rf_switch_wave(self, channels: list[TTLOut]): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -274,8 +293,8 @@ class SinaraTester(EnvExperiment): while self.core.get_rtio_counter_mu() < t: pass for channel in channels: - channel.pulse(100*ms) - delay(100*ms) + channel.pulse(100.*ms) + self.core.delay(100.*ms) # We assume that RTIO channels for switches are grouped by card. def test_urukuls(self): @@ -322,12 +341,12 @@ class SinaraTester(EnvExperiment): self.rf_switch_wave([swi.sw for swi in sw]) @kernel - def init_mirny(self, cpld): + def init_mirny(self, cpld: Mirny): self.core.break_realtime() cpld.init() @kernel - def setup_mirny(self, channel, frequency): + def setup_mirny(self, channel: ADF5356, frequency: float): self.core.break_realtime() channel.init() @@ -336,15 +355,15 @@ class SinaraTester(EnvExperiment): self.core.break_realtime() channel.set_frequency(frequency*MHz) - delay(5*ms) + self.core.delay(5.*ms) @kernel - def sw_off_mirny(self, channel): + def sw_off_mirny(self, channel: ADF5356): self.core.break_realtime() channel.sw.off() @kernel - def mirny_rf_switch_wave(self, channels): + def mirny_rf_switch_wave(self, channels: list[TTLOut]): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -352,26 +371,26 @@ class SinaraTester(EnvExperiment): while self.core.get_rtio_counter_mu() < t: pass for channel in channels: - channel.pulse(100*ms) - delay(100*ms) + channel.pulse(100.*ms) + self.core.delay(100.*ms) @kernel - def init_almazny(self, almazny): + def init_almazny(self, almazny: Almazny): self.core.break_realtime() almazny.init() almazny.output_toggle(True) @kernel - def almazny_set_attenuators_mu(self, almazny, ch, atts): + def almazny_set_attenuators_mu(self, almazny: Almazny, ch: int32, atts: int32): self.core.break_realtime() almazny.set_att_mu(ch, atts) @kernel - def almazny_set_attenuators(self, almazny, ch, atts): + def almazny_set_attenuators(self, almazny: Almazny, ch: int32, atts: float): self.core.break_realtime() almazny.set_att(ch, atts) @kernel - def almazny_toggle_output(self, almazny, rf_on): + def almazny_toggle_output(self, almazny: Almazny, rf_on: bool): self.core.break_realtime() almazny.output_toggle(rf_on) @@ -445,17 +464,21 @@ class SinaraTester(EnvExperiment): self.sw_off_mirny(swi) self.mirny_rf_switch_wave([swi.sw for swi in sw]) + @rpc + def update_sampler_voltages(self, voltages: list[float]): + self.sampler_voltages = voltages + @kernel - def get_sampler_voltages(self, sampler, cb): + def get_sampler_voltages(self, sampler: Sampler): self.core.break_realtime() sampler.init() - delay(5*ms) + self.core.delay(5.*ms) for i in range(8): sampler.set_gain_mu(i, 0) - delay(100*us) - smp = [0.0]*8 + self.core.delay(100.*us) + smp = [0. for _ in range(8)] sampler.sample(smp) - cb(smp) + self.update_sampler_voltages(smp) def test_samplers(self): print("*** Testing Sampler ADCs.") @@ -466,14 +489,10 @@ class SinaraTester(EnvExperiment): print("Apply 1.5V to channel {}. Press ENTER when done.".format(channel)) input() - voltages = [] - def setv(x): - nonlocal voltages - voltages = x - self.get_sampler_voltages(card_dev, setv) + self.get_sampler_voltages(card_dev) passed = True - for n, voltage in enumerate(voltages): + for n, voltage in enumerate(self.sampler_voltages): if n == channel: if abs(voltage - 1.5) > 0.2: passed = False @@ -484,22 +503,22 @@ class SinaraTester(EnvExperiment): print("PASSED") else: print("FAILED") - print(" ".join(["{:.1f}".format(x) for x in voltages])) + print(" ".join(["{:.1f}".format(x) for x in self.sampler_voltages])) @kernel - def set_zotino_voltages(self, zotino, voltages): + def set_zotino_voltages(self, zotino: Zotino, voltages: list[float]): self.core.break_realtime() zotino.init() - delay(200*us) + self.core.delay(200.*us) i = 0 for voltage in voltages: zotino.write_dac(i, voltage) - delay(100*us) + self.core.delay(100.*us) i += 1 zotino.load() @kernel - def zotinos_led_wave(self, zotinos): + def zotinos_led_wave(self, zotinos: list[Zotino]): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -509,9 +528,9 @@ class SinaraTester(EnvExperiment): for zotino in zotinos: for i in range(8): zotino.set_leds(1 << i) - delay(100*ms) + self.core.delay(100.*ms) zotino.set_leds(0) - delay(100*ms) + self.core.delay(100.*ms) def test_zotinos(self): print("*** Testing Zotino DACs and USER LEDs.") @@ -527,18 +546,18 @@ class SinaraTester(EnvExperiment): ) @kernel - def set_fastino_voltages(self, fastino, voltages): + def set_fastino_voltages(self, fastino: Fastino, voltages: list[float]): self.core.break_realtime() fastino.init() - delay(200*us) + self.core.delay(200.*us) i = 0 for voltage in voltages: fastino.set_dac(i, voltage) - delay(100*us) + self.core.delay(100.*us) i += 1 @kernel - def fastinos_led_wave(self, fastinos): + def fastinos_led_wave(self, fastinos: list[Fastino]): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -548,9 +567,9 @@ class SinaraTester(EnvExperiment): for fastino in fastinos: for i in range(8): fastino.set_leds(1 << i) - delay(100*ms) + self.core.delay(100.*ms) fastino.set_leds(0) - delay(100*ms) + self.core.delay(100.*ms) def test_fastinos(self): print("*** Testing Fastino DACs and USER LEDs.") @@ -566,27 +585,27 @@ class SinaraTester(EnvExperiment): ) @kernel - def set_phaser_frequencies(self, phaser, duc, osc): + def set_phaser_frequencies(self, phaser: Phaser, duc: float, osc: list[float]): self.core.break_realtime() phaser.init() - delay(1*ms) + self.core.delay(1.*ms) phaser.channel[0].set_duc_frequency(duc) phaser.channel[0].set_duc_cfg() - phaser.channel[0].set_att(6*dB) + phaser.channel[0].set_att(6.*dB) phaser.channel[1].set_duc_frequency(-duc) phaser.channel[1].set_duc_cfg() - phaser.channel[1].set_att(6*dB) + phaser.channel[1].set_att(6.*dB) phaser.duc_stb() - delay(1*ms) + self.core.delay(1.*ms) for i in range(len(osc)): phaser.channel[0].oscillator[i].set_frequency(osc[i]) phaser.channel[0].oscillator[i].set_amplitude_phase(.2) phaser.channel[1].oscillator[i].set_frequency(-osc[i]) phaser.channel[1].oscillator[i].set_amplitude_phase(.2) - delay(1*ms) + self.core.delay(1.*ms) @kernel - def phaser_led_wave(self, phasers): + def phaser_led_wave(self, phasers: list[Phaser]): while not is_enter_pressed(): self.core.break_realtime() # do not fill the FIFOs too much to avoid long response times @@ -596,9 +615,9 @@ class SinaraTester(EnvExperiment): for phaser in phasers: for i in range(6): phaser.set_leds(1 << i) - delay(100*ms) + self.core.delay(100.*ms) phaser.set_leds(0) - delay(100*ms) + self.core.delay(100.*ms) def test_phasers(self): print("*** Testing Phaser DACs and 6 USER LEDs.") @@ -617,9 +636,9 @@ class SinaraTester(EnvExperiment): ) @kernel - def grabber_capture(self, card_dev, rois): + def grabber_capture(self, card_dev: Grabber, rois: list[list[int32]]): self.core.break_realtime() - delay(100*us) + self.core.delay(100.*us) mask = 0 for i in range(len(rois)): i = rois[i][0] @@ -630,11 +649,11 @@ class SinaraTester(EnvExperiment): mask |= 1 << i card_dev.setup_roi(i, x0, y0, x1, y1) card_dev.gate_roi(mask) - n = [0]*len(rois) + n = [0 for _ in range(len(rois))] card_dev.input_mu(n) self.core.break_realtime() card_dev.gate_roi(0) - print("ROI sums:", n) + # NAC3TODO print("ROI sums:", n) def test_grabbers(self): print("*** Testing Grabber Frame Grabbers.") @@ -651,25 +670,25 @@ class SinaraTester(EnvExperiment): self.grabber_capture(card_dev, rois) @kernel - def setup_suservo(self, channel): + def setup_suservo(self, channel: SUServo): self.core.break_realtime() channel.init() - delay(1*us) + self.core.delay(1.*us) # ADC PGIA gain 0 for i in range(8): channel.set_pgia_mu(i, 0) - delay(10*us) + self.core.delay(10.*us) # DDS attenuator 10dB for i in range(4): for cpld in channel.cplds: cpld.set_att(i, 10.) - delay(1*us) + self.core.delay(1.*us) # Servo is done and disabled - assert channel.get_status() & 0xff == 2 - delay(10*us) + # NAC3TODO assert channel.get_status() & 0xff == 2 + self.core.delay(10.*us) @kernel - def setup_suservo_loop(self, channel, loop_nr): + def setup_suservo_loop(self, channel: SUServoChannel, loop_nr: int32): self.core.break_realtime() channel.set_y( profile=loop_nr, @@ -684,24 +703,24 @@ class SinaraTester(EnvExperiment): delay=0. # no IIR update delay after enabling ) # setpoint 0.5 (5 V with above PGIA gain setting) - delay(100*us) + self.core.delay(100.*us) channel.set_dds( profile=loop_nr, offset=-.3, # 3 V with above PGIA settings - frequency=10*MHz, + frequency=10.*MHz, phase=0.) # enable RF, IIR updates and set profile - delay(10*us) - channel.set(en_out=1, en_iir=1, profile=loop_nr) + self.core.delay(10.*us) + channel.set(en_out=True, en_iir=True, profile=loop_nr) @kernel - def setup_start_suservo(self, channel): + def setup_start_suservo(self, channel: SUServo): self.core.break_realtime() - channel.set_config(enable=1) - delay(10*us) + channel.set_config(enable=True) + self.core.delay(10.*us) # check servo enabled - assert channel.get_status() & 0x01 == 1 - delay(10*us) + # NAC3TODO assert channel.get_status() & 0x01 == 1 + self.core.delay(10.*us) def test_suservos(self): print("*** Testing SUServos.")