coredevice/ttl: port to NAC3

This commit is contained in:
Sebastien Bourdeauducq 2021-11-10 13:58:17 +08:00
parent deb8a77464
commit 2f031285a4
2 changed files with 64 additions and 46 deletions

View File

@ -6,10 +6,10 @@ replacement. For example, pulses of "zero" length (e.g. :meth:`TTLInOut.on`
immediately followed by :meth:`TTLInOut.off`, without a delay) are suppressed.
"""
import numpy
from numpy import int32, int64
from artiq.language.core import *
from artiq.language.types import *
from artiq.coredevice.core import Core
from artiq.coredevice.rtio import (rtio_output, rtio_input_timestamp,
rtio_input_data)
from artiq.coredevice.exceptions import RTIOOverflow
@ -22,6 +22,7 @@ from artiq.coredevice.exceptions import RTIOOverflow
# 3 Set input sensitivity and sample
@nac3
class TTLOut:
"""RTIO TTL output driver.
@ -29,7 +30,9 @@ class TTLOut:
:param channel: channel number
"""
kernel_invariants = {"core", "channel", "target_o"}
core: KernelInvariant[Core]
channel: KernelInvariant[int32]
target_o: KernelInvariant[int32]
def __init__(self, dmgr, channel, core_device="core"):
self.core = dmgr.get(core_device)
@ -41,7 +44,7 @@ class TTLOut:
pass
@kernel
def set_o(self, o):
def set_o(self, o: bool):
rtio_output(self.target_o, 1 if o else 0)
@kernel
@ -61,7 +64,7 @@ class TTLOut:
self.set_o(False)
@kernel
def pulse_mu(self, duration):
def pulse_mu(self, duration: int64):
"""Pulse the output high for the specified duration
(in machine units).
@ -71,16 +74,17 @@ class TTLOut:
self.off()
@kernel
def pulse(self, duration):
def pulse(self, duration: float):
"""Pulse the output high for the specified duration
(in seconds).
The time cursor is advanced by the specified duration."""
self.on()
delay(duration)
self.core.delay(duration)
self.off()
@nac3
class TTLInOut:
"""RTIO TTL input/output driver.
@ -107,8 +111,13 @@ class TTLInOut:
:param channel: channel number
"""
kernel_invariants = {"core", "channel", "gate_latency_mu",
"target_o", "target_oe", "target_sens", "target_sample"}
core: KernelInvariant[Core]
channel: KernelInvariant[int32]
gate_latency_mu: KernelInvariant[int32]
target_o: KernelInvariant[int32]
target_oe: KernelInvariant[int32]
target_sens: KernelInvariant[int32]
target_sample: KernelInvariant[int32]
def __init__(self, dmgr, channel, gate_latency_mu=None,
core_device="core"):
@ -129,7 +138,7 @@ class TTLInOut:
self.target_sample = (channel << 8) + 3
@kernel
def set_oe(self, oe):
def set_oe(self, oe: bool):
rtio_output(self.target_oe, 1 if oe else 0)
@kernel
@ -159,7 +168,7 @@ class TTLInOut:
self.set_oe(False)
@kernel
def set_o(self, o):
def set_o(self, o: bool):
rtio_output(self.target_o, 1 if o else 0)
@kernel
@ -183,7 +192,7 @@ class TTLInOut:
self.set_o(False)
@kernel
def pulse_mu(self, duration):
def pulse_mu(self, duration: int64):
"""Pulse the output high for the specified duration
(in machine units).
@ -193,22 +202,22 @@ class TTLInOut:
self.off()
@kernel
def pulse(self, duration):
def pulse(self, duration: float):
"""Pulse the output high for the specified duration
(in seconds).
The time cursor is advanced by the specified duration."""
self.on()
delay(duration)
self.core.delay(duration)
self.off()
# Input API: gating
@kernel
def _set_sensitivity(self, value):
def _set_sensitivity(self, value: int32):
rtio_output(self.target_sens, value)
@kernel
def gate_rising_mu(self, duration):
def gate_rising_mu(self, duration: int64) -> int64:
"""Register rising edge events for the specified duration
(in machine units).
@ -223,7 +232,7 @@ class TTLInOut:
return now_mu()
@kernel
def gate_falling_mu(self, duration):
def gate_falling_mu(self, duration: int64) -> int64:
"""Register falling edge events for the specified duration
(in machine units).
@ -238,7 +247,7 @@ class TTLInOut:
return now_mu()
@kernel
def gate_both_mu(self, duration):
def gate_both_mu(self, duration: int64) -> int64:
"""Register both rising and falling edge events for the specified
duration (in machine units).
@ -253,7 +262,7 @@ class TTLInOut:
return now_mu()
@kernel
def gate_rising(self, duration):
def gate_rising(self, duration: float) -> int64:
"""Register rising edge events for the specified duration
(in seconds).
@ -263,12 +272,12 @@ class TTLInOut:
convenience when used with :meth:`count`/:meth:`timestamp_mu`.
"""
self._set_sensitivity(1)
delay(duration)
self.core.delay(duration)
self._set_sensitivity(0)
return now_mu()
@kernel
def gate_falling(self, duration):
def gate_falling(self, duration: float) -> int64:
"""Register falling edge events for the specified duration
(in seconds).
@ -279,12 +288,12 @@ class TTLInOut:
"""
self._set_sensitivity(2)
delay(duration)
self.core.delay(duration)
self._set_sensitivity(0)
return now_mu()
@kernel
def gate_both(self, duration):
def gate_both(self, duration: float) -> int64:
"""Register both rising and falling edge events for the specified
duration (in seconds).
@ -294,12 +303,12 @@ class TTLInOut:
convenience when used with :meth:`count`/:meth:`timestamp_mu`.
"""
self._set_sensitivity(3)
delay(duration)
self.core.delay(duration)
self._set_sensitivity(0)
return now_mu()
@kernel
def count(self, up_to_timestamp_mu):
# NAC3TODO @kernel
def count(self, up_to_timestamp_mu: int64) -> int32:
"""Consume RTIO input events until the hardware timestamp counter has
reached the specified timestamp and return the number of observed
events.
@ -347,12 +356,12 @@ class TTLInOut:
ttl_input.count(ttl_input.gate_rising(100 * us))
"""
count = 0
while rtio_input_timestamp(up_to_timestamp_mu + self.gate_latency_mu, self.channel) >= 0:
while rtio_input_timestamp(up_to_timestamp_mu + int64(self.gate_latency_mu), self.channel) >= int64(0):
count += 1
return count
@kernel
def timestamp_mu(self, up_to_timestamp_mu):
def timestamp_mu(self, up_to_timestamp_mu: int64) -> int64:
"""Return the timestamp of the next RTIO input event, or -1 if the
hardware timestamp counter reaches the given value before an event is
received.
@ -370,7 +379,7 @@ class TTLInOut:
:return: The timestamp (in machine units) of the first event received;
-1 on timeout.
"""
return rtio_input_timestamp(up_to_timestamp_mu + self.gate_latency_mu, self.channel)
return rtio_input_timestamp(up_to_timestamp_mu + int64(self.gate_latency_mu), self.channel)
# Input API: sampling
@kernel
@ -382,7 +391,7 @@ class TTLInOut:
rtio_output(self.target_sample, 0)
@kernel
def sample_get(self):
def sample_get(self) -> int32:
"""Returns the value of a sample previously obtained with
:meth:`sample_input`.
@ -394,7 +403,7 @@ class TTLInOut:
return rtio_input_data(self.channel)
@kernel
def sample_get_nonrt(self):
def sample_get_nonrt(self) -> int32:
"""Convenience function that obtains the value of a sample
at the position of the time cursor, breaks realtime, and
returns the sample value."""
@ -405,7 +414,7 @@ class TTLInOut:
# Input API: watching
@kernel
def watch_stay_on(self):
def watch_stay_on(self) -> bool:
"""Checks that the input is at a high level at the position
of the time cursor and keep checking until :meth:`watch_done`
is called.
@ -420,13 +429,13 @@ class TTLInOut:
return rtio_input_data(self.channel) == 1
@kernel
def watch_stay_off(self):
def watch_stay_off(self) -> bool:
"""Like :meth:`watch_stay_on`, but for low levels."""
rtio_output(self.target_sample, 1) # gate rising
return rtio_input_data(self.channel) == 0
@kernel
def watch_done(self):
# NAC3TODO @kernel
def watch_done(self) -> bool:
"""Stop watching the input at the position of the time cursor.
Returns ``True`` if the input has not changed state while it
@ -438,13 +447,14 @@ class TTLInOut:
rtio_output(self.target_sens, 0)
success = True
try:
while rtio_input_timestamp(now_mu() + self.gate_latency_mu, self.channel) != -1:
while rtio_input_timestamp(now_mu() + int64(self.gate_latency_mu), self.channel) != -1:
success = False
except RTIOOverflow:
success = False
return success
@nac3
class TTLClockGen:
"""RTIO TTL clock generator driver.
@ -456,31 +466,35 @@ class TTLClockGen:
:param channel: channel number
:param acc_width: accumulator width in bits
"""
kernel_invariants = {"core", "channel", "target", "acc_width"}
core: KernelInvariant[Core]
channel: KernelInvariant[int32]
target: KernelInvariant[int32]
acc_width: KernelInvariant[int32]
def __init__(self, dmgr, channel, acc_width=24, core_device="core"):
self.core = dmgr.get(core_device)
self.channel = channel
self.target = channel << 8
self.acc_width = numpy.int64(acc_width)
self.acc_width = acc_width
@portable
def frequency_to_ftw(self, frequency):
def frequency_to_ftw(self, frequency: float) -> int32:
"""Returns the frequency tuning word corresponding to the given
frequency.
"""
return round(2**self.acc_width*frequency*self.core.coarse_ref_period)
# NAC3TODO return round64(2**self.acc_width*frequency*self.core.coarse_ref_period)
return 0
@portable
def ftw_to_frequency(self, ftw):
def ftw_to_frequency(self, ftw: int32) -> float:
"""Returns the frequency corresponding to the given frequency tuning
word.
"""
return ftw/self.core.coarse_ref_period/2**self.acc_width
# NAC3TODO return float(ftw)/self.core.coarse_ref_period/2**int64(self.acc_width)
return 0.0
@kernel
def set_mu(self, frequency):
def set_mu(self, frequency: int32):
"""Set the frequency of the clock, in machine units, at the current
position of the time cursor.
@ -500,7 +514,7 @@ class TTLClockGen:
rtio_output(self.target, frequency)
@kernel
def set(self, frequency):
def set(self, frequency: float):
"""Like :meth:`set_mu`, but using Hz."""
self.set_mu(self.frequency_to_ftw(frequency))

View File

@ -9,7 +9,7 @@ from types import SimpleNamespace
__all__ = [
"KernelInvariant",
"KernelInvariant", "round64",
"extern", "kernel", "portable", "nac3", "rpc",
"parallel", "sequential",
"set_watchdog_factory", "watchdog", "TerminationRequested"
@ -21,6 +21,10 @@ class KernelInvariant(Generic[T]):
pass
def round64(x):
return round(x)
_allow_module_registration = True
_registered_modules = set()