From a3b55b61647b30552c7de140776b82ffcdb87fee Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Tue, 1 Mar 2022 17:44:48 +0800 Subject: [PATCH] dma: tentative port to NAC3 will not work due to missing context manager and possibly memory management issues --- artiq/coredevice/dma.py | 52 ++++++++++++++++++++++++----------------- 1 file changed, 30 insertions(+), 22 deletions(-) diff --git a/artiq/coredevice/dma.py b/artiq/coredevice/dma.py index 261a6bcfe..76f1ab468 100644 --- a/artiq/coredevice/dma.py +++ b/artiq/coredevice/dma.py @@ -5,34 +5,36 @@ the core device's SDRAM, and playing them back at higher speeds than the CPU alone could achieve. """ -from artiq.language.core import syscall, kernel -from artiq.language.types import TInt32, TInt64, TStr, TNone, TTuple +from numpy import int32, int64 + +from artiq.language.core import nac3, extern, kernel, Kernel, KernelInvariant from artiq.coredevice.exceptions import DMAError - -from numpy import int64 +from artiq.coredevice.core import Core -@syscall -def dma_record_start(name: TStr) -> TNone: + +@extern +def dma_record_start(name: str): raise NotImplementedError("syscall not simulated") -@syscall -def dma_record_stop(duration: TInt64) -> TNone: +@extern +def dma_record_stop(duration: int64): raise NotImplementedError("syscall not simulated") -@syscall -def dma_erase(name: TStr) -> TNone: +@extern +def dma_erase(name: str): raise NotImplementedError("syscall not simulated") -@syscall -def dma_retrieve(name: TStr) -> TTuple([TInt64, TInt32]): +@extern +def dma_retrieve(name: str) -> tuple[int64, int32]: raise NotImplementedError("syscall not simulated") -@syscall -def dma_playback(timestamp: TInt64, ptr: TInt32) -> TNone: +@extern +def dma_playback(timestamp: int64, ptr: int32): raise NotImplementedError("syscall not simulated") +@nac3 class DMARecordContextManager: """Context manager returned by :meth:`CoreDMA.record()`. @@ -44,6 +46,9 @@ class DMARecordContextManager: are stored in a newly created trace, and ``now`` is restored to the value it had before the context manager was entered. """ + name: Kernel[str] + saved_now_mu: Kernel[int64] + def __init__(self): self.name = "" self.saved_now_mu = int64(0) @@ -52,21 +57,24 @@ class DMARecordContextManager: def __enter__(self): dma_record_start(self.name) # this may raise, so do it before altering now self.saved_now_mu = now_mu() - at_mu(0) + at_mu(int64(0)) @kernel - def __exit__(self, type, value, traceback): + def __exit__(self): dma_record_stop(now_mu()) # see above at_mu(self.saved_now_mu) +@nac3 class CoreDMA: """Core device Direct Memory Access (DMA) driver. Gives access to the DMA functionality of the core device. """ - kernel_invariants = {"core", "recorder"} + core: KernelInvariant[Core] + recorder: KernelInvariant[DMARecordContextManager] + epoch: Kernel[int32] def __init__(self, dmgr, core_device="core"): self.core = dmgr.get(core_device) @@ -74,7 +82,7 @@ class CoreDMA: self.epoch = 0 @kernel - def record(self, name): + def record(self, name: str) -> DMARecordContextManager: """Returns a context manager that will record a DMA trace called ``name``. Any previously recorded trace with the same name is overwritten. The trace will persist across kernel switches.""" @@ -83,13 +91,13 @@ class CoreDMA: return self.recorder @kernel - def erase(self, name): + def erase(self, name: str): """Removes the DMA trace with the given name from storage.""" self.epoch += 1 dma_erase(name) @kernel - def playback(self, name): + def playback(self, name: str): """Replays a previously recorded DMA trace. This function blocks until the entire trace is submitted to the RTIO FIFOs.""" (advance_mu, ptr) = dma_retrieve(name) @@ -97,14 +105,14 @@ class CoreDMA: delay_mu(advance_mu) @kernel - def get_handle(self, name): + def get_handle(self, name: str) -> tuple[int32, int64, int32]: """Returns a handle to a previously recorded DMA trace. The returned handle is only valid until the next call to :meth:`record` or :meth:`erase`.""" (advance_mu, ptr) = dma_retrieve(name) return (self.epoch, advance_mu, ptr) @kernel - def playback_handle(self, handle): + def playback_handle(self, handle: tuple[int32, int64, int32]): """Replays a handle obtained with :meth:`get_handle`. Using this function is much faster than :meth:`playback` for replaying a set of traces repeatedly, but incurs the overhead of managing the handles onto the programmer."""