artiq/artiq/coredevice/rtio.py

194 lines
5.0 KiB
Python
Raw Normal View History

2014-09-12 15:28:02 +08:00
from artiq.language.core import *
from artiq.language.db import *
2014-09-12 15:28:02 +08:00
class LLRTIOOut(AutoDB):
    """Low-level driver for an RTIO output channel.

    Output events are programmed at caller-supplied timestamps that are
    already expressed in RTIO cycles; no time unit conversion is applied
    to them.

    This class is mainly a building block for other drivers; consider
    using ``RTIOOut`` instead.

    :param core: core device
    :param channel: channel number
    """
    class DBKeys:
        core = Device()
        channel = Argument()

    def build(self):
        # Switch the channel to output mode as part of device construction.
        self._set_oe()

    @kernel
    def _set_oe(self):
        """Enables the output of the channel at the current timeline
        position."""
        oe_cycles = time_to_cycles(now())
        syscall("rtio_set_oe", oe_cycles, self.channel, True)

    @kernel
    def set_o(self, t, value):
        """Programs the output value of the RTIO channel.

        :param t: timestamp in RTIO cycles (64-bit integer).
        :param value: value to set at the output.
        """
        syscall("rtio_set_o", t, self.channel, value)

    @kernel
    def on(self, t):
        """Drives the RTIO channel high (output value 1).

        :param t: timestamp in RTIO cycles (64-bit integer).
        """
        self.set_o(t, 1)

    @kernel
    def off(self, t):
        """Drives the RTIO channel low (output value 0).

        :param t: timestamp in RTIO cycles (64-bit integer).
        """
        self.set_o(t, 0)
2014-10-17 00:12:53 +08:00
2014-12-01 17:32:36 +08:00
2015-04-14 19:44:45 +08:00
class RTIOOut(AutoDB):
    """RTIO output driver.

    Sets up the corresponding RTIO channel as an output on the core device
    and offers methods to control its level at the current timeline
    position.

    Zero-length transition suppression is supported: when two pulses are
    emitted back-to-back with no delay in between, they are merged into one
    pulse whose duration is the sum of the durations of the original
    pulses.

    :param core: core device
    :param channel: channel number
    """
    class DBKeys:
        core = Device()
        channel = Argument()

    def build(self):
        # Timestamp (in RTIO cycles) of the most recently programmed level
        # switch; sync() waits for the RTIO counter to reach it.
        self.previous_timestamp = int64(0)
        self._set_oe()

    @kernel
    def _set_oe(self):
        """Enables the output of the channel at the current timeline
        position."""
        oe_cycles = time_to_cycles(now())
        syscall("rtio_set_oe", oe_cycles, self.channel, True)

    @kernel
    def _set_o(self, value):
        """Programs ``value`` on the output at the current timeline position
        and records that position in ``previous_timestamp``."""
        # The timeline position is not moved by the syscall, so a single
        # conversion serves both the event and the bookkeeping.
        event_cycles = time_to_cycles(now())
        syscall("rtio_set_o", event_cycles, self.channel, value)
        self.previous_timestamp = event_cycles

    @kernel
    def sync(self):
        """Busy-waits until all programmed level switches have been effected.

        The RTIO counter is polled until it is no longer below the timestamp
        of the last programmed level switch.

        This is useful to synchronize CPU-controlled devices (such as the
        AD9858 DDS bus) with related RTIO controls (such as RF switches at
        the output of the DDS).
        """
        while syscall("rtio_get_counter") < self.previous_timestamp:
            pass

    @kernel
    def on(self):
        """Sets the output to a logic high state."""
        self._set_o(1)

    @kernel
    def off(self):
        """Sets the output to a logic low state."""
        self._set_o(0)

    @kernel
    def pulse(self, duration):
        """Pulses the output high for the specified duration.

        The output is set high, the timeline is advanced by ``duration``,
        and the output is then set low.

        :param duration: length of the pulse, as accepted by ``delay``.
        """
        self.on()
        delay(duration)
        self.off()
2014-09-13 19:37:57 +08:00
2015-04-14 19:44:45 +08:00
class RTIOIn(AutoDB):
    """RTIO input driver.

    Sets up the corresponding RTIO channel as an input on the core device
    and offers methods to analyze the incoming signal, using real-time
    gating to prevent overflows.

    :param core: core device
    :param channel: channel number
    """
    class DBKeys:
        core = Device()
        channel = Argument()

    def build(self):
        # Timestamp (in RTIO cycles) of the most recent sensitivity change;
        # passed to "rtio_get" when polling for events.
        self.previous_timestamp = int64(0)
        self._set_oe()

    @kernel
    def _set_oe(self):
        """Disables the output of the channel (input mode) at the current
        timeline position."""
        oe_cycles = time_to_cycles(now())
        syscall("rtio_set_oe", oe_cycles, self.channel, False)

    @kernel
    def _set_sensitivity(self, value):
        """Programs the edge sensitivity of the channel at the current
        timeline position and records that position in
        ``previous_timestamp``.

        The gate methods of this class use 1 for rising edges, 2 for falling
        edges, 3 for both, and 0 once the gate duration has elapsed.
        """
        # The timeline position is not moved by the syscall, so a single
        # conversion serves both the event and the bookkeeping.
        event_cycles = time_to_cycles(now())
        syscall("rtio_set_sensitivity", event_cycles, self.channel, value)
        self.previous_timestamp = event_cycles

    @kernel
    def gate_rising(self, duration):
        """Register rising edge events for the specified duration.

        :param duration: gate opening time, as accepted by ``delay``.
        """
        self._set_sensitivity(1)
        delay(duration)
        self._set_sensitivity(0)

    @kernel
    def gate_falling(self, duration):
        """Register falling edge events for the specified duration.

        :param duration: gate opening time, as accepted by ``delay``.
        """
        self._set_sensitivity(2)
        delay(duration)
        self._set_sensitivity(0)

    @kernel
    def gate_both(self, duration):
        """Register both rising and falling edge events for the specified
        duration.

        :param duration: gate opening time, as accepted by ``delay``.
        """
        self._set_sensitivity(3)
        delay(duration)
        self._set_sensitivity(0)

    @kernel
    def count(self):
        """Poll the RTIO input during all the previously programmed gate
        openings, and return the number of registered events.

        Events are fetched one at a time until "rtio_get" reports a negative
        value.
        """
        n_events = 0
        while syscall("rtio_get", self.channel, self.previous_timestamp) >= 0:
            n_events += 1
        return n_events

    @kernel
    def timestamp(self):
        """Poll the RTIO input and return an event timestamp, according to
        the gating.

        If the gate is permanently closed, returns a negative value.
        """
        event_cycles = syscall("rtio_get", self.channel,
                               self.previous_timestamp)
        return cycles_to_time(event_cycles)