mirror of
https://github.com/m-labs/artiq.git
synced 2025-01-24 17:38:13 +08:00
dsp: move test tools
This commit is contained in:
parent
b9ce2bb1f0
commit
424a1f8f4e
@ -4,32 +4,6 @@ from functools import reduce
|
||||
from migen import *
|
||||
|
||||
|
||||
def set_dict(e, **k):
|
||||
for k, v in k.items():
|
||||
if isinstance(v, dict):
|
||||
yield from set_dict(getattr(e, k), **v)
|
||||
else:
|
||||
yield getattr(e, k).eq(v)
|
||||
|
||||
|
||||
def xfer(dut, **kw):
|
||||
ep = []
|
||||
for e, v in kw.items():
|
||||
e = getattr(dut, e)
|
||||
yield from set_dict(e, **v)
|
||||
ep.append(e)
|
||||
for e in ep:
|
||||
yield e.stb.eq(1)
|
||||
while ep:
|
||||
yield
|
||||
for e in ep[:]:
|
||||
if hasattr(e, "busy") and (yield e.busy):
|
||||
raise ValueError(e, "busy")
|
||||
if not hasattr(e, "ack") or (yield e.ack):
|
||||
yield e.stb.eq(0)
|
||||
ep.remove(e)
|
||||
|
||||
|
||||
class Delay(Module):
|
||||
def __init__(self, i, delay, o=None):
|
||||
if isinstance(i, (int, tuple)):
|
||||
@ -73,22 +47,3 @@ class SatAddMixin:
|
||||
)
|
||||
]
|
||||
return s0
|
||||
|
||||
|
||||
def szip(*iters):
|
||||
active = {it: None for it in iters}
|
||||
while active:
|
||||
for it in list(active):
|
||||
while True:
|
||||
try:
|
||||
val = it.send(active[it])
|
||||
except StopIteration:
|
||||
del active[it]
|
||||
break
|
||||
if val is None:
|
||||
break
|
||||
else:
|
||||
active[it] = (yield val)
|
||||
val = (yield None)
|
||||
for it in active:
|
||||
active[it] = val
|
||||
|
@ -4,7 +4,7 @@ from migen import *
|
||||
from migen.fhdl.verilog import convert
|
||||
|
||||
from artiq.gateware.dsp.accu import Accu, PhasedAccu
|
||||
from artiq.gateware.dsp.tools import xfer
|
||||
from .tools import xfer
|
||||
|
||||
|
||||
def read(o, n):
|
||||
|
@ -4,7 +4,7 @@ from migen import *
|
||||
from migen.fhdl.verilog import convert
|
||||
|
||||
from artiq.gateware.dsp.sawg import DDSFast
|
||||
from artiq.gateware.dsp.tools import xfer
|
||||
from .tools import xfer
|
||||
|
||||
|
||||
def _test_gen_dds(dut, o):
|
||||
|
@ -4,7 +4,7 @@ from migen import *
|
||||
from migen.fhdl.verilog import convert
|
||||
|
||||
from artiq.gateware.rtio.phy.sawg import Channel
|
||||
from artiq.gateware.dsp.tools import xfer, szip
|
||||
from .tools import xfer, szip
|
||||
|
||||
|
||||
def rtio_xfer(dut, **kwargs):
|
||||
|
@ -4,7 +4,7 @@ from migen import *
|
||||
from migen.fhdl.verilog import convert
|
||||
|
||||
from artiq.gateware.dsp.spline import Spline
|
||||
from artiq.gateware.dsp.tools import xfer
|
||||
from .tools import xfer
|
||||
|
||||
|
||||
def _test_gen_spline(dut, o):
|
||||
|
43
artiq/test/gateware/tools.py
Normal file
43
artiq/test/gateware/tools.py
Normal file
@ -0,0 +1,43 @@
|
||||
def set_dict(e, **k):
|
||||
for k, v in k.items():
|
||||
if isinstance(v, dict):
|
||||
yield from set_dict(getattr(e, k), **v)
|
||||
else:
|
||||
yield getattr(e, k).eq(v)
|
||||
|
||||
|
||||
def xfer(dut, **kw):
|
||||
ep = []
|
||||
for e, v in kw.items():
|
||||
e = getattr(dut, e)
|
||||
yield from set_dict(e, **v)
|
||||
ep.append(e)
|
||||
for e in ep:
|
||||
yield e.stb.eq(1)
|
||||
while ep:
|
||||
yield
|
||||
for e in ep[:]:
|
||||
if hasattr(e, "busy") and (yield e.busy):
|
||||
raise ValueError(e, "busy")
|
||||
if not hasattr(e, "ack") or (yield e.ack):
|
||||
yield e.stb.eq(0)
|
||||
ep.remove(e)
|
||||
|
||||
|
||||
def szip(*iters):
|
||||
active = {it: None for it in iters}
|
||||
while active:
|
||||
for it in list(active):
|
||||
while True:
|
||||
try:
|
||||
val = it.send(active[it])
|
||||
except StopIteration:
|
||||
del active[it]
|
||||
break
|
||||
if val is None:
|
||||
break
|
||||
else:
|
||||
active[it] = (yield val)
|
||||
val = (yield None)
|
||||
for it in active:
|
||||
active[it] = val
|
Loading…
Reference in New Issue
Block a user