forked from M-Labs/artiq
1
0
Fork 0

pdq2: implement changes in trigger/jump semantics, add unittest

The unittests now runs the compute_samples.Synthesizer against the actual
gateware and verifies similarity (up to integer rounding errors).
This commit is contained in:
Robert Jördens 2015-04-05 03:55:11 -06:00
parent e870b27830
commit 1d5f467da7
2 changed files with 64 additions and 24 deletions

View File

@ -22,14 +22,14 @@ class Segment:
self.data = b"" self.data = b""
def line(self, typ, duration, data, trigger=False, silence=False, def line(self, typ, duration, data, trigger=False, silence=False,
aux=False, shift=0, jump=False, clear=False, wait_trigger=False): aux=False, shift=0, jump=False, clear=False, wait=False):
assert len(data) % 2 == 0, data assert len(data) % 2 == 0, data
assert len(data)//2 <= 14 assert len(data)//2 <= 14
#assert dt*(1 << shift) > 1 + len(data)//2 #assert dt*(1 << shift) > 1 + len(data)//2
header = ( header = (
1 + len(data)//2 | (typ << 4) | (trigger << 6) | (silence << 7) | 1 + len(data)//2 | (typ << 4) | (trigger << 6) | (silence << 7) |
(aux << 8) | (shift << 9) | (jump << 13) | (clear << 14) | (aux << 8) | (shift << 9) | (jump << 13) | (clear << 14) |
(wait_trigger << 15) (wait << 15)
) )
self.data += struct.pack("<HH", header, duration) + data self.data += struct.pack("<HH", header, duration) + data
@ -86,7 +86,7 @@ class Segment:
coef = self.compensate([scale*a for a in amplitude]) coef = self.compensate([scale*a for a in amplitude])
if phase: if phase:
assert len(amplitude) == 4 assert len(amplitude) == 4
coef += [p*self.max_val for p in phase] coef += [p*self.max_val*2 for p in phase]
data = self.pack([0, 1, 2, 2, 0, 1, 1], coef) data = self.pack([0, 1, 2, 2, 0, 1, 1], coef)
self.line(typ=1, data=data, **kwargs) self.line(typ=1, data=data, **kwargs)
@ -195,20 +195,23 @@ class Pdq2:
def program(self, program): def program(self, program):
self.clear_all() self.clear_all()
for segment_data in program: for frame_data in program:
segments = [c.new_segment() for c in self.channels] segments = [c.new_segment() for c in self.channels]
for line in segment_data: for i, line in enumerate(frame_data): # segments are concatenated
dac_divider = line.get("dac_divider", 1) dac_divider = line.get("dac_divider", 1)
shift = int(log2(dac_divider)) shift = int(log2(dac_divider))
assert 2**shift == dac_divider assert 2**shift == dac_divider
duration = line["duration"] duration = line["duration"]
jump = line.get("jump", False) trigger = line.get("trigger", False)
wait_trigger = line.get("wait_trigger", False) if i == 0:
assert trigger
trigger = False # use wait on the last line
eof = i == len(frame_data) - 1
for segment, data in zip(segments, line.get("channel_data")): for segment, data in zip(segments, line.get("channel_data")):
assert len(data) == 1 assert len(data) == 1
for target, target_data in data.items(): for target, target_data in data.items():
getattr(segment, target)( getattr(segment, target)(
shift=shift, duration=duration, shift=shift, duration=duration,
wait_trigger=wait_trigger, jump=jump, trigger=trigger, wait=eof, jump=eof,
**target_data) **target_data)
self.write_all() self.write_all()

View File

