From 5db9bc9bd473729ef508ab9ba6542873ac1f7499 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 26 Feb 2022 08:55:08 +0800 Subject: [PATCH] edge_counter: port to nac3 --- artiq/coredevice/edge_counter.py | 57 ++++++++++++--------- artiq/examples/nac3devices/nac3devices.json | 7 +++ artiq/examples/nac3devices/nac3devices.py | 3 ++ 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/artiq/coredevice/edge_counter.py b/artiq/coredevice/edge_counter.py index e7782064e..52357fd53 100644 --- a/artiq/coredevice/edge_counter.py +++ b/artiq/coredevice/edge_counter.py @@ -51,11 +51,13 @@ See :mod:`artiq.gateware.rtio.phy.edge_counter` and :meth:`artiq.gateware.eem.DIO.add_std` for the gateware components. """ +from numpy import int32, int64 + +from artiq.coredevice.core import Core from artiq.language.core import * -from artiq.language.types import * from artiq.coredevice.rtio import (rtio_output, rtio_input_data, rtio_input_timestamped_data) -from numpy import int32, int64 + CONFIG_COUNT_RISING = 0b0001 CONFIG_COUNT_FALLING = 0b0010 @@ -63,12 +65,14 @@ CONFIG_SEND_COUNT_EVENT = 0b0100 CONFIG_RESET_TO_ZERO = 0b1000 +@nac3 class CounterOverflow(Exception): """Raised when an edge counter value is read which indicates that the counter might have overflowed.""" pass +@nac3 class EdgeCounter: """RTIO TTL edge counter driver driver. @@ -84,7 +88,9 @@ class EdgeCounter: the gateware needs to be rebuilt. """ - kernel_invariants = {"core", "channel", "counter_max"} + core: KernelInvariant[Core] + channel: KernelInvariant[int32] + counter_max: KernelInvariant[int32] def __init__(self, dmgr, channel, gateware_width=31, core_device="core"): self.core = dmgr.get(core_device) @@ -92,7 +98,7 @@ class EdgeCounter: self.counter_max = (1 << (gateware_width - 1)) - 1 @kernel - def gate_rising(self, duration): + def gate_rising(self, duration: float) -> int64: """Count rising edges for the given duration and request the total at the end. @@ -106,7 +112,7 @@ class EdgeCounter: return self.gate_rising_mu(self.core.seconds_to_mu(duration)) @kernel - def gate_falling(self, duration): + def gate_falling(self, duration: float) -> int64: """Count falling edges for the given duration and request the total at the end. @@ -120,7 +126,7 @@ class EdgeCounter: return self.gate_falling_mu(self.core.seconds_to_mu(duration)) @kernel - def gate_both(self, duration): + def gate_both(self, duration: float) -> int64: """Count both rising and falling edges for the given duration, and request the total at the end. @@ -133,42 +139,43 @@ class EdgeCounter: """ return self.gate_both_mu(self.core.seconds_to_mu(duration)) + # NAC3TODO: restore keyword arguments in calls (https://git.m-labs.hk/M-Labs/nac3/issues/199) @kernel - def gate_rising_mu(self, duration_mu): + def gate_rising_mu(self, duration_mu: int64) -> int64: """See :meth:`gate_rising`.""" return self._gate_mu( - duration_mu, count_rising=True, count_falling=False) + duration_mu, True, False) @kernel - def gate_falling_mu(self, duration_mu): + def gate_falling_mu(self, duration_mu: int64) -> int64: """See :meth:`gate_falling`.""" return self._gate_mu( - duration_mu, count_rising=False, count_falling=True) + duration_mu, False, True) @kernel - def gate_both_mu(self, duration_mu): + def gate_both_mu(self, duration_mu: int64) -> int64: """See :meth:`gate_both_mu`.""" return self._gate_mu( - duration_mu, count_rising=True, count_falling=True) + duration_mu, True, True) @kernel - def _gate_mu(self, duration_mu, count_rising, count_falling): + def _gate_mu(self, duration_mu: int64, count_rising: bool, count_falling: bool) -> int64: self.set_config( - count_rising=count_rising, - count_falling=count_falling, - send_count_event=False, - reset_to_zero=True) + count_rising, + count_falling, + False, + True) delay_mu(duration_mu) self.set_config( - count_rising=False, - count_falling=False, - send_count_event=True, - reset_to_zero=False) + False, + False, + True, + False) return now_mu() @kernel - def set_config(self, count_rising: TBool, count_falling: TBool, - send_count_event: TBool, reset_to_zero: TBool): + def set_config(self, count_rising: bool, count_falling: bool, + send_count_event: bool, reset_to_zero: bool): """Emit an RTIO event at the current timeline position to set the gateware configuration. @@ -193,7 +200,7 @@ class EdgeCounter: rtio_output(self.channel << 8, config) @kernel - def fetch_count(self) -> TInt32: + def fetch_count(self) -> int32: """Wait for and return count total from previously requested input event. @@ -212,7 +219,7 @@ class EdgeCounter: @kernel def fetch_timestamped_count( - self, timeout_mu=int64(-1)) -> TTuple([TInt64, TInt32]): + self, timeout_mu: int64 = int64(-1)) -> tuple[int64, int32]: """Wait for and return the timestamp and count total of a previously requested input event. diff --git a/artiq/examples/nac3devices/nac3devices.json b/artiq/examples/nac3devices/nac3devices.json index e57ec9fd7..a376ee3af 100644 --- a/artiq/examples/nac3devices/nac3devices.json +++ b/artiq/examples/nac3devices/nac3devices.json @@ -25,6 +25,13 @@ { "type": "sampler", "ports": [4] + }, + { + "type": "dio", + "ports": [5], + "bank_direction_low": "input", + "bank_direction_high": "output", + "edge_counter": true } ] } diff --git a/artiq/examples/nac3devices/nac3devices.py b/artiq/examples/nac3devices/nac3devices.py index 7a2943e18..474aa0cd7 100644 --- a/artiq/examples/nac3devices/nac3devices.py +++ b/artiq/examples/nac3devices/nac3devices.py @@ -6,6 +6,7 @@ from artiq.coredevice.adf5356 import ADF5356 from artiq.coredevice.urukul import CPLD as UrukulCPLD from artiq.coredevice.ad9912 import AD9912 from artiq.coredevice.sampler import Sampler +from artiq.coredevice.edge_counter import EdgeCounter @nac3 @@ -17,6 +18,7 @@ class NAC3Devices(EnvExperiment): urukul0_cpld: KernelInvariant[UrukulCPLD] urukul0_ch0: KernelInvariant[AD9912] sampler0: KernelInvariant[Sampler] + ttl0_counter: KernelInvariant[EdgeCounter] def build(self): self.setattr_device("core") @@ -26,6 +28,7 @@ class NAC3Devices(EnvExperiment): self.setattr_device("urukul0_cpld") self.setattr_device("urukul0_ch0") self.setattr_device("sampler0") + self.setattr_device("ttl0_counter") @kernel def run(self):