From 05781699b8baabbafd3d42cd953ac7aa113724a9 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Sat, 21 Mar 2015 00:28:55 -0600 Subject: [PATCH] pdq2: driver and unittest * parses wavesynth style programs * verified with cosimulated gateware --- artiq/devices/pdq2/driver.py | 205 ++++++++++++----------------------- artiq/test/pdq2.py | 92 ++++++++++++++++ 2 files changed, 164 insertions(+), 133 deletions(-) create mode 100644 artiq/test/pdq2.py diff --git a/artiq/devices/pdq2/driver.py b/artiq/devices/pdq2/driver.py index c1a6f7869..f3f1b609f 100644 --- a/artiq/devices/pdq2/driver.py +++ b/artiq/devices/pdq2/driver.py @@ -1,137 +1,88 @@ # Robert Jordens , 2012-2015 +from math import log2, sqrt import logging import struct -import warnings -import numpy as np -from scipy import interpolate import serial logger = logging.getLogger(__name__) class Segment: + max_time = 1 << 16 # uint16 timer + max_val = 1 << 15 # int16 DAC + max_out = 10. # Volt + out_scale = max_val/max_out + cordic_gain = 1. + for i in range(16): + cordic_gain *= sqrt(1 + 2**(-2*i)) + def __init__(self): self.data = b"" - def line(self, typ, dt, data, trigger=False, silence=False, - aux=False, shift=0, end=False, clear=False, wait=False): + def line(self, typ, duration, data, trigger=False, silence=False, + aux=False, shift=0, jump=False, clear=False, wait_trigger=False): assert len(data) % 2 == 0, data assert len(data)//2 <= 14 #assert dt*(1 << shift) > 1 + len(data)//2 - head = ( + header = ( 1 + len(data)//2 | (typ << 4) | (trigger << 6) | (silence << 7) | - (aux << 8) | (shift << 9) | (end << 13) | (clear << 14) | - (wait << 15) + (aux << 8) | (shift << 9) | (jump << 13) | (clear << 14) | + (wait_trigger << 15) ) - self.data += struct.pack(">= 16 width -= 1 ud.append(value) - fmt += " hi"[width] + fmt += "hi"[width] try: return struct.pack(fmt, *ud) except struct.error as e: - logger.error("%s as %s: %s", ud, fmt, e) + logger.error("can not pack %s as %s (%s as %s): %s", + values, widths, ud, fmt, e) raise e - def lines(self, typ, dt, widths, v, first={}, mid={}, last={}, shift=0): - n = len(dt) - 1 - dt = dt.astype(np.uint16) - v = v.astype(np.int64) - for i, (dti, vi) in enumerate(zip(dt, v)): - opts = mid - if i == 0: - opts = first - elif i == n: - opts = last - data = self.pack(widths, vi) - self.line(typ, dti, data, shift=shift, **opts) - @staticmethod - def interpolate(t, v, order, t_eval, widths=None): - """Spline interpolating derivatives for t,v. - The returned spline coefficients are one shorter than t - """ - if order == 0: - return np.rint(v[:, None]) - # FIXME: does not ensure that interpolates do not clip - s = interpolate.splrep(t, v, k=order) - # FIXME: needs k knots outside t_eval - # dv = np.array(interpolate.spalde(t_eval, s)) - dv = np.array([interpolate.splev(t_eval, s, der=i, ext=0) - for i in range(order + 1)]).T - # correct for adder chain latency - if order > 1: - dv[:, 1] += dv[:, 2]/2 + def compensate(coef): + """compensates higher order spline coefficients for integrator chain + latency""" + order = len(coef) if order > 2: - dv[:, 1] += dv[:, 3]/6 - dv[:, 2] += dv[:, 3] - if widths is not None: - dv *= 1 << 16*widths - return np.rint(dv) + coef[1] += coef[2]/2. + if order > 3: + coef[1] += coef[3]/6. + coef[2] += coef[3] + return coef - def line_times(self, t, tr=None): - if tr is None: - tr = np.rint(t) - if len(tr) == 1: - return None, np.array([1]) - dt = np.diff(tr) - assert np.all(dt >= 0) - assert np.all(dt < (1 << 16)) - return tr[:-1], dt + def bias(self, amplitude=[], **kwargs): + coef = self.compensate([self.out_scale*a for a in amplitude]) + data = self.pack([0, 1, 2, 2], coef) + self.line(typ=0, data=data, **kwargs) - def dac(self, t, v, first={}, mid={}, last={}, - shift=0, tr=None, order=3, stop=True): - widths = np.array([1, 2, 3, 3]) - tr, dt = self.line_times(t, tr) - dv = self.interpolate(t, v, order, tr, widths[:order + 1] - 1) - self.lines(0, dt, widths, dv, first, mid, mid if stop else last, shift) - if stop: - self.line(0, 2, self.pack([1], [int(round(v[-1]))]), **last) - - def dds(self, t, v, p=None, f=None, first={}, mid={}, last={}, - shift=0, tr=None, order=3, stop=True): - widths = np.array([1, 2, 3, 3, 1, 2, 2]) - tr, dt = self.line_times(t, tr) - dv = self.interpolate(t, v, order, tr, widths[:order + 1] - 1) - if p is not None: - assert order == 3 - dp = self.interpolate(t, p, 1, tr)[:, :1] - dv = np.concatenate((dv, dp), axis=1) - if f is not None: - df = self.interpolate(t, f, 1, tr, widths[-2:] - 1) - dv = np.concatenate((dv, df), axis=1) - self.lines(1, dt, widths, dv, first, mid, mid if stop else last, shift) - if stop: - dv = [int(round(v[-1])), 0, 0, 0] - if p is not None: - dv.append(int(round(p[-1]))) - if f is not None: - dv.append(int(round(f[-1]))) - self.line(1, 2, self.pack(widths, dv), **last) + def dds(self, amplitude=[], phase=[], **kwargs): + scale = self.out_scale/self.cordic_gain + coef = self.compensate([scale*a for a in amplitude]) + if phase: + assert len(amplitude) == 4 + coef += [p*self.max_val for p in phase] + data = self.pack([0, 1, 2, 2, 0, 1, 1], coef) + self.line(typ=1, data=data, **kwargs) class Channel: - max_data = 4*(1 << 10) # 8kx16 8kx16 4kx16 num_frames = 8 - max_val = 1 << 15 # int16 bit DAC - max_time = 1 << 16 # uint16 bit timer - cordic_gain = 1. - for i in range(16): - cordic_gain *= np.sqrt(1 + 2**(-2*i)) - max_out = 10. - freq = 50e6 # samples/s + max_data = 4*(1 << 10) # 8kx16 8kx16 4kx16 def __init__(self): self.segments = [] @@ -145,29 +96,6 @@ class Channel: self.segments.append(segment) return segment - def segment(self, t, v, p=None, f=None, - order=3, aux=False, shift=0, trigger=True, end=True, - silence=False, stop=True, clear=True, wait=False): - segment = self.new_segment() - t = t*(self.freq/2**shift) - v = np.clip(v/self.max_out, -1, 1) - order = min(order, len(t) - 1) - first = dict(trigger=trigger, clear=clear, aux=aux) - mid = dict(aux=aux) - last = dict(silence=silence, end=end, wait=wait, aux=aux) - if p is None: - v = v*self.max_val - segment.dac(t, v, first, mid, last, shift=shift, order=order, - stop=stop) - else: - v = v*(self.max_val/self.cordic_gain) - p = p*(self.max_val/np.pi) - if f is not None: - f = f*(self.max_val/self.freq) - segment.dds(t, v, p, f, first, mid, last, shift=shift, - order=order, stop=stop) - return segment - def place(self): addr = self.num_frames for segment in self.segments: @@ -196,27 +124,26 @@ class Pdq2: PDQ DAC (a.k.a. QC_Waveform) """ num_dacs = 3 - num_boards = 3 - num_channels = num_dacs*num_boards _escape = b"\xa5" _commands = "RESET TRIGGER ARM DCM START".split() - def __init__(self, url=None, dev=None): + def __init__(self, url=None, dev=None, num_boards=3): if dev is None: dev = serial.serial_for_url(url) self.dev = dev + self.num_boards = num_boards + self.num_channels = self.num_dacs * self.num_boards self.channels = [Channel() for i in range(self.num_channels)] - self.set_freq() - - def set_freq(self, f=50e6): - for c in self.channels: - c.freq = f def close(self): self.dev.close() del self.dev + def clear_all(self): + for channel in self.channels: + channel.clear() + def write(self, data): logger.debug("> %r", data) written = self.dev.write(data) @@ -236,29 +163,41 @@ class Pdq2: data = data.replace(self._escape, self._escape + self._escape) self.write(data) - def write_channel(self, channel, entry=None): + def write_channel(self, channel): self.write_mem(self.channels.index(channel), - channel.serialize(entry)) + channel.serialize()) def write_all(self): for channel in self.channels: self.write_mem(self.channels.index(channel), channel.serialize()) - def write_table(self, channel, segments=None): + def write_table(self, channel): # no segment placement # no segment writing - self.write_mem(channel, self.channels[channel].table(segments)) + self.write_mem(channel, self.channels[channel].table()) def write_segment(self, channel, segment): # no collision check s = self.channels[channel].segments[segment] self.write_mem(channel, s.data, s.adr) - def multi_segment(self, times_voltages, channel, map=None, **kwargs): - warnings.warn("deprecated", DeprecationWarning) - c = self.channels[channel] - del c.segments[:] - for t, v in times_voltages: - c.segment(t, v, **kwargs) - return c.serialize(map) + def program(self, program): + self.clear_all() + for segment_data in program: + segments = [c.new_segment() for c in self.channels] + for line in segment_data: + dac_divider = line.get("dac_divider", 1) + shift = int(log2(dac_divider)) + assert 2**shift == dac_divider + duration = line["duration"] + jump = line.get("jump", False) + wait_trigger = line.get("wait_trigger", False) + for segment, data in zip(segments, line.get("channel_data")): + assert len(data) == 1 + for target, target_data in data.items(): + getattr(segment, target)( + shift=shift, duration=duration, + wait_trigger=wait_trigger, jump=jump, + **target_data) + self.write_all() diff --git a/artiq/test/pdq2.py b/artiq/test/pdq2.py new file mode 100644 index 000000000..5c40d6aeb --- /dev/null +++ b/artiq/test/pdq2.py @@ -0,0 +1,92 @@ +import unittest +import os +import io + +from artiq.devices.pdq2.driver import Pdq2 + + +no_hardware = bool(os.getenv("ARTIQ_NO_HARDWARE")) \ + or bool(os.getenv("ARTIQ_NO_PERIPHERALS")) + +pdq2_source = os.getenv("ARTIQ_PDQ2_SOURCE") + + +class TestPdq2(unittest.TestCase): + def setUp(self): + self.dev = Pdq2(dev=io.BytesIO()) + + def test_reset(self): + self.dev.cmd("RESET", True) + buf = self.dev.dev.getvalue() + self.assertEqual(buf, b"\xa5\x00") + + def test_program(self): + self.dev.program(_test_program) + + @unittest.skipUnless(pdq2_source, "no pdq2 source and gateware") + def test_gateware(self): + self.dev.cmd("START", False) + self.dev.cmd("ARM", False) + self.dev.program(_test_program) + self.dev.cmd("START", True) + self.dev.cmd("ARM", True) + #self.dev.cmd("TRIGGER", True) + buf = self.dev.dev.getvalue() + import sys + sys.path.append(pdq2_source) + from gateware.pdq2 import Pdq2Sim + from migen.sim.generic import run_simulation + from matplotlib import pyplot as plt + import numpy as np + tb = Pdq2Sim(buf) + tb.ctrl_pads.trigger.reset = 0 + run_simulation(tb, vcd_name="pdq2.vcd", ncycles=700) + out = np.array(tb.outputs, np.uint16).view(np.int16) + for outi in out[len(buf):].T: + plt.step(np.arange(len(outi)), outi) + plt.show() + + +_test_program = [ + [ + { + "duration": 20, + "channel_data": [ + {"bias": {"amplitude": [0, 0, 2e-3]}}, + {"bias": {"amplitude": [1, 0, -7.5e-3, 7.5e-4]}}, + {"dds": { + "amplitude": [0, 0, 4e-3, 0], + "phase": [.5, .05], + }}, + ], + }, + { + "duration": 40, + "channel_data": [ + {"bias": {"amplitude": [.4, .04, -2e-3]}}, + {"bias": { + "amplitude": [.5], + "silence": True, + }}, + {"dds": { + "amplitude": [.8, .08, -4e-3, 0], + "phase": [.5, .05, .04/40], + "clear": True, + }}, + ], + }, + { + "duration": 20, + "channel_data": [ + {"bias": {"amplitude": [.4, -.04, 2e-3]}}, + {"bias": {"amplitude": [.5, 0, -7.5e-3, 7.5e-4]}}, + {"dds": { + "amplitude": [.8, -.08, 4e-3, 0], + "phase": [-.5], + }}, + ], + "wait_trigger": True, + "jump": True, + }, + ] +]