artiq/artiq/coredevice/dma.py

76 lines
2.3 KiB
Python
Raw Normal View History

from artiq.language.core import syscall, kernel
2017-03-01 05:28:27 +08:00
from artiq.language.types import TInt64, TStr, TNone
from numpy import int64
@syscall
def dma_record_start() -> TNone:
raise NotImplementedError("syscall not simulated")
@syscall
def dma_record_stop(name: TStr) -> TNone:
raise NotImplementedError("syscall not simulated")
2017-03-01 05:28:27 +08:00
@syscall
def dma_erase(name: TStr) -> TNone:
raise NotImplementedError("syscall not simulated")
@syscall
def dma_playback(timestamp: TInt64, name: TStr) -> TNone:
raise NotImplementedError("syscall not simulated")
class DMARecordContextManager:
def __init__(self):
self.name = ""
self.saved_now_mu = int64(0)
@kernel
def __enter__(self):
"""Starts recording a DMA trace. All RTIO operations are redirected to
a newly created DMA buffer after this call, and ``now`` is reset to zero."""
dma_record_start() # this may raise, so do it before altering now
self.saved_now_mu = now_mu()
at_mu(0)
@kernel
def __exit__(self, type, value, traceback):
"""Stops recording a DMA trace. All recorded RTIO operations are stored
in a newly created trace called ``self.name``, and ``now`` is restored
to the value it had before ``__enter__`` was called."""
dma_record_stop(self.name) # see above
at_mu(self.saved_now_mu)
class CoreDMA:
"""Core device Direct Memory Access (DMA) driver.
Gives access to the DMA functionality of the core device.
"""
kernel_invariants = {"core", "recorder"}
def __init__(self, dmgr, core_device="core"):
self.core = dmgr.get(core_device)
self.recorder = DMARecordContextManager()
@kernel
def record(self, name):
2017-03-01 05:28:27 +08:00
"""Returns a context manager that will record a DMA trace called ``name``.
Any previously recorded trace with the same name is overwritten.
The trace will persist across kernel switches."""
self.recorder.name = name
return self.recorder
2017-03-01 05:28:27 +08:00
@kernel
def erase(self, name):
"""Removes the DMA trace with the given name from storage."""
dma_erase(name)
@kernel
def replay(self, name):
"""Replays a previously recorded DMA trace. This function blocks until
the entire trace is submitted to the RTIO FIFOs."""
dma_playback(now_mu(), name)