diff --git a/artiq/coredevice/ttl.py b/artiq/coredevice/ttl.py index 2bc40ed58..0aec996da 100644 --- a/artiq/coredevice/ttl.py +++ b/artiq/coredevice/ttl.py @@ -6,10 +6,10 @@ replacement. For example, pulses of "zero" length (e.g. :meth:`TTLInOut.on` immediately followed by :meth:`TTLInOut.off`, without a delay) are suppressed. """ -import numpy +from numpy import int32, int64 from artiq.language.core import * -from artiq.language.types import * +from artiq.coredevice.core import Core from artiq.coredevice.rtio import (rtio_output, rtio_input_timestamp, rtio_input_data) from artiq.coredevice.exceptions import RTIOOverflow @@ -22,6 +22,7 @@ from artiq.coredevice.exceptions import RTIOOverflow # 3 Set input sensitivity and sample +@nac3 class TTLOut: """RTIO TTL output driver. @@ -29,7 +30,9 @@ class TTLOut: :param channel: channel number """ - kernel_invariants = {"core", "channel", "target_o"} + core: KernelInvariant[Core] + channel: KernelInvariant[int32] + target_o: KernelInvariant[int32] def __init__(self, dmgr, channel, core_device="core"): self.core = dmgr.get(core_device) @@ -41,7 +44,7 @@ class TTLOut: pass @kernel - def set_o(self, o): + def set_o(self, o: bool): rtio_output(self.target_o, 1 if o else 0) @kernel @@ -61,7 +64,7 @@ class TTLOut: self.set_o(False) @kernel - def pulse_mu(self, duration): + def pulse_mu(self, duration: int64): """Pulse the output high for the specified duration (in machine units). @@ -71,16 +74,17 @@ class TTLOut: self.off() @kernel - def pulse(self, duration): + def pulse(self, duration: float): """Pulse the output high for the specified duration (in seconds). The time cursor is advanced by the specified duration.""" self.on() - delay(duration) + self.core.delay(duration) self.off() +@nac3 class TTLInOut: """RTIO TTL input/output driver. @@ -107,8 +111,13 @@ class TTLInOut: :param channel: channel number """ - kernel_invariants = {"core", "channel", "gate_latency_mu", - "target_o", "target_oe", "target_sens", "target_sample"} + core: KernelInvariant[Core] + channel: KernelInvariant[int32] + gate_latency_mu: KernelInvariant[int32] + target_o: KernelInvariant[int32] + target_oe: KernelInvariant[int32] + target_sens: KernelInvariant[int32] + target_sample: KernelInvariant[int32] def __init__(self, dmgr, channel, gate_latency_mu=None, core_device="core"): @@ -129,7 +138,7 @@ class TTLInOut: self.target_sample = (channel << 8) + 3 @kernel - def set_oe(self, oe): + def set_oe(self, oe: bool): rtio_output(self.target_oe, 1 if oe else 0) @kernel @@ -159,7 +168,7 @@ class TTLInOut: self.set_oe(False) @kernel - def set_o(self, o): + def set_o(self, o: bool): rtio_output(self.target_o, 1 if o else 0) @kernel @@ -183,7 +192,7 @@ class TTLInOut: self.set_o(False) @kernel - def pulse_mu(self, duration): + def pulse_mu(self, duration: int64): """Pulse the output high for the specified duration (in machine units). @@ -193,22 +202,22 @@ class TTLInOut: self.off() @kernel - def pulse(self, duration): + def pulse(self, duration: float): """Pulse the output high for the specified duration (in seconds). The time cursor is advanced by the specified duration.""" self.on() - delay(duration) + self.core.delay(duration) self.off() # Input API: gating @kernel - def _set_sensitivity(self, value): + def _set_sensitivity(self, value: int32): rtio_output(self.target_sens, value) @kernel - def gate_rising_mu(self, duration): + def gate_rising_mu(self, duration: int64) -> int64: """Register rising edge events for the specified duration (in machine units). @@ -223,7 +232,7 @@ class TTLInOut: return now_mu() @kernel - def gate_falling_mu(self, duration): + def gate_falling_mu(self, duration: int64) -> int64: """Register falling edge events for the specified duration (in machine units). @@ -238,7 +247,7 @@ class TTLInOut: return now_mu() @kernel - def gate_both_mu(self, duration): + def gate_both_mu(self, duration: int64) -> int64: """Register both rising and falling edge events for the specified duration (in machine units). @@ -253,7 +262,7 @@ class TTLInOut: return now_mu() @kernel - def gate_rising(self, duration): + def gate_rising(self, duration: float) -> int64: """Register rising edge events for the specified duration (in seconds). @@ -263,12 +272,12 @@ class TTLInOut: convenience when used with :meth:`count`/:meth:`timestamp_mu`. """ self._set_sensitivity(1) - delay(duration) + self.core.delay(duration) self._set_sensitivity(0) return now_mu() @kernel - def gate_falling(self, duration): + def gate_falling(self, duration: float) -> int64: """Register falling edge events for the specified duration (in seconds). @@ -279,12 +288,12 @@ class TTLInOut: """ self._set_sensitivity(2) - delay(duration) + self.core.delay(duration) self._set_sensitivity(0) return now_mu() @kernel - def gate_both(self, duration): + def gate_both(self, duration: float) -> int64: """Register both rising and falling edge events for the specified duration (in seconds). @@ -294,12 +303,12 @@ class TTLInOut: convenience when used with :meth:`count`/:meth:`timestamp_mu`. """ self._set_sensitivity(3) - delay(duration) + self.core.delay(duration) self._set_sensitivity(0) return now_mu() - @kernel - def count(self, up_to_timestamp_mu): + # NAC3TODO @kernel + def count(self, up_to_timestamp_mu: int64) -> int32: """Consume RTIO input events until the hardware timestamp counter has reached the specified timestamp and return the number of observed events. @@ -347,12 +356,12 @@ class TTLInOut: ttl_input.count(ttl_input.gate_rising(100 * us)) """ count = 0 - while rtio_input_timestamp(up_to_timestamp_mu + self.gate_latency_mu, self.channel) >= 0: + while rtio_input_timestamp(up_to_timestamp_mu + int64(self.gate_latency_mu), self.channel) >= int64(0): count += 1 return count @kernel - def timestamp_mu(self, up_to_timestamp_mu): + def timestamp_mu(self, up_to_timestamp_mu: int64) -> int64: """Return the timestamp of the next RTIO input event, or -1 if the hardware timestamp counter reaches the given value before an event is received. @@ -370,7 +379,7 @@ class TTLInOut: :return: The timestamp (in machine units) of the first event received; -1 on timeout. """ - return rtio_input_timestamp(up_to_timestamp_mu + self.gate_latency_mu, self.channel) + return rtio_input_timestamp(up_to_timestamp_mu + int64(self.gate_latency_mu), self.channel) # Input API: sampling @kernel @@ -382,7 +391,7 @@ class TTLInOut: rtio_output(self.target_sample, 0) @kernel - def sample_get(self): + def sample_get(self) -> int32: """Returns the value of a sample previously obtained with :meth:`sample_input`. @@ -394,7 +403,7 @@ class TTLInOut: return rtio_input_data(self.channel) @kernel - def sample_get_nonrt(self): + def sample_get_nonrt(self) -> int32: """Convenience function that obtains the value of a sample at the position of the time cursor, breaks realtime, and returns the sample value.""" @@ -405,7 +414,7 @@ class TTLInOut: # Input API: watching @kernel - def watch_stay_on(self): + def watch_stay_on(self) -> bool: """Checks that the input is at a high level at the position of the time cursor and keep checking until :meth:`watch_done` is called. @@ -420,13 +429,13 @@ class TTLInOut: return rtio_input_data(self.channel) == 1 @kernel - def watch_stay_off(self): + def watch_stay_off(self) -> bool: """Like :meth:`watch_stay_on`, but for low levels.""" rtio_output(self.target_sample, 1) # gate rising return rtio_input_data(self.channel) == 0 - @kernel - def watch_done(self): + # NAC3TODO @kernel + def watch_done(self) -> bool: """Stop watching the input at the position of the time cursor. Returns ``True`` if the input has not changed state while it @@ -438,13 +447,14 @@ class TTLInOut: rtio_output(self.target_sens, 0) success = True try: - while rtio_input_timestamp(now_mu() + self.gate_latency_mu, self.channel) != -1: + while rtio_input_timestamp(now_mu() + int64(self.gate_latency_mu), self.channel) != -1: success = False except RTIOOverflow: success = False return success +@nac3 class TTLClockGen: """RTIO TTL clock generator driver. @@ -456,31 +466,35 @@ class TTLClockGen: :param channel: channel number :param acc_width: accumulator width in bits """ - kernel_invariants = {"core", "channel", "target", "acc_width"} + core: KernelInvariant[Core] + channel: KernelInvariant[int32] + target: KernelInvariant[int32] + acc_width: KernelInvariant[int32] def __init__(self, dmgr, channel, acc_width=24, core_device="core"): self.core = dmgr.get(core_device) self.channel = channel self.target = channel << 8 - - self.acc_width = numpy.int64(acc_width) + self.acc_width = acc_width @portable - def frequency_to_ftw(self, frequency): + def frequency_to_ftw(self, frequency: float) -> int32: """Returns the frequency tuning word corresponding to the given frequency. """ - return round(2**self.acc_width*frequency*self.core.coarse_ref_period) + # NAC3TODO return round64(2**self.acc_width*frequency*self.core.coarse_ref_period) + return 0 @portable - def ftw_to_frequency(self, ftw): + def ftw_to_frequency(self, ftw: int32) -> float: """Returns the frequency corresponding to the given frequency tuning word. """ - return ftw/self.core.coarse_ref_period/2**self.acc_width + # NAC3TODO return float(ftw)/self.core.coarse_ref_period/2**int64(self.acc_width) + return 0.0 @kernel - def set_mu(self, frequency): + def set_mu(self, frequency: int32): """Set the frequency of the clock, in machine units, at the current position of the time cursor. @@ -500,7 +514,7 @@ class TTLClockGen: rtio_output(self.target, frequency) @kernel - def set(self, frequency): + def set(self, frequency: float): """Like :meth:`set_mu`, but using Hz.""" self.set_mu(self.frequency_to_ftw(frequency)) diff --git a/artiq/language/core.py b/artiq/language/core.py index 4dfb22e9a..36d8b459e 100644 --- a/artiq/language/core.py +++ b/artiq/language/core.py @@ -9,7 +9,7 @@ from types import SimpleNamespace __all__ = [ - "KernelInvariant", + "KernelInvariant", "round64", "extern", "kernel", "portable", "nac3", "rpc", "parallel", "sequential", "set_watchdog_factory", "watchdog", "TerminationRequested" @@ -21,6 +21,10 @@ class KernelInvariant(Generic[T]): pass +def round64(x): + return round(x) + + _allow_module_registration = True _registered_modules = set()