gateware: remove SAWG simulations

pull/2046/head
Sebastien Bourdeauducq 2023-01-09 18:37:19 +08:00
parent cee9f3f44e
commit ea21f474a7
9 changed files with 0 additions and 670 deletions

View File

@ -1,112 +0,0 @@
import numpy as np
import matplotlib.pyplot as plt
from migen import *
from migen.fhdl import verilog
from artiq.gateware.dsp import fir
class Transfer(Module):
def __init__(self, dut):
self.submodules.dut = dut
def drive(self, x):
for xi in x.reshape(-1, self.dut.parallelism):
yield [ij.eq(int(xj)) for ij, xj in zip(self.dut.i, xi)]
yield
def record(self, y):
for i in range(self.dut.latency):
yield
for yi in y.reshape(-1, self.dut.parallelism):
yield
yi[:] = (yield from [(yield o) for o in self.dut.o])
def run(self, samples, amplitude=1., seed=None):
if seed is not None:
np.random.seed(seed)
w = 2**(self.dut.width - 1) - 1
x = np.round(np.random.uniform(
-amplitude*w, amplitude*w, samples))
y = self.run_data(x)
x /= w
y /= w
return x, y
def run_data(self, x):
y = np.empty_like(x)
run_simulation(self, [self.drive(x), self.record(y)],
vcd_name="fir.vcd")
return y
def analyze(self, x, y):
fig, ax = plt.subplots(3)
ax[0].plot(x, "c-.", label="input")
ax[0].plot(y, "r-", label="output")
ax[0].legend(loc="right")
ax[0].set_xlabel("time (1/fs)")
ax[0].set_ylabel("signal")
n = len(x)
w = np.hanning(n)
x = (x.reshape(-1, n)*w).sum(0)
y = (y.reshape(-1, n)*w).sum(0)
t = (np.fft.rfft(y)/np.fft.rfft(x))
f = np.fft.rfftfreq(n)*2
fmin = f[1]
ax[1].plot(f, 20*np.log10(np.abs(t)), "r-")
ax[1].set_ylim(-70, 3)
ax[1].set_xlim(fmin, 1.)
# ax[1].set_xscale("log")
ax[1].set_xlabel("frequency (fs/2)")
ax[1].set_ylabel("magnitude (dB)")
ax[1].grid(True)
ax[2].plot(f, np.rad2deg(np.angle(t)), "r-")
ax[2].set_xlim(fmin, 1.)
# ax[2].set_xscale("log")
ax[2].set_xlabel("frequency (fs/2)")
ax[2].set_ylabel("phase (deg)")
ax[2].grid(True)
return fig
class UpTransfer(Transfer):
def drive(self, x):
x = x.reshape(-1, len(self.dut.o))
x[:, 1:] = 0
for xi in x:
yield self.dut.i.eq(int(xi[0]))
yield
def record(self, y):
for i in range(self.dut.latency):
yield
for yi in y.reshape(-1, len(self.dut.o)):
yield
yi[:] = (yield from [(yield o) for o in self.dut.o])
def _main():
if True:
coeff = fir.halfgen4_cascade(2, width=.4, order=8)
dut = fir.ParallelHBFUpsampler(coeff, width=16)
# print(verilog.convert(dut, ios=set([dut.i] + dut.o)))
tb = UpTransfer(dut)
else:
coeff = fir.halfgen4(.4/2, 8)
dut = fir.ParallelFIR(coeff, parallelism=4, width=16)
# print(verilog.convert(dut, ios=set(dut.i + dut.o)))
tb = Transfer(dut)
if True:
x, y = tb.run(samples=1 << 10, amplitude=.5, seed=0x1234567)
else:
x = np.zeros(100)
x[:50] = 1 << 8
x[50:] = 1 << 13
y = tb.run_data(x)
tb.analyze(x, y)
plt.show()
if __name__ == "__main__":
_main()

View File

