test/rtio: port to NAC3 (no DMA)

This commit is contained in:
Sébastien Bourdeauducq 2024-08-19 23:01:36 +08:00
parent 735d28be71
commit 854f4845fb
1 changed files with 203 additions and 81 deletions

View File

@ -2,7 +2,7 @@
# Copyright (C) 2014, 2015 Robert Jordens <jordens@gmail.com>
import os, unittest
import numpy as np
from numpy import int32, int64
from math import sqrt
@ -10,114 +10,154 @@ from artiq.experiment import *
from artiq.test.hardware_testbench import ExperimentCase
from artiq.coredevice import exceptions
from artiq.coredevice.core import Core
from artiq.coredevice.ttl import TTLOut
from artiq.coredevice.ttl import TTLOut, TTLInOut, TTLClockGen
from artiq.coredevice.ad9914 import AD9914
from artiq.coredevice.comm_mgmt import CommMgmt
from artiq.coredevice.comm_analyzer import (StoppedMessage, OutputMessage, InputMessage,
decode_dump, get_analyzer_dump)
artiq_low_latency = os.getenv("ARTIQ_LOW_LATENCY")
artiq_in_devel = os.getenv("ARTIQ_IN_DEVEL")
@nac3
class RTIOCounter(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@rpc
def report(self, dt: float):
self.set_dataset("dt", dt)
@kernel
def run(self):
t0 = self.core.get_rtio_counter_mu()
t1 = self.core.get_rtio_counter_mu()
self.set_dataset("dt", self.core.mu_to_seconds(t1 - t0))
self.report(self.core.mu_to_seconds(t1 - t0))
@nac3
class InvalidCounter(Exception):
pass
@nac3
class WaitForRTIOCounter(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@kernel
def run(self):
self.core.break_realtime()
target_mu = now_mu() + 10000
target_mu = now_mu() + int64(10000)
self.core.wait_until_mu(target_mu)
if self.core.get_rtio_counter_mu() < target_mu:
raise InvalidCounter
@nac3
class PulseNotReceived(Exception):
pass
@nac3
class RTT(EnvExperiment):
core: KernelInvariant[Core]
ttl_inout: KernelInvariant[TTLInOut]
def build(self):
self.setattr_device("core")
self.setattr_device("ttl_inout")
@rpc
def report(self, rtt: float):
self.set_dataset("rtt", rtt)
@kernel
def run(self):
self.core.reset()
self.ttl_inout.output()
delay(1*us)
with interleave:
self.core.delay(1.*us)
t0 = int64(0)
with parallel:
# make sure not to send two commands into the same RTIO
# channel with the same timestamp
self.ttl_inout.gate_rising(5*us)
self.ttl_inout.gate_rising(5.*us)
with sequential:
delay(1*us)
self.core.delay(1.*us)
t0 = now_mu()
self.ttl_inout.pulse(1*us)
self.ttl_inout.pulse(1.*us)
t1 = self.ttl_inout.timestamp_mu(now_mu())
if t1 < 0:
if t1 < int64(0):
raise PulseNotReceived
self.set_dataset("rtt", self.core.mu_to_seconds(t1 - t0))
self.report(self.core.mu_to_seconds(t1 - t0))
@nac3
class Loopback(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
loop_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
self.setattr_device("loop_out")
@rpc
def report(self, rtt: float):
self.set_dataset("rtt", rtt)
@kernel
def run(self):
self.core.reset()
self.loop_in.input()
self.loop_out.off()
delay(1*us)
self.core.delay(1.*us)
t0 = int64(0)
with parallel:
self.loop_in.gate_rising(2*us)
self.loop_in.gate_rising(2.*us)
with sequential:
delay(1*us)
self.core.delay(1.*us)
t0 = now_mu()
self.loop_out.pulse(1*us)
self.loop_out.pulse(1.*us)
t1 = self.loop_in.timestamp_mu(now_mu())
if t1 < 0:
if t1 < int64(0):
raise PulseNotReceived
self.set_dataset("rtt", self.core.mu_to_seconds(t1 - t0))
self.report(self.core.mu_to_seconds(t1 - t0))
@nac3
class ClockGeneratorLoopback(EnvExperiment):
core: KernelInvariant[Core]
loop_clock_in: KernelInvariant[TTLInOut]
loop_clock_out: KernelInvariant[TTLClockGen]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_clock_in")
self.setattr_device("loop_clock_out")
@rpc
def report(self, count: int32):
self.set_dataset("count", count)
@kernel
def run(self):
self.core.reset()
self.loop_clock_in.input()
self.loop_clock_out.stop()
delay(200*us)
self.core.delay(200.*us)
with parallel:
self.loop_clock_in.gate_rising(10*us)
self.loop_clock_in.gate_rising(10.*us)
with sequential:
delay(200*ns)
self.loop_clock_out.set(1*MHz)
self.set_dataset("count", self.loop_clock_in.count(now_mu()))
self.core.delay(200.*ns)
self.loop_clock_out.set(1.*MHz)
self.report(self.loop_clock_in.count(now_mu()))
@nac3
@ -130,7 +170,7 @@ class PulseRate(EnvExperiment):
self.setattr_device("ttl_out")
@rpc
def set_pulse_rate(self, pulse_rate: float):
def report(self, pulse_rate: float):
self.set_dataset("pulse_rate", pulse_rate)
@kernel
@ -148,23 +188,31 @@ class PulseRate(EnvExperiment):
self.core.break_realtime()
else:
i -= 1
self.set_pulse_rate(self.core.mu_to_seconds(dt))
self.report(self.core.mu_to_seconds(dt))
@nac3
class PulseRateAD9914DDS(EnvExperiment):
core: KernelInvariant[Core]
ad9914dds0: KernelInvariant[AD9914]
ad9914dds1: KernelInvariant[AD9914]
def build(self):
self.setattr_device("core")
self.setattr_device("ad9914dds0")
self.setattr_device("ad9914dds1")
@rpc
def report(self, pulse_rate: float):
self.set_dataset("pulse_rate", pulse_rate)
@kernel
def run(self):
self.core.reset()
dt = self.core.seconds_to_mu(5*us)
freq = self.ad9914dds0.frequency_to_ftw(100*MHz)
dt = self.core.seconds_to_mu(5.*us)
freq = self.ad9914dds0.frequency_to_ftw(100.*MHz)
while True:
delay(10*ms)
self.core.delay(10.*ms)
for i in range(1250):
try:
delay_mu(-self.ad9914dds0.set_duration_mu)
@ -173,22 +221,29 @@ class PulseRateAD9914DDS(EnvExperiment):
self.ad9914dds1.set_mu(freq)
delay_mu(dt)
except RTIOUnderflow:
dt += 100
dt += int64(100)
self.core.break_realtime()
break
else:
self.set_dataset("pulse_rate", self.core.mu_to_seconds(dt//2))
self.report(self.core.mu_to_seconds(dt//int64(2)))
return
@nac3
class LoopbackCount(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
loop_out: KernelInvariant[TTLOut]
npulses: KernelInvariant[int32]
def build(self, npulses):
self.setattr_device("core")
self.setattr_device("loop_in")
self.setattr_device("loop_out")
self.npulses = npulses
def set_count(self, count):
@rpc
def report(self, count: int32):
self.set_dataset("count", count)
@kernel
@ -196,21 +251,27 @@ class LoopbackCount(EnvExperiment):
self.core.reset()
self.loop_in.input()
self.loop_out.output()
delay(5*us)
self.core.delay(5.*us)
with parallel:
self.loop_in.gate_rising(10*us)
self.loop_in.gate_rising(10.*us)
with sequential:
for i in range(self.npulses):
delay(25*ns)
self.loop_out.pulse(25*ns)
self.set_dataset("count", self.loop_in.count(now_mu()))
self.core.delay(25.*ns)
self.loop_out.pulse(25.*ns)
self.report(self.loop_in.count(now_mu()))
@nac3
class IncorrectPulseTiming(Exception):
pass
@nac3
class LoopbackGateTiming(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
loop_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
@ -222,19 +283,20 @@ class LoopbackGateTiming(EnvExperiment):
self.core.reset()
self.loop_in.input()
self.loop_out.output()
delay_mu(500)
delay_mu(int64(500))
self.loop_out.off()
delay_mu(5000)
delay_mu(int64(5000))
# Determine loop delay.
out_mu = int64(0)
with parallel:
self.loop_in.gate_rising_mu(10000)
self.loop_in.gate_rising_mu(int64(10000))
with sequential:
delay_mu(5000)
delay_mu(int64(5000))
out_mu = now_mu()
self.loop_out.pulse_mu(1000)
self.loop_out.pulse_mu(int64(1000))
in_mu = self.loop_in.timestamp_mu(now_mu())
if in_mu < 0:
if in_mu < int64(0):
raise PulseNotReceived("Cannot determine loop delay")
loop_delay_mu = in_mu - out_mu
@ -242,37 +304,46 @@ class LoopbackGateTiming(EnvExperiment):
# In the most common configuration, 24 mu == 24 ns == 3 coarse periods,
# which should be plenty of slack.
# FIXME: ZC706 with NIST_QC2 needs 48ns - hw problem?
delay_mu(10000)
delay_mu(int64(10000))
gate_start_mu = now_mu()
self.loop_in.gate_both_mu(48) # XXX
self.loop_in.gate_both_mu(int64(48)) # XXX
gate_end_mu = now_mu()
# gateware latency offset between gate and input
lat_offset = 11*8
lat_offset = int64(11*8)
out_mu = gate_start_mu - loop_delay_mu + lat_offset
at_mu(out_mu)
self.loop_out.pulse_mu(48) # XXX
self.loop_out.pulse_mu(int64(48)) # XXX
in_mu = self.loop_in.timestamp_mu(gate_end_mu)
print("timings: ", gate_start_mu, in_mu - lat_offset, gate_end_mu)
if in_mu < 0:
print_rpc("timings:")
print_rpc(gate_start_mu)
print_rpc(in_mu - lat_offset)
print_rpc(gate_end_mu)
if in_mu < int64(0):
raise PulseNotReceived
if not (gate_start_mu <= (in_mu - lat_offset) <= gate_end_mu):
raise IncorrectPulseTiming("Input event should occur during gate")
if not (-2 < (in_mu - out_mu - loop_delay_mu) < 2):
if not (int64(-2) < (in_mu - out_mu - loop_delay_mu) < int64(2)):
raise IncorrectPulseTiming("Loop delay should not change")
in_mu = self.loop_in.timestamp_mu(gate_end_mu)
if in_mu > 0:
if in_mu > int64(0):
raise IncorrectPulseTiming("Only one pulse should be received")
@nac3
class IncorrectLevel(Exception):
pass
@nac3
class Level(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
loop_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
@ -283,20 +354,25 @@ class Level(EnvExperiment):
self.core.reset()
self.loop_in.input()
self.loop_out.output()
delay(5*us)
self.core.delay(5.*us)
self.loop_out.off()
delay(5*us)
if self.loop_in.sample_get_nonrt():
self.core.delay(5.*us)
if self.loop_in.sample_get_nonrt() != 0:
raise IncorrectLevel
self.loop_out.on()
delay(5*us)
if not self.loop_in.sample_get_nonrt():
self.core.delay(5.*us)
if self.loop_in.sample_get_nonrt() == 0:
raise IncorrectLevel
@nac3
class Watch(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
loop_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
@ -307,27 +383,31 @@ class Watch(EnvExperiment):
self.core.reset()
self.loop_in.input()
self.loop_out.output()
delay(5*us)
self.core.delay(5.*us)
self.loop_out.off()
delay(5*us)
self.core.delay(5.*us)
if not self.loop_in.watch_stay_off():
raise IncorrectLevel
delay(10*us)
self.core.delay(10.*us)
if not self.loop_in.watch_done():
raise IncorrectLevel
delay(10*us)
self.core.delay(10.*us)
if not self.loop_in.watch_stay_off():
raise IncorrectLevel
delay(3*us)
self.core.delay(3.*us)
self.loop_out.on()
delay(10*us)
self.core.delay(10.*us)
if self.loop_in.watch_done():
raise IncorrectLevel
@nac3
class Underflow(EnvExperiment):
core: KernelInvariant[Core]
ttl_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("ttl_out")
@ -336,11 +416,15 @@ class Underflow(EnvExperiment):
def run(self):
self.core.reset()
while True:
delay(25*ns)
self.ttl_out.pulse(25*ns)
self.core.delay(25.*ns)
self.ttl_out.pulse(25.*ns)
@nac3
class SequenceError(EnvExperiment):
core: KernelInvariant[Core]
ttl_out: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("ttl_out")
@ -348,13 +432,17 @@ class SequenceError(EnvExperiment):
@kernel
def run(self):
self.core.reset()
delay(55*256*us)
self.core.delay(55.*256.*us)
for _ in range(256):
self.ttl_out.pulse(25*us)
delay(-75*us)
self.ttl_out.pulse(25.*us)
self.core.delay(-75.*us)
@nac3
class Collision(EnvExperiment):
core: KernelInvariant[Core]
ttl_out_serdes: KernelInvariant[TTLOut]
def build(self):
self.setattr_device("core")
self.setattr_device("ttl_out_serdes")
@ -362,15 +450,19 @@ class Collision(EnvExperiment):
@kernel
def run(self):
self.core.reset()
delay(5*ms) # make sure we won't get underflow
self.core.delay(5.*ms) # make sure we won't get underflow
for i in range(16):
self.ttl_out_serdes.pulse_mu(1)
delay_mu(1)
self.ttl_out_serdes.pulse_mu(int64(1))
delay_mu(int64(1))
while self.core.get_rtio_counter_mu() < now_mu():
pass
@nac3
class AddressCollision(EnvExperiment):
core: KernelInvariant[Core]
loop_in: KernelInvariant[TTLInOut]
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
@ -379,60 +471,89 @@ class AddressCollision(EnvExperiment):
def run(self):
self.core.reset()
self.loop_in.input()
self.loop_in.pulse(10*us)
self.loop_in.pulse(10.*us)
while self.core.get_rtio_counter_mu() < now_mu():
pass
@nac3
class TimeKeepsRunning(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@rpc
def report(self, time_at_start: int64):
self.set_dataset("time_at_start", time_at_start)
@kernel
def run(self):
self.core.reset()
self.set_dataset("time_at_start", now_mu())
self.report(now_mu())
@nac3
class Handover(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@rpc
def report(self, var: str, t: int64):
self.set_dataset(var, t)
@kernel
def k(self, var):
self.set_dataset(var, now_mu())
delay_mu(1234)
def k(self, var: str):
self.report(var, now_mu())
delay_mu(int64(1234))
def run(self):
self.k("t1")
self.k("t2")
@nac3
class Rounding(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@rpc
def report(self, delta: int64):
self.set_dataset("delta", delta)
@kernel
def run(self):
self.core.reset()
t1 = now_mu()
delay(8*us)
self.core.delay(8.*us)
t2 = now_mu()
self.set_dataset("delta", t2 - t1)
self.report(t2 - t1)
@nac3
class DummyException(Exception):
pass
@nac3
class HandoverException(EnvExperiment):
core: KernelInvariant[Core]
def build(self):
self.setattr_device("core")
@rpc
def report(self, var: str, t: int64):
self.set_dataset(var, t)
@kernel
def k(self, var):
self.set_dataset(var, now_mu())
delay_mu(1234)
def k(self, var: str):
self.report(var, now_mu())
delay_mu(int64(1234))
raise DummyException
def run(self):
@ -597,7 +718,7 @@ class _DMA(EnvExperiment):
self.setattr_device("core_dma")
self.setattr_device("ttl_out")
self.trace_name = trace_name
self.delta = np.int64(0)
self.delta = int64(0)
@kernel
def record(self, for_handle=True):
@ -669,6 +790,7 @@ class _DMA(EnvExperiment):
self.core_dma.playback_handle(handle)
@unittest.skip("NAC3TODO https://git.m-labs.hk/M-Labs/nac3/issues/338")
class DMATest(ExperimentCase):
def test_dma_storage(self):
exp = self.create(_DMA)