@ -3,14 +3,16 @@ import os
import io import io
from artiq.devices.pdq2.driver import Pdq2 from artiq.devices.pdq2.driver import Pdq2
from artiq.wavesynth.compute_samples import Synthesizer
pdq2_source = os.getenv("ARTIQ_PDQ2_SOURCE") pdq2_gateware = os.getenv("ARTIQ_PDQ2_GATEWARE")
class TestPdq2(unittest.TestCase): class TestPdq2(unittest.TestCase):
def setUp(self): def setUp(self):
self.dev = Pdq2(dev=io.BytesIO()) self.dev = Pdq2(dev=io.BytesIO())
self.synth = Synthesizer(3, _test_program)
def test_reset(self): def test_reset(self):
self.dev.cmd("RESET", True) self.dev.cmd("RESET", True)
@ -21,40 +23,77 @@ class TestPdq2(unittest.TestCase):
# about 0.14 ms # about 0.14 ms
self.dev.program(_test_program) self.dev.program(_test_program)
@unittest.skipUnless(pdq2_source, "no pdq2 source and gateware") def test_cmd_program(self):
def test_gateware(self):
self.dev.cmd("START", False)
self.dev.cmd("ARM", False) self.dev.cmd("ARM", False)
self.dev.cmd("START", False)
self.dev.program(_test_program) self.dev.program(_test_program)
self.dev.cmd("START", True) self.dev.cmd("START", True)
self.dev.cmd("ARM", True) self.dev.cmd("ARM", True)
#self.dev.cmd("TRIGGER", True) #self.dev.cmd("TRIGGER", True)
buf = self.dev.dev.getvalue() return self.dev.dev.getvalue()
def test_synth(self):
s = self.synth
s.select(0)
y = s.trigger()
return list(zip(*y))
def run_gateware(self):
import sys import sys
sys.path.append(pdq2_source) sys.path.append(pdq2_gateware)
from gateware.pdq2 import Pdq2Sim from gateware.pdq2 import Pdq2Sim
from migen.sim.generic import run_simulation from migen.sim.generic import run_simulation
from matplotlib import pyplot as plt
import numpy as np buf = self.test_cmd_program()
tb = Pdq2Sim(buf) tb = Pdq2Sim(buf)
tb.ctrl_pads.trigger.reset = 0 tb.ctrl_pads.trigger.reset = 0
run_simulation(tb, vcd_name="pdq2.vcd", ncycles=len(buf) + 250) run_simulation(tb, vcd_name="pdq2.vcd", ncycles=len(buf) + 250)
out = np.array(tb.outputs, np.uint16).view(np.int16) delays = 7, 10, 30
for outi in out[len(buf) + 100:].T: y = list(zip(*tb.outputs[len(buf) + 130:]))
plt.step(np.arange(len(outi)), outi) y = list(zip(*(yi[di:] for yi, di in zip(y, delays))))
self.assertGreaterEqual(len(y), 80)
self.assertEqual(len(y[0]), 3)
return y
@unittest.skipUnless(pdq2_gateware, "no pdq2 gateware")
def test_run_compare(self):
y_ref = self.test_synth()
y = self.run_gateware()
for i, (yi, yi_ref) in enumerate(zip(y, y_ref)):
for j, (yij, yij_ref) in enumerate(zip(yi, yi_ref)):
yij = yij*20./2**16
if yij > 10:
yij -= 20
self.assertAlmostEqual(yij, yij_ref, 2,
"foo t={}, c={}".format(i, j))
@unittest.skipUnless(pdq2_gateware, "no pdq2 gateware")
@unittest.skip("manual/visual test")
def test_run_plot(self):
from matplotlib import pyplot as plt
import numpy as np
y_ref = self.test_synth()
y_ref = np.array(y_ref)
y = self.run_gateware()
y = np.array(y, dtype=np.uint16).view(np.int16)
y = y*20./2**16
plt.step(np.arange(len(y)), y)
plt.step(np.arange(len(y_ref)), y_ref, "k")
plt.show() plt.show()
_test_program = [ _test_program = [
[ [
{ {
"trigger": True,
"duration": 20, "duration": 20,
"channel_data": [ "channel_data": [
{"bias": {"amplitude": [0, 0, 2e-3]}}, {"bias": {"amplitude": [0, 0, 2e-3]}},
{"bias": {"amplitude": [1, 0, -7.5e-3, 7.5e-4]}}, {"bias": {"amplitude": [1, 0, -7.5e-3, 7.5e-4]}},
{"dds": { {"dds": {
"amplitude": [0, 0, 4e-3, 0], "amplitude": [0, 0, 4e-3, 0],
"phase": [.5, .05], "phase": [.25, .025],
}}, }},
], ],
}, },
@ -68,7 +107,7 @@ _test_program = [
}}, }},
{"dds": { {"dds": {
"amplitude": [.8, .08, -4e-3, 0], "amplitude": [.8, .08, -4e-3, 0],
"phase": [.5, .05, .04/40], "phase": [.25, .025, .02/40],
"clear": True, "clear": True,
}}, }},
], ],
@ -80,11 +119,9 @@ _test_program = [
{"bias": {"amplitude": [.5, 0, -7.5e-3, 7.5e-4]}}, {"bias": {"amplitude": [.5, 0, -7.5e-3, 7.5e-4]}},
{"dds": { {"dds": {
"amplitude": [.8, -.08, 4e-3, 0], "amplitude": [.8, -.08, 4e-3, 0],
"phase": [-.5], "phase": [-.25],
}}, }},
], ],
"wait_trigger": True,
"jump": True,
}, },
] ]
] ]