@ -1,46 +0,0 @@
import numpy as np
from migen import *
from migen.fhdl.verilog import convert
from artiq.gateware.dsp.accu import Accu, PhasedAccu
from .tools import xfer
def read(o, n):
p = []
for i in range(n):
p.append((yield from [(yield pi) for pi in o.payload.flatten()]))
yield
return p
def _test_gen_accu(dut, o):
yield dut.o.ack.eq(1)
yield from xfer(dut, i=dict(p=0, f=1, clr=1))
o.extend((yield from read(dut.o, 8)))
yield from xfer(dut, i=dict(p=0, f=2, clr=0))
o.extend((yield from read(dut.o, 8)))
yield from xfer(dut, i=dict(p=0, f=2, clr=1))
o.extend((yield from read(dut.o, 8)))
yield from xfer(dut, i=dict(p=8, f=-1, clr=1))
o.extend((yield from read(dut.o, 8)))
yield from xfer(dut, i=dict(p=0, f=0, clr=1))
yield from xfer(dut, i=dict(p=1, f=0, clr=0))
o.extend((yield from read(dut.o, 8)))
def _test_accu():
dut = PhasedAccu(8, parallelism=8)
if False:
print(convert(dut))
else:
o = []
run_simulation(dut, _test_gen_accu(dut, o), vcd_name="accu.vcd")
o = np.array(o)
print(o)
if __name__ == "__main__":
_test_accu()

View File

@ -1,71 +0,0 @@
import unittest
import migen as mg
from artiq.gateware.dsp.tools import SatAddMixin
class DUT(mg.Module, SatAddMixin):
def __init__(self, width):
self.o = mg.Signal((width, True))
self.i0 = mg.Signal.like(self.o)
self.i1 = mg.Signal.like(self.o)
self.l0 = mg.Signal.like(self.o)
self.l1 = mg.Signal.like(self.o)
self.c = mg.Signal(2)
self.comb += self.o.eq(self.sat_add((self.i0, self.i1),
width=4, limits=(self.l0, self.l1), clipped=self.c))
class SatAddTest(unittest.TestCase):
def setUp(self):
self.dut = DUT(width=4)
# import migen.fhdl.verilog
# print(mg.fhdl.verilog.convert(self.dut))
def _sweep(self):
def gen():
for i0 in range(-8, 8):
yield self.dut.i0.eq(i0)
for i1 in range(-8, 8):
yield self.dut.i1.eq(i1)
yield
def rec():
l0 = yield self.dut.l0
l1 = yield self.dut.l1
for i in range(1 << 8):
i0 = yield self.dut.i0
i1 = yield self.dut.i1
o = yield self.dut.o
c = yield self.dut.c
full = i0 + i1
lim = full
clip = 0
if full < l0:
lim = l0
clip = 1
if full > l1:
lim = l1
clip = 2
with self.subTest(i0=i0, i1=i1):
self.assertEqual(lim, o)
self.assertEqual(clip, c)
yield
mg.run_simulation(self.dut, (gen(), rec()))
def test_inst(self):
pass
def test_run(self):
self._sweep()
def test_limits(self):
for l0 in -8, 0, 1, 7:
for l1 in -8, 0, 1, 7:
self.setUp()
self.dut.l0.reset = l0
self.dut.l1.reset = l1
with self.subTest(l0=l0, l1=l1):
self._sweep()

View File

