mirror of
https://github.com/m-labs/artiq.git
synced 2025-01-12 03:53:34 +08:00
Add gateware input event counter
This commit is contained in:
parent
3c0e3e5910
commit
a565f77538
236
artiq/coredevice/edge_counter.py
Executable file
236
artiq/coredevice/edge_counter.py
Executable file
@ -0,0 +1,236 @@
|
||||
"""Driver for RTIO-enabled TTL edge counter.
|
||||
|
||||
Like for the TTL input PHYs, sensitivity can be configured over RTIO
|
||||
(``gate_rising()``, etc.). In contrast to the former, however, the count is
|
||||
accumulated in gateware, and only a single input event is generated at the end
|
||||
of each gate period::
|
||||
|
||||
with parallel:
|
||||
doppler_cool()
|
||||
self.pmt_counter.gate_rising(1 * ms)
|
||||
|
||||
with parallel:
|
||||
readout()
|
||||
self.pmt_counter.gate_rising(100 * us)
|
||||
|
||||
print("Doppler cooling counts:", self.pmt_counter.fetch_count())
|
||||
print("Readout counts:", self.pmt_counter.fetch_count())
|
||||
|
||||
For applications where the timestamps of the individual input events are not
|
||||
required, this has two advantages over ``TTLInOut.count()`` beyond raw
|
||||
throughput. First, it is easy to count events during multiple separate periods
|
||||
without blocking to read back counts in between, as illustrated in the above
|
||||
example. Secondly, as each count total only takes up a single input event, it
|
||||
is much easier to acquire counts on several channels in parallel without
|
||||
risking input FIFO overflows::
|
||||
|
||||
# Using the TTLInOut driver, pmt_1 input events are only processed
|
||||
# after pmt_0 is done counting. To avoid RTIOOverflows, a round-robin
|
||||
# scheme would have to be implemented manually.
|
||||
|
||||
with parallel:
|
||||
self.pmt_0.gate_rising(10 * ms)
|
||||
self.pmt_1.gate_rising(10 * ms)
|
||||
|
||||
counts_0 = self.pmt_0.count(now_mu()) # blocks
|
||||
counts_1 = self.pmt_1.count(now_mu())
|
||||
|
||||
#
|
||||
|
||||
# Using gateware counters, only a single input event each is
|
||||
# generated, greatly reducing the load on the input FIFOs:
|
||||
|
||||
with parallel:
|
||||
self.pmt_0_counter.gate_rising(10 * ms)
|
||||
self.pmt_1_counter.gate_rising(10 * ms)
|
||||
|
||||
counts_0 = self.pmt_0_counter.fetch_count() # blocks
|
||||
counts_1 = self.pmt_1_counter.fetch_count()
|
||||
|
||||
See :mod:`artiq.gateware.rtio.phy.edge_counter` and
|
||||
:meth:`artiq.gateware.eem.DIO.add_std` for the gateware components.
|
||||
"""
|
||||
|
||||
from artiq.language.core import *
|
||||
from artiq.language.types import *
|
||||
from artiq.coredevice.rtio import (rtio_output, rtio_input_data,
|
||||
rtio_input_timestamped_data)
|
||||
from numpy import int32, int64
|
||||
|
||||
CONFIG_COUNT_RISING = 0b0001
|
||||
CONFIG_COUNT_FALLING = 0b0010
|
||||
CONFIG_SEND_COUNT_EVENT = 0b0100
|
||||
CONFIG_RESET_TO_ZERO = 0b1000
|
||||
|
||||
|
||||
class CounterOverflow(Exception):
|
||||
"""Raised when an edge counter value is read which indicates that the
|
||||
counter might have overflowed."""
|
||||
pass
|
||||
|
||||
|
||||
class EdgeCounter:
|
||||
"""RTIO TTL edge counter driver driver.
|
||||
|
||||
Like for regular TTL inputs, timeline periods where the counter is
|
||||
sensitive to a chosen set of input transitions can be specified. Unlike the
|
||||
former, however, the specified edges do not create individual input events;
|
||||
rather, the total count can be requested as a single input event from the
|
||||
core (typically at the end of the gate window).
|
||||
|
||||
:param channel: The RTIO channel of the gateware phy.
|
||||
:param gateware_width: The width of the gateware counter register, in
|
||||
bits. This is only used for overflow handling; to change the size,
|
||||
the gateware needs to be rebuilt.
|
||||
"""
|
||||
|
||||
kernel_invariants = {"core", "channel", "counter_max"}
|
||||
|
||||
def __init__(self, dmgr, channel, gateware_width=31, core_device="core"):
|
||||
self.core = dmgr.get(core_device)
|
||||
self.channel = channel
|
||||
self.counter_max = (1 << (gateware_width - 1)) - 1
|
||||
|
||||
@kernel
|
||||
def gate_rising(self, duration):
|
||||
"""Count rising edges for the given duration and request the total at
|
||||
the end.
|
||||
|
||||
The counter is reset at the beginning of the gate period. Use
|
||||
:meth:`set_config` directly for more detailed control.
|
||||
|
||||
:param duration: The duration for which the gate is to stay open.
|
||||
|
||||
:return: The timestamp at the end of the gate period, in machine units.
|
||||
"""
|
||||
return self.gate_rising_mu(self.core.seconds_to_mu(duration))
|
||||
|
||||
@kernel
|
||||
def gate_falling(self, duration):
|
||||
"""Count falling edges for the given duration and request the total at
|
||||
the end.
|
||||
|
||||
The counter is reset at the beginning of the gate period. Use
|
||||
:meth:`set_config` directly for more detailed control.
|
||||
|
||||
:param duration: The duration for which the gate is to stay open.
|
||||
|
||||
:return: The timestamp at the end of the gate period, in machine units.
|
||||
"""
|
||||
return self.gate_falling_mu(self.core.seconds_to_mu(duration))
|
||||
|
||||
@kernel
|
||||
def gate_both(self, duration):
|
||||
"""Count both rising and falling edges for the given duration, and
|
||||
request the total at the end.
|
||||
|
||||
The counter is reset at the beginning of the gate period. Use
|
||||
:meth:`set_config` directly for more detailed control.
|
||||
|
||||
:param duration: The duration for which the gate is to stay open.
|
||||
|
||||
:return: The timestamp at the end of the gate period, in machine units.
|
||||
"""
|
||||
return self.gate_both_mu(self.core.seconds_to_mu(duration))
|
||||
|
||||
@kernel
|
||||
def gate_rising_mu(self, duration_mu):
|
||||
"""See :meth:`gate_rising`."""
|
||||
return self._gate_mu(
|
||||
duration_mu, count_rising=True, count_falling=False)
|
||||
|
||||
@kernel
|
||||
def gate_falling_mu(self, duration_mu):
|
||||
"""See :meth:`gate_falling`."""
|
||||
return self._gate_mu(
|
||||
duration_mu, count_rising=False, count_falling=True)
|
||||
|
||||
@kernel
|
||||
def gate_both_mu(self, duration_mu):
|
||||
"""See :meth:`gate_both_mu`."""
|
||||
return self._gate_mu(
|
||||
duration_mu, count_rising=True, count_falling=True)
|
||||
|
||||
@kernel
|
||||
def _gate_mu(self, duration_mu, count_rising, count_falling):
|
||||
self.set_config(
|
||||
count_rising=count_rising,
|
||||
count_falling=count_falling,
|
||||
send_count_event=False,
|
||||
reset_to_zero=True)
|
||||
delay_mu(duration_mu)
|
||||
self.set_config(
|
||||
count_rising=False,
|
||||
count_falling=False,
|
||||
send_count_event=True,
|
||||
reset_to_zero=False)
|
||||
return now_mu()
|
||||
|
||||
@kernel
|
||||
def set_config(self, count_rising: TBool, count_falling: TBool,
|
||||
send_count_event: TBool, reset_to_zero: TBool):
|
||||
"""Emit an RTIO event at the current timeline position to set the
|
||||
gateware configuration.
|
||||
|
||||
For most use cases, the `gate_*` wrappers will be more convenient.
|
||||
|
||||
:param count_rising: Whether to count rising signal edges.
|
||||
:param count_falling: Whether to count falling signal edges.
|
||||
:param send_count_event: If `True`, an input event with the current
|
||||
counter value is generated on the next clock cycle (once).
|
||||
:param reset_to_zero: If `True`, the counter value is reset to zero on
|
||||
the next clock cycle (once).
|
||||
"""
|
||||
config = int32(0)
|
||||
if count_rising:
|
||||
config |= CONFIG_COUNT_RISING
|
||||
if count_falling:
|
||||
config |= CONFIG_COUNT_FALLING
|
||||
if send_count_event:
|
||||
config |= CONFIG_SEND_COUNT_EVENT
|
||||
if reset_to_zero:
|
||||
config |= CONFIG_RESET_TO_ZERO
|
||||
rtio_output(self.channel << 8, config)
|
||||
|
||||
@kernel
|
||||
def fetch_count(self) -> TInt32:
|
||||
"""Wait for and return count total from previously requested input
|
||||
event.
|
||||
|
||||
It is valid to trigger multiple gate periods without immediately
|
||||
reading back the count total; the results will be returned in order on
|
||||
subsequent fetch calls.
|
||||
|
||||
This function blocks until a result becomes available.
|
||||
"""
|
||||
count = rtio_input_data(self.channel)
|
||||
if count == self.counter_max:
|
||||
raise CounterOverflow(
|
||||
"Input edge counter overflow on RTIO channel {0}",
|
||||
int64(self.channel))
|
||||
return count
|
||||
|
||||
@kernel
|
||||
def fetch_timestamped_count(
|
||||
self, timeout_mu=int64(-1)) -> TTuple([TInt64, TInt32]):
|
||||
"""Wait for and return the timestamp and count total of a previously
|
||||
requested input event.
|
||||
|
||||
It is valid to trigger multiple gate periods without immediately
|
||||
reading back the count total; the results will be returned in order on
|
||||
subsequent fetch calls.
|
||||
|
||||
This function blocks until a result becomes available or the given
|
||||
timeout elapses.
|
||||
|
||||
:return: A tuple of timestamp (-1 if timeout elapsed) and counter
|
||||
value. (The timestamp is that of the requested input event –
|
||||
typically the gate closing time – and not that of any input edges.)
|
||||
"""
|
||||
timestamp, count = rtio_input_timestamped_data(timeout_mu,
|
||||
self.channel)
|
||||
if count == self.counter_max:
|
||||
raise CounterOverflow(
|
||||
"Input edge counter overflow on RTIO channel {0}",
|
||||
int64(self.channel))
|
||||
return timestamp, count
|
@ -38,20 +38,32 @@ class DIO(_EEM):
|
||||
for i in range(8)]
|
||||
|
||||
@classmethod
|
||||
def add_std(cls, target, eem, ttl03_cls, ttl47_cls, iostandard="LVDS_25"):
|
||||
def add_std(cls, target, eem, ttl03_cls, ttl47_cls, iostandard="LVDS_25",
|
||||
edge_counter_cls=None):
|
||||
cls.add_extension(target, eem, iostandard=iostandard)
|
||||
|
||||
phys = []
|
||||
for i in range(4):
|
||||
pads = target.platform.request("dio{}".format(eem), i)
|
||||
phy = ttl03_cls(pads.p, pads.n)
|
||||
phys.append(phy)
|
||||
target.submodules += phy
|
||||
target.rtio_channels.append(rtio.Channel.from_phy(phy))
|
||||
for i in range(4):
|
||||
pads = target.platform.request("dio{}".format(eem), 4+i)
|
||||
phy = ttl47_cls(pads.p, pads.n)
|
||||
phys.append(phy)
|
||||
target.submodules += phy
|
||||
target.rtio_channels.append(rtio.Channel.from_phy(phy))
|
||||
|
||||
if edge_counter_cls is not None:
|
||||
for phy in phys:
|
||||
state = getattr(phy, "input_state", None)
|
||||
if state is not None:
|
||||
counter = edge_counter_cls(state)
|
||||
target.submodules += counter
|
||||
target.rtio_channels.append(rtio.Channel.from_phy(counter))
|
||||
|
||||
|
||||
class Urukul(_EEM):
|
||||
@staticmethod
|
||||
|
79
artiq/gateware/rtio/phy/edge_counter.py
Normal file
79
artiq/gateware/rtio/phy/edge_counter.py
Normal file
@ -0,0 +1,79 @@
|
||||
from migen import *
|
||||
from artiq.gateware.rtio import rtlink
|
||||
|
||||
|
||||
class SimpleEdgeCounter(Module):
|
||||
"""Counts rising/falling edges of an input signal.
|
||||
|
||||
Control (sensitivity/zeroing) is done via a single RTIO output channel,
|
||||
which is is also used to request an input event to be emitted with the
|
||||
current counter value.
|
||||
|
||||
:param input_state: The (scalar) input signal to detect edges of. This
|
||||
should already be in the rio_phy clock domain.
|
||||
:param counter_width: The width of the counter register, in bits. Defaults
|
||||
to 31 to match integers being signed in ARTIQ Python.
|
||||
"""
|
||||
|
||||
def __init__(self, input_state, counter_width=31):
|
||||
assert counter_width >= 2
|
||||
|
||||
# RTIO interface:
|
||||
# - output 0: 4 bits, <count_rising><count_falling><send_event><zero_counter>
|
||||
# - input 0: 32 bits, accumulated edge count
|
||||
self.rtlink = rtlink.Interface(
|
||||
rtlink.OInterface(4, enable_replace=False),
|
||||
rtlink.IInterface(counter_width))
|
||||
|
||||
# # #
|
||||
|
||||
current_count = Signal(counter_width)
|
||||
|
||||
count_rising = Signal()
|
||||
count_falling = Signal()
|
||||
send_event_stb = Signal()
|
||||
zero_counter_stb = Signal()
|
||||
|
||||
# Read configuration from RTIO output events.
|
||||
self.sync.rio += [
|
||||
If(self.rtlink.o.stb,
|
||||
count_rising.eq(self.rtlink.o.data[0]),
|
||||
count_falling.eq(self.rtlink.o.data[1]),
|
||||
send_event_stb.eq(self.rtlink.o.data[2]),
|
||||
zero_counter_stb.eq(self.rtlink.o.data[3])
|
||||
).Else(
|
||||
send_event_stb.eq(0),
|
||||
zero_counter_stb.eq(0)
|
||||
)
|
||||
]
|
||||
|
||||
# Generate RTIO input event with current count if requested.
|
||||
event_data = Signal.like(current_count)
|
||||
self.comb += [
|
||||
self.rtlink.i.stb.eq(send_event_stb),
|
||||
self.rtlink.i.data.eq(event_data)
|
||||
]
|
||||
|
||||
# Keep previous input state for edge detection.
|
||||
input_state_d = Signal()
|
||||
self.sync.rio_phy += input_state_d.eq(input_state)
|
||||
|
||||
# Count input edges, saturating at the maximum.
|
||||
new_count = Signal.like(current_count)
|
||||
self.comb += new_count.eq(
|
||||
current_count + Mux(current_count == 2**counter_width - 1,
|
||||
0,
|
||||
(count_rising & (input_state & ~input_state_d)) |
|
||||
(count_falling & (~input_state & input_state_d))
|
||||
)
|
||||
)
|
||||
|
||||
self.sync.rio += [
|
||||
event_data.eq(new_count),
|
||||
current_count.eq(Mux(zero_counter_stb, 0, new_count))
|
||||
]
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
input = Signal(name="input")
|
||||
print(fhdl.verilog.convert(SimpleEdgeCounter(input)))
|
@ -67,6 +67,10 @@ class InOut(Module):
|
||||
override_oe = Signal()
|
||||
self.overrides = [override_en, override_o, override_oe]
|
||||
|
||||
#: LSB of the input state (for edge detection; arbitrary choice, support for
|
||||
#: short pulses will need a more involved solution).
|
||||
self.input_state = Signal()
|
||||
|
||||
# # #
|
||||
|
||||
# Output
|
||||
@ -100,6 +104,7 @@ class InOut(Module):
|
||||
]
|
||||
|
||||
i = serdes.i[-1]
|
||||
self.comb += self.input_state.eq(i)
|
||||
i_d = Signal()
|
||||
self.sync.rio_phy += [
|
||||
i_d.eq(i),
|
||||
|
@ -41,6 +41,9 @@ class Input(Module):
|
||||
self.overrides = []
|
||||
self.probes = []
|
||||
|
||||
#: Registered copy of the input state, in the rio_phy clock domain.
|
||||
self.input_state = Signal()
|
||||
|
||||
# # #
|
||||
|
||||
sensitivity = Signal(2)
|
||||
@ -69,7 +72,8 @@ class Input(Module):
|
||||
(sensitivity[0] & ( i & ~i_d)) |
|
||||
(sensitivity[1] & (~i & i_d))
|
||||
),
|
||||
self.rtlink.i.data.eq(i)
|
||||
self.rtlink.i.data.eq(i),
|
||||
self.input_state.eq(i)
|
||||
]
|
||||
|
||||
self.probes += [i]
|
||||
@ -86,6 +90,9 @@ class InOut(Module):
|
||||
self.overrides = [override_en, override_o, override_oe]
|
||||
self.probes = []
|
||||
|
||||
# Registered copy of the input state, in the rio_phy clock domain.
|
||||
self.input_state = Signal()
|
||||
|
||||
# # #
|
||||
|
||||
ts = TSTriple()
|
||||
@ -126,7 +133,8 @@ class InOut(Module):
|
||||
(sensitivity[0] & ( i & ~i_d)) |
|
||||
(sensitivity[1] & (~i & i_d))
|
||||
),
|
||||
self.rtlink.i.data.eq(i)
|
||||
self.rtlink.i.data.eq(i),
|
||||
self.input_state.eq(i)
|
||||
]
|
||||
|
||||
self.probes += [i, ts.oe]
|
||||
|
134
artiq/gateware/test/rtio/test_edge_counter.py
Normal file
134
artiq/gateware/test/rtio/test_edge_counter.py
Normal file
@ -0,0 +1,134 @@
|
||||
import unittest
|
||||
|
||||
from migen import *
|
||||
from artiq.gateware.rtio.phy.edge_counter import *
|
||||
|
||||
CONFIG_COUNT_RISING = 0b0001
|
||||
CONFIG_COUNT_FALLING = 0b0010
|
||||
CONFIG_SEND_COUNT_EVENT = 0b0100
|
||||
CONFIG_RESET_TO_ZERO = 0b1000
|
||||
|
||||
|
||||
class TimeoutError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class Testbench:
|
||||
def __init__(self, counter_width=32):
|
||||
self.input = Signal()
|
||||
self.dut = SimpleEdgeCounter(self.input, counter_width=counter_width)
|
||||
|
||||
self.fragment = self.dut.get_fragment()
|
||||
cd = ClockDomain("rio")
|
||||
self.fragment.clock_domains.append(cd)
|
||||
self.rio_rst = cd.rst
|
||||
|
||||
def write_config(self, config):
|
||||
bus = self.dut.rtlink.o
|
||||
yield bus.data.eq(config)
|
||||
yield bus.stb.eq(1)
|
||||
yield
|
||||
yield bus.stb.eq(0)
|
||||
yield
|
||||
|
||||
def read_event(self, timeout):
|
||||
bus = self.dut.rtlink.i
|
||||
for _ in range(timeout):
|
||||
if (yield bus.stb):
|
||||
break
|
||||
yield
|
||||
else:
|
||||
raise TimeoutError
|
||||
return (yield bus.data)
|
||||
|
||||
def fetch_count(self, zero=False):
|
||||
c = CONFIG_SEND_COUNT_EVENT
|
||||
if zero:
|
||||
c |= CONFIG_RESET_TO_ZERO
|
||||
yield from self.write_config(c)
|
||||
return (yield from self.read_event(1))
|
||||
|
||||
def toggle_input(self):
|
||||
yield self.input.eq(1)
|
||||
yield
|
||||
yield self.input.eq(0)
|
||||
yield
|
||||
|
||||
def reset_rio(self):
|
||||
yield self.rio_rst.eq(1)
|
||||
yield
|
||||
yield self.rio_rst.eq(0)
|
||||
yield
|
||||
|
||||
def run(self, gen):
|
||||
run_simulation(self.fragment, gen,
|
||||
clocks={n: 5 for n in ["sys", "rio", "rio_phy"]})
|
||||
|
||||
|
||||
class TestEdgeCounter(unittest.TestCase):
|
||||
def test_init(self):
|
||||
tb = Testbench()
|
||||
|
||||
def gen():
|
||||
# No counts initially...
|
||||
self.assertEqual((yield from tb.fetch_count()), 0)
|
||||
|
||||
# ...nor any sensitivity.
|
||||
yield from tb.toggle_input()
|
||||
self.assertEqual((yield from tb.fetch_count()), 0)
|
||||
|
||||
tb.run(gen())
|
||||
|
||||
def test_sensitivity(self):
|
||||
tb = Testbench()
|
||||
|
||||
def gen(sensitivity_config, expected_rising, expected_falling):
|
||||
yield from tb.write_config(sensitivity_config)
|
||||
yield tb.input.eq(1)
|
||||
yield
|
||||
self.assertEqual((yield from tb.fetch_count(zero=True)),
|
||||
expected_rising)
|
||||
|
||||
yield from tb.write_config(sensitivity_config)
|
||||
yield tb.input.eq(0)
|
||||
yield
|
||||
self.assertEqual((yield from tb.fetch_count()), expected_falling)
|
||||
|
||||
yield
|
||||
with self.assertRaises(TimeoutError):
|
||||
# Make sure there are no more suprious events.
|
||||
yield from tb.read_event(10)
|
||||
|
||||
tb.run(gen(CONFIG_COUNT_RISING, 1, 0))
|
||||
tb.run(gen(CONFIG_COUNT_FALLING, 0, 1))
|
||||
tb.run(gen(CONFIG_COUNT_RISING | CONFIG_COUNT_FALLING, 1, 1))
|
||||
|
||||
def test_reset(self):
|
||||
tb = Testbench()
|
||||
|
||||
def gen():
|
||||
# Generate one count.
|
||||
yield from tb.write_config(CONFIG_COUNT_RISING)
|
||||
yield from tb.toggle_input()
|
||||
self.assertEqual((yield from tb.fetch_count()), 1)
|
||||
|
||||
# Make sure it is gone after an RTIO reset, and the counter isn't
|
||||
# sensitive anymore.
|
||||
yield from tb.write_config(CONFIG_COUNT_RISING)
|
||||
yield from tb.reset_rio()
|
||||
yield from tb.toggle_input()
|
||||
self.assertEqual((yield from tb.fetch_count()), 0)
|
||||
|
||||
tb.run(gen())
|
||||
|
||||
def test_saturation(self):
|
||||
for width in range(3, 5):
|
||||
tb = Testbench(counter_width=width)
|
||||
|
||||
def gen():
|
||||
yield from tb.write_config(CONFIG_COUNT_RISING)
|
||||
for _ in range(2**width + 1):
|
||||
yield from tb.toggle_input()
|
||||
self.assertEqual((yield from tb.fetch_count()), 2**width - 1)
|
||||
|
||||
tb.run(gen())
|
97
artiq/test/coredevice/test_edge_counter.py
Normal file
97
artiq/test/coredevice/test_edge_counter.py
Normal file
@ -0,0 +1,97 @@
|
||||
from artiq.experiment import *
|
||||
from artiq.test.hardware_testbench import ExperimentCase
|
||||
|
||||
|
||||
class EdgeCounterExp(EnvExperiment):
|
||||
def build(self):
|
||||
self.setattr_device("core")
|
||||
self.setattr_device("loop_in_counter")
|
||||
self.setattr_device("loop_out")
|
||||
|
||||
@kernel
|
||||
def count_pulse_edges(self, gate_fn):
|
||||
self.core.break_realtime()
|
||||
with parallel:
|
||||
with sequential:
|
||||
delay(5 * us)
|
||||
self.loop_out.pulse(10 * us)
|
||||
with sequential:
|
||||
gate_fn(10 * us)
|
||||
delay(1 * us)
|
||||
gate_fn(10 * us)
|
||||
return (self.loop_in_counter.fetch_count(),
|
||||
self.loop_in_counter.fetch_count())
|
||||
|
||||
@kernel
|
||||
def timeout_timestamp(self):
|
||||
self.core.break_realtime()
|
||||
timestamp_mu, _ = self.loop_in_counter.fetch_timestamped_count(
|
||||
now_mu())
|
||||
return timestamp_mu
|
||||
|
||||
@kernel
|
||||
def gate_relative_timestamp(self):
|
||||
self.core.break_realtime()
|
||||
gate_end_mu = self.loop_in_counter.gate_rising(1 * us)
|
||||
timestamp_mu, _ = self.loop_in_counter.fetch_timestamped_count()
|
||||
return timestamp_mu - gate_end_mu
|
||||
|
||||
@kernel
|
||||
def many_pulses_split(self, num_pulses):
|
||||
self.core.break_realtime()
|
||||
|
||||
self.loop_in_counter.set_config(
|
||||
count_rising=True,
|
||||
count_falling=True,
|
||||
send_count_event=False,
|
||||
reset_to_zero=True)
|
||||
|
||||
for _ in range(num_pulses):
|
||||
self.loop_out.pulse(5 * us)
|
||||
delay(5 * us)
|
||||
|
||||
self.loop_in_counter.set_config(
|
||||
count_rising=True,
|
||||
count_falling=True,
|
||||
send_count_event=True,
|
||||
reset_to_zero=False)
|
||||
|
||||
for _ in range(num_pulses):
|
||||
self.loop_out.pulse(5 * us)
|
||||
delay(5 * us)
|
||||
|
||||
self.loop_in_counter.set_config(
|
||||
count_rising=False,
|
||||
count_falling=False,
|
||||
send_count_event=True,
|
||||
reset_to_zero=False)
|
||||
|
||||
return (self.loop_in_counter.fetch_count(),
|
||||
self.loop_in_counter.fetch_count())
|
||||
|
||||
|
||||
class EdgeCounterTest(ExperimentCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.exp = self.create(EdgeCounterExp)
|
||||
|
||||
def test_sensitivity(self):
|
||||
c = self.exp.loop_in_counter
|
||||
self.assertEqual(self.exp.count_pulse_edges(c.gate_rising), (1, 0))
|
||||
self.assertEqual(self.exp.count_pulse_edges(c.gate_falling), (0, 1))
|
||||
self.assertEqual(self.exp.count_pulse_edges(c.gate_both), (1, 1))
|
||||
|
||||
def test_timeout_timestamp(self):
|
||||
self.assertEqual(self.exp.timeout_timestamp(), -1)
|
||||
|
||||
def test_gate_timestamp(self):
|
||||
# The input event should be received at some point after it was
|
||||
# requested, with some extra latency as it makes its way through the
|
||||
# DRTIO machinery. (We only impose a somewhat arbitrary upper limit
|
||||
# on the latency here.)
|
||||
delta_mu = self.exp.gate_relative_timestamp()
|
||||
self.assertGreaterEqual(delta_mu, 0)
|
||||
self.assertLess(delta_mu, 100)
|
||||
|
||||
def test_many_pulses_split(self):
|
||||
self.assertEqual(self.exp.many_pulses_split(500), (1000, 2000))
|
@ -41,6 +41,12 @@ Digital I/O drivers
|
||||
.. automodule:: artiq.coredevice.ttl
|
||||
:members:
|
||||
|
||||
:mod:`artiq.coredevice.edge_counter` module
|
||||
++++++++++++++++++++++++++++++++++++++++++++
|
||||
|
||||
.. automodule:: artiq.coredevice.edge_counter
|
||||
:members:
|
||||
|
||||
:mod:`artiq.coredevice.shiftreg` module
|
||||
+++++++++++++++++++++++++++++++++++++++
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user