forked from M-Labs/artiq
phaser: add more tools
This commit is contained in:
parent
062aca2a6b
commit
d2f776b0d0
|
@ -0,0 +1,46 @@
|
|||
from migen import *
|
||||
from misoc.interconnect.stream import Endpoint
|
||||
|
||||
|
||||
class Spline(Module):
|
||||
def __init__(self, order, width, step=1, time_width=None):
|
||||
if not (step == 1 or order <= 2):
|
||||
raise ValueError("For non-linear splines, "
|
||||
"`step` needs to be one.")
|
||||
layout = [("a{}".format(i), (width, True)) for i in range(order)]
|
||||
self.i = Endpoint(layout)
|
||||
self.o = Endpoint(layout)
|
||||
self.latency = 1
|
||||
|
||||
###
|
||||
|
||||
o = self.o.payload.flatten()
|
||||
|
||||
self.comb += self.i.ack.eq(~self.o.stb | self.o.ack)
|
||||
self.sync += [
|
||||
If(self.o.ack,
|
||||
self.o.stb.eq(0),
|
||||
),
|
||||
If(self.i.ack,
|
||||
self.o.stb.eq(1),
|
||||
[o[i].eq(o[i] + (o[i + 1] << log2_int(step)))
|
||||
for i in range(order - 1)],
|
||||
If(self.i.stb,
|
||||
self.o.payload.eq(self.i.payload),
|
||||
),
|
||||
),
|
||||
]
|
||||
|
||||
def tri(self, time_width):
|
||||
layout = [(name, (length - i*time_width, signed))
|
||||
for i, (name, (length, signed), dir) in
|
||||
enumerate(self.i.payload.layout[::-1])]
|
||||
layout.reverse()
|
||||
i = Endpoint(layout)
|
||||
self.comb += [
|
||||
self.i.stb.eq(i.stb),
|
||||
i.ack.eq(self.i.ack),
|
||||
[i0[-len(i1):].eq(i1) for i0, i1 in
|
||||
zip(self.i.payload.flatten(), i.payload.flatten())]
|
||||
]
|
||||
return i
|
|
@ -1,3 +1,6 @@
|
|||
from operator import add
|
||||
from functools import reduce
|
||||
|
||||
from migen import *
|
||||
|
||||
|
||||
|
@ -27,10 +30,51 @@ def xfer(dut, **kw):
|
|||
ep.remove(e)
|
||||
|
||||
|
||||
class Delay(Module):
|
||||
def __init__(self, i, delay, o=None):
|
||||
if isinstance(i, (int, tuple)):
|
||||
z = [Signal(i) for j in range(delay + 1)]
|
||||
elif isinstance(i, list):
|
||||
z = [Record(i) for j in range(delay + 1)]
|
||||
elif isinstance(i, Record):
|
||||
z = [Record(i.layout) for j in range(delay + 1)]
|
||||
else:
|
||||
z = [Signal.like(i) for j in range(delay + 1)]
|
||||
self.i = z[0]
|
||||
self.o = z[-1]
|
||||
if not isinstance(i, (int, list, tuple)):
|
||||
self.comb += self.i.eq(i)
|
||||
if o is not None:
|
||||
self.comb += o.eq(self.o)
|
||||
self.latency = delay
|
||||
self.sync += [z[j + 1].eq(z[j]) for j in range(delay)]
|
||||
|
||||
|
||||
def eqh(a, b):
|
||||
return a[-len(b):].eq(b[-len(a):])
|
||||
|
||||
|
||||
class SatAddMixin:
|
||||
def sat_add(self, a):
|
||||
a = list(a)
|
||||
# assert all(value_bits_sign(ai)[1] for ai in a)
|
||||
n = max(len(ai) for ai in a)
|
||||
o = log2_int(len(a), need_pow2=False)
|
||||
s = Signal((n + o, True))
|
||||
s0 = Signal((n, True))
|
||||
z = Signal((1, True))
|
||||
self.comb += [
|
||||
s.eq(reduce(add, a, z)),
|
||||
s0[-1].eq(s[-1]),
|
||||
If(s[-o-1:] == Replicate(s[-1], o + 1),
|
||||
s0[:-1].eq(s[:n-1]),
|
||||
).Else(
|
||||
s0[:-1].eq(Replicate(~s[-1], n - 1)),
|
||||
)
|
||||
]
|
||||
return s0
|
||||
|
||||
|
||||
def szip(*iters):
|
||||
active = {it: None for it in iters}
|
||||
while active:
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
import numpy as np
|
||||
|
||||
from migen import *
|
||||
from migen.fhdl.verilog import convert
|
||||
|
||||
from artiq.gateware.dsp.spline import Spline
|
||||
from artiq.gateware.dsp.tools import xfer
|
||||
|
||||
|
||||
def _test_gen_spline(dut, o):
|
||||
yield dut.o.ack.eq(1)
|
||||
yield from xfer(dut, i=dict(a0=0, a1=1, a2=2))
|
||||
for i in range(20):
|
||||
yield
|
||||
o.append((yield dut.o.a0))
|
||||
|
||||
|
||||
def _test_spline():
|
||||
dut = Spline(order=3, width=16, step=1)
|
||||
|
||||
if False:
|
||||
print(convert(dut))
|
||||
else:
|
||||
o = []
|
||||
run_simulation(dut, _test_gen_spline(dut, o), vcd_name="spline.vcd")
|
||||
o = np.array(o)
|
||||
print(o)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
_test_spline()
|
Loading…
Reference in New Issue