artiq_sinara_tester: port to NAC3

This commit is contained in:
Sebastien Bourdeauducq 2022-03-03 17:07:27 +08:00
parent ba106de24f
commit 9d82f968f9
1 changed files with 106 additions and 87 deletions

View File

@ -6,8 +6,21 @@ import os
import select
import sys
from numpy import int32
from artiq.experiment import *
from artiq.coredevice.core import Core
from artiq.coredevice.ttl import TTLOut, TTLInOut
from artiq.coredevice.urukul import CPLD as UrukulCPLD
from artiq.coredevice.ad9910 import AD9910, SyncDataEeprom
from artiq.coredevice.mirny import Mirny, Almazny
from artiq.coredevice.adf5356 import ADF5356
from artiq.coredevice.sampler import Sampler
from artiq.coredevice.zotino import Zotino
from artiq.coredevice.fastino import Fastino
from artiq.coredevice.phaser import Phaser
from artiq.coredevice.grabber import Grabber
from artiq.coredevice.suservo import SUServo, Channel as SUServoChannel
from artiq.master.databases import DeviceDB
from artiq.master.worker_db import DeviceManager
@ -27,7 +40,8 @@ def chunker(seq, size):
yield res
def is_enter_pressed() -> TBool:
@rpc
def is_enter_pressed() -> bool:
if os.name == "nt":
if msvcrt.kbhit() and msvcrt.getch() == b"\r":
return True
@ -41,7 +55,10 @@ def is_enter_pressed() -> TBool:
return False
@nac3
class SinaraTester(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@ -149,7 +166,7 @@ class SinaraTester(EnvExperiment):
self.suschannels = sorted(self.suschannels.items(), key=lambda x: x[1].channel)
@kernel
def test_led(self, led):
def test_led(self, led: TTLOut):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -157,8 +174,8 @@ class SinaraTester(EnvExperiment):
while self.core.get_rtio_counter_mu() < t:
pass
for i in range(3):
led.pulse(100*ms)
delay(100*ms)
led.pulse(100.*ms)
self.core.delay(100.*ms)
def test_leds(self):
print("*** Testing LEDs.")
@ -169,7 +186,7 @@ class SinaraTester(EnvExperiment):
self.test_led(led_dev)
@kernel
def test_ttl_out_chunk(self, ttl_chunk):
def test_ttl_out_chunk(self, ttl_chunk: list[TTLOut]):
while not is_enter_pressed():
self.core.break_realtime()
for _ in range(50000):
@ -177,9 +194,9 @@ class SinaraTester(EnvExperiment):
for ttl in ttl_chunk:
i += 1
for _ in range(i):
ttl.pulse(1*us)
delay(1*us)
delay(10*us)
ttl.pulse(1.*us)
self.core.delay(1.*us)
self.core.delay(10.*us)
def test_ttl_outs(self):
print("*** Testing TTL outputs.")
@ -193,16 +210,16 @@ class SinaraTester(EnvExperiment):
self.test_ttl_out_chunk([dev for name, dev in ttl_chunk])
@kernel
def test_ttl_in(self, ttl_out, ttl_in):
def test_ttl_in(self, ttl_out: TTLOut, ttl_in: TTLInOut) -> bool:
n = 42
self.core.break_realtime()
with parallel:
ttl_in.gate_rising(1*ms)
ttl_in.gate_rising(1.*ms)
with sequential:
delay(50*us)
self.core.delay(50.*us)
for _ in range(n):
ttl_out.pulse(2*us)
delay(2*us)
ttl_out.pulse(2.*us)
self.core.delay(2.*us)
return ttl_in.count(now_mu()) == n
def test_ttl_ins(self):
@ -227,23 +244,25 @@ class SinaraTester(EnvExperiment):
print("FAILED")
@kernel
def init_urukul(self, cpld):
def init_urukul(self, cpld: UrukulCPLD):
self.core.break_realtime()
cpld.init()
@kernel
def test_urukul_att(self, cpld):
def test_urukul_att(self, cpld: UrukulCPLD):
self.core.break_realtime()
for i in range(32):
test_word = 1 << i
cpld.set_all_att_mu(test_word)
readback_word = cpld.get_att_mu()
if readback_word != test_word:
print(readback_word, test_word)
raise ValueError
# NAC3TODO print(readback_word, test_word)
# NAC3TODO raise ValueError
pass
# NAC3TODO: AD9912 support
@kernel
def calibrate_urukul(self, channel):
def calibrate_urukul(self, channel: AD9910) -> tuple[int32, int32]:
self.core.break_realtime()
channel.init()
self.core.break_realtime()
@ -253,7 +272,7 @@ class SinaraTester(EnvExperiment):
return sync_delay_seed, io_update_delay
@kernel
def setup_urukul(self, channel, frequency):
def setup_urukul(self, channel: AD9910, frequency: float):
self.core.break_realtime()
channel.init()
channel.set(frequency*MHz)
@ -261,12 +280,12 @@ class SinaraTester(EnvExperiment):
channel.set_att(6.)
@kernel
def cfg_sw_off_urukul(self, channel):
def cfg_sw_off_urukul(self, channel: AD9910):
self.core.break_realtime()
channel.cfg_sw(False)
@kernel
def rf_switch_wave(self, channels):
def rf_switch_wave(self, channels: list[TTLOut]):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -274,8 +293,8 @@ class SinaraTester(EnvExperiment):
while self.core.get_rtio_counter_mu() < t:
pass
for channel in channels:
channel.pulse(100*ms)
delay(100*ms)
channel.pulse(100.*ms)
self.core.delay(100.*ms)
# We assume that RTIO channels for switches are grouped by card.
def test_urukuls(self):
@ -322,12 +341,12 @@ class SinaraTester(EnvExperiment):
self.rf_switch_wave([swi.sw for swi in sw])
@kernel
def init_mirny(self, cpld):
def init_mirny(self, cpld: Mirny):
self.core.break_realtime()
cpld.init()
@kernel
def setup_mirny(self, channel, frequency):
def setup_mirny(self, channel: ADF5356, frequency: float):
self.core.break_realtime()
channel.init()
@ -336,15 +355,15 @@ class SinaraTester(EnvExperiment):
self.core.break_realtime()
channel.set_frequency(frequency*MHz)
delay(5*ms)
self.core.delay(5.*ms)
@kernel
def sw_off_mirny(self, channel):
def sw_off_mirny(self, channel: ADF5356):
self.core.break_realtime()
channel.sw.off()
@kernel
def mirny_rf_switch_wave(self, channels):
def mirny_rf_switch_wave(self, channels: list[TTLOut]):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -352,26 +371,26 @@ class SinaraTester(EnvExperiment):
while self.core.get_rtio_counter_mu() < t:
pass
for channel in channels:
channel.pulse(100*ms)
delay(100*ms)
channel.pulse(100.*ms)
self.core.delay(100.*ms)
@kernel
def init_almazny(self, almazny):
def init_almazny(self, almazny: Almazny):
self.core.break_realtime()
almazny.init()
almazny.output_toggle(True)
@kernel
def almazny_set_attenuators_mu(self, almazny, ch, atts):
def almazny_set_attenuators_mu(self, almazny: Almazny, ch: int32, atts: int32):
self.core.break_realtime()
almazny.set_att_mu(ch, atts)
@kernel
def almazny_set_attenuators(self, almazny, ch, atts):
def almazny_set_attenuators(self, almazny: Almazny, ch: int32, atts: float):
self.core.break_realtime()
almazny.set_att(ch, atts)
@kernel
def almazny_toggle_output(self, almazny, rf_on):
def almazny_toggle_output(self, almazny: Almazny, rf_on: bool):
self.core.break_realtime()
almazny.output_toggle(rf_on)
@ -445,17 +464,21 @@ class SinaraTester(EnvExperiment):
self.sw_off_mirny(swi)
self.mirny_rf_switch_wave([swi.sw for swi in sw])
@rpc
def update_sampler_voltages(self, voltages: list[float]):
self.sampler_voltages = voltages
@kernel
def get_sampler_voltages(self, sampler, cb):
def get_sampler_voltages(self, sampler: Sampler):
self.core.break_realtime()
sampler.init()
delay(5*ms)
self.core.delay(5.*ms)
for i in range(8):
sampler.set_gain_mu(i, 0)
delay(100*us)
smp = [0.0]*8
self.core.delay(100.*us)
smp = [0. for _ in range(8)]
sampler.sample(smp)
cb(smp)
self.update_sampler_voltages(smp)
def test_samplers(self):
print("*** Testing Sampler ADCs.")
@ -466,14 +489,10 @@ class SinaraTester(EnvExperiment):
print("Apply 1.5V to channel {}. Press ENTER when done.".format(channel))
input()
voltages = []
def setv(x):
nonlocal voltages
voltages = x
self.get_sampler_voltages(card_dev, setv)
self.get_sampler_voltages(card_dev)
passed = True
for n, voltage in enumerate(voltages):
for n, voltage in enumerate(self.sampler_voltages):
if n == channel:
if abs(voltage - 1.5) > 0.2:
passed = False
@ -484,22 +503,22 @@ class SinaraTester(EnvExperiment):
print("PASSED")
else:
print("FAILED")
print(" ".join(["{:.1f}".format(x) for x in voltages]))
print(" ".join(["{:.1f}".format(x) for x in self.sampler_voltages]))
@kernel
def set_zotino_voltages(self, zotino, voltages):
def set_zotino_voltages(self, zotino: Zotino, voltages: list[float]):
self.core.break_realtime()
zotino.init()
delay(200*us)
self.core.delay(200.*us)
i = 0
for voltage in voltages:
zotino.write_dac(i, voltage)
delay(100*us)
self.core.delay(100.*us)
i += 1
zotino.load()
@kernel
def zotinos_led_wave(self, zotinos):
def zotinos_led_wave(self, zotinos: list[Zotino]):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -509,9 +528,9 @@ class SinaraTester(EnvExperiment):
for zotino in zotinos:
for i in range(8):
zotino.set_leds(1 << i)
delay(100*ms)
self.core.delay(100.*ms)
zotino.set_leds(0)
delay(100*ms)
self.core.delay(100.*ms)
def test_zotinos(self):
print("*** Testing Zotino DACs and USER LEDs.")
@ -527,18 +546,18 @@ class SinaraTester(EnvExperiment):
)
@kernel
def set_fastino_voltages(self, fastino, voltages):
def set_fastino_voltages(self, fastino: Fastino, voltages: list[float]):
self.core.break_realtime()
fastino.init()
delay(200*us)
self.core.delay(200.*us)
i = 0
for voltage in voltages:
fastino.set_dac(i, voltage)
delay(100*us)
self.core.delay(100.*us)
i += 1
@kernel
def fastinos_led_wave(self, fastinos):
def fastinos_led_wave(self, fastinos: list[Fastino]):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -548,9 +567,9 @@ class SinaraTester(EnvExperiment):
for fastino in fastinos:
for i in range(8):
fastino.set_leds(1 << i)
delay(100*ms)
self.core.delay(100.*ms)
fastino.set_leds(0)
delay(100*ms)
self.core.delay(100.*ms)
def test_fastinos(self):
print("*** Testing Fastino DACs and USER LEDs.")
@ -566,27 +585,27 @@ class SinaraTester(EnvExperiment):
)
@kernel
def set_phaser_frequencies(self, phaser, duc, osc):
def set_phaser_frequencies(self, phaser: Phaser, duc: float, osc: list[float]):
self.core.break_realtime()
phaser.init()
delay(1*ms)
self.core.delay(1.*ms)
phaser.channel[0].set_duc_frequency(duc)
phaser.channel[0].set_duc_cfg()
phaser.channel[0].set_att(6*dB)
phaser.channel[0].set_att(6.*dB)
phaser.channel[1].set_duc_frequency(-duc)
phaser.channel[1].set_duc_cfg()
phaser.channel[1].set_att(6*dB)
phaser.channel[1].set_att(6.*dB)
phaser.duc_stb()
delay(1*ms)
self.core.delay(1.*ms)
for i in range(len(osc)):
phaser.channel[0].oscillator[i].set_frequency(osc[i])
phaser.channel[0].oscillator[i].set_amplitude_phase(.2)
phaser.channel[1].oscillator[i].set_frequency(-osc[i])
phaser.channel[1].oscillator[i].set_amplitude_phase(.2)
delay(1*ms)
self.core.delay(1.*ms)
@kernel
def phaser_led_wave(self, phasers):
def phaser_led_wave(self, phasers: list[Phaser]):
while not is_enter_pressed():
self.core.break_realtime()
# do not fill the FIFOs too much to avoid long response times
@ -596,9 +615,9 @@ class SinaraTester(EnvExperiment):
for phaser in phasers:
for i in range(6):
phaser.set_leds(1 << i)
delay(100*ms)
self.core.delay(100.*ms)
phaser.set_leds(0)
delay(100*ms)
self.core.delay(100.*ms)
def test_phasers(self):
print("*** Testing Phaser DACs and 6 USER LEDs.")
@ -617,9 +636,9 @@ class SinaraTester(EnvExperiment):
)
@kernel
def grabber_capture(self, card_dev, rois):
def grabber_capture(self, card_dev: Grabber, rois: list[list[int32]]):
self.core.break_realtime()
delay(100*us)
self.core.delay(100.*us)
mask = 0
for i in range(len(rois)):
i = rois[i][0]
@ -630,11 +649,11 @@ class SinaraTester(EnvExperiment):
mask |= 1 << i
card_dev.setup_roi(i, x0, y0, x1, y1)
card_dev.gate_roi(mask)
n = [0]*len(rois)
n = [0 for _ in range(len(rois))]
card_dev.input_mu(n)
self.core.break_realtime()
card_dev.gate_roi(0)
print("ROI sums:", n)
# NAC3TODO print("ROI sums:", n)
def test_grabbers(self):
print("*** Testing Grabber Frame Grabbers.")
@ -651,25 +670,25 @@ class SinaraTester(EnvExperiment):
self.grabber_capture(card_dev, rois)
@kernel
def setup_suservo(self, channel):
def setup_suservo(self, channel: SUServo):
self.core.break_realtime()
channel.init()
delay(1*us)
self.core.delay(1.*us)
# ADC PGIA gain 0
for i in range(8):
channel.set_pgia_mu(i, 0)
delay(10*us)
self.core.delay(10.*us)
# DDS attenuator 10dB
for i in range(4):
for cpld in channel.cplds:
cpld.set_att(i, 10.)
delay(1*us)
self.core.delay(1.*us)
# Servo is done and disabled
assert channel.get_status() & 0xff == 2
delay(10*us)
# NAC3TODO assert channel.get_status() & 0xff == 2
self.core.delay(10.*us)
@kernel
def setup_suservo_loop(self, channel, loop_nr):
def setup_suservo_loop(self, channel: SUServoChannel, loop_nr: int32):
self.core.break_realtime()
channel.set_y(
profile=loop_nr,
@ -684,24 +703,24 @@ class SinaraTester(EnvExperiment):
delay=0. # no IIR update delay after enabling
)
# setpoint 0.5 (5 V with above PGIA gain setting)
delay(100*us)
self.core.delay(100.*us)
channel.set_dds(
profile=loop_nr,
offset=-.3, # 3 V with above PGIA settings
frequency=10*MHz,
frequency=10.*MHz,
phase=0.)
# enable RF, IIR updates and set profile
delay(10*us)
channel.set(en_out=1, en_iir=1, profile=loop_nr)
self.core.delay(10.*us)
channel.set(en_out=True, en_iir=True, profile=loop_nr)
@kernel
def setup_start_suservo(self, channel):
def setup_start_suservo(self, channel: SUServo):
self.core.break_realtime()
channel.set_config(enable=1)
delay(10*us)
channel.set_config(enable=True)
self.core.delay(10.*us)
# check servo enabled
assert channel.get_status() & 0x01 == 1
delay(10*us)
# NAC3TODO assert channel.get_status() & 0x01 == 1
self.core.delay(10.*us)
def test_suservos(self):
print("*** Testing SUServos.")