edge_counter: port to nac3

This commit is contained in:
Sebastien Bourdeauducq 2022-02-26 08:55:08 +08:00
parent 41c597a707
commit 5db9bc9bd4
3 changed files with 42 additions and 25 deletions

View File

@ -51,11 +51,13 @@ See :mod:`artiq.gateware.rtio.phy.edge_counter` and
:meth:`artiq.gateware.eem.DIO.add_std` for the gateware components.
"""
from numpy import int32, int64
from artiq.coredevice.core import Core
from artiq.language.core import *
from artiq.language.types import *
from artiq.coredevice.rtio import (rtio_output, rtio_input_data,
rtio_input_timestamped_data)
from numpy import int32, int64
CONFIG_COUNT_RISING = 0b0001
CONFIG_COUNT_FALLING = 0b0010
@ -63,12 +65,14 @@ CONFIG_SEND_COUNT_EVENT = 0b0100
CONFIG_RESET_TO_ZERO = 0b1000
@nac3
class CounterOverflow(Exception):
"""Raised when an edge counter value is read which indicates that the
counter might have overflowed."""
pass
@nac3
class EdgeCounter:
"""RTIO TTL edge counter driver driver.
@ -84,7 +88,9 @@ class EdgeCounter:
the gateware needs to be rebuilt.
"""
kernel_invariants = {"core", "channel", "counter_max"}
core: KernelInvariant[Core]
channel: KernelInvariant[int32]
counter_max: KernelInvariant[int32]
def __init__(self, dmgr, channel, gateware_width=31, core_device="core"):
self.core = dmgr.get(core_device)
@ -92,7 +98,7 @@ class EdgeCounter:
self.counter_max = (1 << (gateware_width - 1)) - 1
@kernel
def gate_rising(self, duration):
def gate_rising(self, duration: float) -> int64:
"""Count rising edges for the given duration and request the total at
the end.
@ -106,7 +112,7 @@ class EdgeCounter:
return self.gate_rising_mu(self.core.seconds_to_mu(duration))
@kernel
def gate_falling(self, duration):
def gate_falling(self, duration: float) -> int64:
"""Count falling edges for the given duration and request the total at
the end.
@ -120,7 +126,7 @@ class EdgeCounter:
return self.gate_falling_mu(self.core.seconds_to_mu(duration))
@kernel
def gate_both(self, duration):
def gate_both(self, duration: float) -> int64:
"""Count both rising and falling edges for the given duration, and
request the total at the end.
@ -133,42 +139,43 @@ class EdgeCounter:
"""
return self.gate_both_mu(self.core.seconds_to_mu(duration))
# NAC3TODO: restore keyword arguments in calls (https://git.m-labs.hk/M-Labs/nac3/issues/199)
@kernel
def gate_rising_mu(self, duration_mu):
def gate_rising_mu(self, duration_mu: int64) -> int64:
"""See :meth:`gate_rising`."""
return self._gate_mu(
duration_mu, count_rising=True, count_falling=False)
duration_mu, True, False)
@kernel
def gate_falling_mu(self, duration_mu):
def gate_falling_mu(self, duration_mu: int64) -> int64:
"""See :meth:`gate_falling`."""
return self._gate_mu(
duration_mu, count_rising=False, count_falling=True)
duration_mu, False, True)
@kernel
def gate_both_mu(self, duration_mu):
def gate_both_mu(self, duration_mu: int64) -> int64:
"""See :meth:`gate_both_mu`."""
return self._gate_mu(
duration_mu, count_rising=True, count_falling=True)
duration_mu, True, True)
@kernel
def _gate_mu(self, duration_mu, count_rising, count_falling):
def _gate_mu(self, duration_mu: int64, count_rising: bool, count_falling: bool) -> int64:
self.set_config(
count_rising=count_rising,
count_falling=count_falling,
send_count_event=False,
reset_to_zero=True)
count_rising,
count_falling,
False,
True)
delay_mu(duration_mu)
self.set_config(
count_rising=False,
count_falling=False,
send_count_event=True,
reset_to_zero=False)
False,
False,
True,
False)
return now_mu()
@kernel
def set_config(self, count_rising: TBool, count_falling: TBool,
send_count_event: TBool, reset_to_zero: TBool):
def set_config(self, count_rising: bool, count_falling: bool,
send_count_event: bool, reset_to_zero: bool):
"""Emit an RTIO event at the current timeline position to set the
gateware configuration.
@ -193,7 +200,7 @@ class EdgeCounter:
rtio_output(self.channel << 8, config)
@kernel
def fetch_count(self) -> TInt32:
def fetch_count(self) -> int32:
"""Wait for and return count total from previously requested input
event.
@ -212,7 +219,7 @@ class EdgeCounter:
@kernel
def fetch_timestamped_count(
self, timeout_mu=int64(-1)) -> TTuple([TInt64, TInt32]):
self, timeout_mu: int64 = int64(-1)) -> tuple[int64, int32]:
"""Wait for and return the timestamp and count total of a previously
requested input event.

View File

@ -25,6 +25,13 @@
{
"type": "sampler",
"ports": [4]
},
{
"type": "dio",
"ports": [5],
"bank_direction_low": "input",
"bank_direction_high": "output",
"edge_counter": true
}
]
}

View File

@ -6,6 +6,7 @@ from artiq.coredevice.adf5356 import ADF5356
from artiq.coredevice.urukul import CPLD as UrukulCPLD
from artiq.coredevice.ad9912 import AD9912
from artiq.coredevice.sampler import Sampler
from artiq.coredevice.edge_counter import EdgeCounter
@nac3
@ -17,6 +18,7 @@ class NAC3Devices(EnvExperiment):
urukul0_cpld: KernelInvariant[UrukulCPLD]
urukul0_ch0: KernelInvariant[AD9912]
sampler0: KernelInvariant[Sampler]
ttl0_counter: KernelInvariant[EdgeCounter]
def build(self):
self.setattr_device("core")
@ -26,6 +28,7 @@ class NAC3Devices(EnvExperiment):
self.setattr_device("urukul0_cpld")
self.setattr_device("urukul0_ch0")
self.setattr_device("sampler0")
self.setattr_device("ttl0_counter")
@kernel
def run(self):