@ -1,36 +0,0 @@
import numpy as np
from migen import *
from migen.fhdl.verilog import convert
from artiq.gateware.dsp import sawg
from artiq.gateware.test.dsp.tools import xfer
def _test_gen_dds(dut, o):
yield from xfer(dut,
a=dict(a0=10),
p=dict(a0=0),
f=dict(a0=1),
)
for i in range(256//dut.parallelism):
yield
o.append((yield from [(yield _) for _ in dut.xo]))
def _test_channel():
widths = sawg._Widths(t=8, a=4*8, p=8, f=16)
orders = sawg._Orders(a=4, p=1, f=2)
dut = sawg.SplineParallelDDS(widths, orders, parallelism=2)
if False:
print(convert(dut))
else:
o = []
run_simulation(dut, _test_gen_dds(dut, o), vcd_name="dds.vcd")
o = np.array(o)
print(o[:, :])
if __name__ == "__main__":
_test_channel()

View File

@ -1,255 +0,0 @@
import unittest
import migen as mg
from numpy import int32
from artiq.coredevice import sawg, spline
from artiq.language import (at_mu, now_mu, delay,
core as core_language)
from artiq.gateware.rtio.phy.sawg import Channel
from artiq.sim import devices as sim_devices, time as sim_time
class RTIOManager:
def __init__(self):
self.outputs = []
def rtio_output(self, target, data):
channel = target >> 8
addr = target & 0xff
self.outputs.append((now_mu(), channel, addr, data))
def rtio_output_wide(self, *args, **kwargs):
self.rtio_output(*args, **kwargs)
def delay_mu(self, t):
delay(t)
def patch(self, mod):
assert not hasattr(mod, "_saved")
mod._saved = {}
for name in "rtio_output rtio_output_wide delay_mu".split():
mod._saved[name] = getattr(mod, name, None)
setattr(mod, name, getattr(self, name))
def unpatch(self, mod):
mod.__dict__.update(mod._saved)
del mod._saved
class SAWGTest(unittest.TestCase):
def setUp(self):
core_language.set_time_manager(sim_time.Manager())
self.rtio_manager = RTIOManager()
self.rtio_manager.patch(spline)
self.rtio_manager.patch(sawg)
self.core = sim_devices.Core({})
self.core.coarse_ref_period = 20/3
self.core.ref_multiplier = 1
self.t = self.core.coarse_ref_period
self.channel = mg.ClockDomainsRenamer({"rio_phy": "sys"})(
Channel(width=16, parallelism=2))
self.driver = sawg.SAWG({"core": self.core}, channel_base=0,
parallelism=self.channel.parallelism)
def tearDown(self):
self.rtio_manager.unpatch(spline)
self.rtio_manager.unpatch(sawg)
def test_instantiate(self):
pass
def test_make_events(self):
d = self.driver
d.offset.set(.9)
delay(2*self.t)
d.frequency0.set(.1)
d.frequency1.set(.1)
delay(2*self.t)
d.offset.set(0)
v = int(round((1 << 48) * .1 * self.t))
self.assertEqual(
self.rtio_manager.outputs, [
(0., 1, 0, int(round(self.driver.offset.scale*.9))),
(2.*self.t, 8, 0, int(round(
(1 << self.driver.frequency0.width) *
self.t/self.channel.parallelism*.1))),
(2.*self.t, 3, 0, [int32(v), int32(v >> 32)]),
(4.*self.t, 1, 0, 0),
])
def run_channel(self, events):
def gen(dut, events):
c = 0
for time, channel, address, data in events:
time //= self.t
assert c <= time
while c < time:
yield
c += 1
for phy in dut.phys:
yield phy.rtlink.o.stb.eq(0)
rt = dut.phys[channel].rtlink.o
if isinstance(data, list):
data = sum(int(d) << (i*32) for i, d in enumerate(data))
yield rt.data.eq(int(data))
if hasattr(rt, "address"):
yield rt.address.eq(address)
yield rt.stb.eq(1)
assert not (yield rt.busy)
# print("{}: set ch {} to {}".format(time, channel, hex(data)))
def log(dut, data, n):
for i in range(n + dut.latency):
yield
data.append((yield from [(yield _) for _ in dut.o]))
data = []
# print(int(events[-1][0]) + 1)
mg.run_simulation(self.channel, [
gen(self.channel, events),
log(self.channel, data, int(events[-1][0]//self.t) + 1)],
vcd_name="dds.vcd")
return data
def test_run_channel(self):
self.test_make_events()
self.run_channel(self.rtio_manager.outputs)
def test_coeff(self):
import struct
# these get discrete_compensate
# [.1, .01, -.00001], [.1, .01, .00001, -.000000001]
for v in [-.1], [.1, -.01]:
ch = self.driver.offset
p = ch.coeff_as_packed(v)
t = ch.time_width
w = ch.width
p = [_ & 0xffffffff for _ in p]
p0 = [int(round(vi*ch.scale*ch.time_scale**i))
for i, vi in enumerate(v)]
p0 = [struct.pack("<" + "_bhiiqqqq"[(w + i*t)//8], vi
)[:(w + i*t)//8]
for i, vi in enumerate(p0)]
p0 = b"".join(p0)
if len(p0) % 4:
p0 += b"\x00"*(4 - len(p0) % 4)
p0 = list(struct.unpack("<" + "I"*((len(p0) + 3)//4), p0))
with self.subTest(v):
self.assertEqual(p, p0)
def test_linear(self):
d = self.driver
d.offset.set_coeff_mu([100, 10])
delay(10*self.t)
d.offset.set_coeff([0])
delay(1*self.t)
out = self.run_channel(self.rtio_manager.outputs)
out = out[self.channel.latency + self.channel.u.latency:][:11]
for i in range(len(out) - 1):
with self.subTest(i):
v = 100 + i*10
self.assertEqual(out[i], [v, v])
self.assertEqual(out[-1], [0, 0])
def test_pack(self):
ch = self.driver.offset
self.assertEqual(ch.coeff_as_packed_mu([1]), [1])
self.assertEqual(ch.coeff_as_packed_mu([1, 1 << 16]), [1, 1])
self.assertEqual(ch.coeff_as_packed_mu([1, 1 << 32]), [1, 0])
self.assertEqual(ch.coeff_as_packed_mu([0x1234, 0xa5a5a5a5]),
[0xa5a51234, 0xa5a5])
self.assertEqual(ch.coeff_as_packed_mu([1, 2, 3, 4]),
[0x20001, 0x30000, 0, 4, 0])
self.assertEqual(ch.coeff_as_packed_mu([-1, -2, -3, -4]),
[0xfffeffff, 0xfffdffff, -1, -4, -1])
self.assertEqual(ch.coeff_as_packed_mu([0, -1, 0, -1]),
[0xffff0000, 0x0000ffff, 0, -1, -1])
def test_smooth_linear(self):
ch = self.driver.offset
ch.smooth(.1, .2, 13*self.t, 1)
ch.set(.2)
delay(1*self.t)
out = self.run_channel(self.rtio_manager.outputs)
out = out[self.channel.latency + self.channel.u.latency:][:14]
a = int(round(.1*ch.scale))
da = int(round(.1*ch.scale*(1 << ch.width)//13))
for i in range(len(out) - 1):
with self.subTest(i):
v = a + (i*da >> ch.width)
self.assertEqual(out[i], [v, v])
a = int(round(.2*ch.scale))
self.assertEqual(out[-1], [a, a])
def test_smooth_cubic(self):
ch = self.driver.offset
ch.smooth(.1, .2, 13*self.t, 3)
ch.set(.2)
delay(1*self.t)
out = self.run_channel(self.rtio_manager.outputs)
out = out[self.channel.latency + self.channel.u.latency:][:14]
if False:
import matplotlib.pyplot as plt
plt.plot(sum(out, []))
plt.show()
@unittest.skip("needs artiq.sim.time.TimeManager tweak for "
"reverse timeline jumps")
def test_demo_2tone(self):
MHz = 1e-3
ns = 1.
self.sawg0 = self.driver
t_up = t_hold = t_down = 400*ns
a1 = .3
a2 = .4
order = 3
self.sawg0.frequency0.set(10*MHz)
self.sawg0.phase0.set(0.)
self.sawg0.frequency1.set(1*MHz)
self.sawg0.phase1.set(0.)
self.sawg0.frequency2.set(13*MHz)
self.sawg0.phase2.set(0.)
t = now_mu()
self.sawg0.amplitude1.smooth(.0, a1, t_up, order)
at_mu(t)
self.sawg0.amplitude2.smooth(.0, a2, t_up, order)
self.sawg0.amplitude1.set(a1)
self.sawg0.amplitude2.set(a2)
delay(t_hold)
t = now_mu()
self.sawg0.amplitude1.smooth(a1, .0, t_down, order)
at_mu(t)
self.sawg0.amplitude2.smooth(a2, .0, t_down, order)
self.sawg0.amplitude1.set(.0)
self.sawg0.amplitude2.set(.0)
out = self.run_channel(self.rtio_manager.outputs)
out = sum(out, [])
if True:
import matplotlib.pyplot as plt
plt.plot(out)
plt.show()
def test_fir_overflow(self):
MHz = 1e-3
ns = 1.
f1 = self.driver.frequency1
a1 = self.driver.amplitude1
p1 = self.driver.phase1
cfg = self.driver.config
f1.set(1*MHz)
a1.set(.99)
delay(100*ns)
p1.set(.5)
delay(100*ns)
a1.set(0)
out = self.run_channel(self.rtio_manager.outputs)
out = sum(out, [])
if False:
import matplotlib.pyplot as plt
plt.plot(out)
plt.show()

View File

@ -1,70 +0,0 @@
import numpy as np
from operator import or_
from migen import *
from migen.fhdl.verilog import convert
from artiq.gateware.rtio.phy.sawg import Channel
from .tools import rtio_xfer
def pack_tri(port, *v):
r = 0
w = 0
for vi, p in zip(v, port.payload.flatten()):
w += len(p)
r |= int(vi*(1 << w))
return r
def gen_rtio(dut):
yield
yield from rtio_xfer(
dut,
a1=pack_tri(dut.a1.a, .1),
f0=pack_tri(dut.b.f, .01234567),
f1=pack_tri(dut.a1.f, .01234567),
a2=pack_tri(dut.a1.a, .05),
f2=pack_tri(dut.a1.f, .00534567),
)
def gen_log(dut, o, n):
for i in range(3 + dut.latency):
yield
for i in range(n):
yield
o.append((yield from [(yield _) for _ in dut.o]))
#o.append([(yield dut.a1.xo[0])])
def _test_channel():
width = 16
dut = ClockDomainsRenamer({"rio_phy": "sys"})(
Channel(width=width, parallelism=4)
)
if False:
print(convert(dut))
return
o = []
run_simulation(
dut,
[gen_rtio(dut), gen_log(dut, o, 128)],
vcd_name="dds.vcd")
o = np.array(o)/(1 << (width - 1))
o = o.ravel()
np.savez_compressed("dds.npz", o=o)
import matplotlib.pyplot as plt
fig, ax = plt.subplots(2)
ax[0].step(np.arange(o.size), o)
ax[1].psd(o, 1 << 10, Fs=1, noverlap=1 << 9, scale_by_freq=False)
fig.savefig("dds.pdf")
plt.show()
if __name__ == "__main__":
_test_channel()

View File

@ -1,31 +0,0 @@
import numpy as np
from migen import *
from migen.fhdl.verilog import convert
from artiq.gateware.dsp.spline import Spline
from .tools import xfer
def _test_gen_spline(dut, o):
yield dut.o.ack.eq(1)
yield from xfer(dut, i=dict(a0=0, a1=1, a2=2))
for i in range(20):
yield
o.append((yield dut.o.a0))
def _test_spline():
dut = Spline(order=3, width=16, step=1)
if False:
print(convert(dut))
else:
o = []
run_simulation(dut, _test_gen_spline(dut, o), vcd_name="spline.vcd")
o = np.array(o)
print(o)
if __name__ == "__main__":
_test_spline()

View File

@ -1,49 +0,0 @@
def set_dict(e, **k):
for k, v in k.items():
if isinstance(v, dict):
yield from set_dict(getattr(e, k), **v)
else:
yield getattr(e, k).eq(v)
def xfer(dut, **kw):
ep = []
for e, v in kw.items():
e = getattr(dut, e)
yield from set_dict(e, **v)
ep.append(e)
for e in ep:
yield e.stb.eq(1)
while ep:
yield
for e in ep[:]:
if hasattr(e, "busy") and (yield e.busy):
raise ValueError(e, "busy")
if not hasattr(e, "ack") or (yield e.ack):
yield e.stb.eq(0)
ep.remove(e)
def szip(*iters):
active = {it: None for it in iters}
while active:
for it in list(active):
while True:
try:
val = it.send(active[it])
except StopIteration:
del active[it]
break
if val is None:
break
else:
active[it] = (yield val)
val = (yield None)
for it in active:
active[it] = val
def rtio_xfer(dut, **kwargs):
yield from szip(*(
xfer(dut.phys_named[k].rtlink, o={"data": v})
for k, v in kwargs.items()))