From 85850ad9e8e76d4f623bad656460490b0ebfe4b6 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Wed, 13 Dec 2023 13:36:21 +0800 Subject: [PATCH] wavesynth: remove --- RELEASE_NOTES.rst | 1 + artiq/test/test_wavesynth.py | 127 ---------------- artiq/wavesynth/__init__.py | 0 artiq/wavesynth/coefficients.py | 234 ----------------------------- artiq/wavesynth/compute_samples.py | 133 ---------------- 5 files changed, 1 insertion(+), 494 deletions(-) delete mode 100644 artiq/test/test_wavesynth.py delete mode 100644 artiq/wavesynth/__init__.py delete mode 100644 artiq/wavesynth/coefficients.py delete mode 100644 artiq/wavesynth/compute_samples.py diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index c80d85a18..15ce87197 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -94,6 +94,7 @@ Accesses to the data argument should be replaced as below: txn.put(key.encode(), pyon.encode((value, {})).encode()) new.close() +* ``artiq.wavesynth`` has been removed. ARTIQ-7 ------- diff --git a/artiq/test/test_wavesynth.py b/artiq/test/test_wavesynth.py deleted file mode 100644 index c8323ba5b..000000000 --- a/artiq/test/test_wavesynth.py +++ /dev/null @@ -1,127 +0,0 @@ -# Copyright (C) 2014, 2015 Robert Jordens - -import unittest - -from artiq.wavesynth import compute_samples - - -class TestSynthesizer(unittest.TestCase): - program = [ - [ - # frame 0 - { - # frame 0, segment 0, line 0 - "dac_divider": 1, - "duration": 100, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [0.0, 0.0, 0.01], - "phase": [0.0, 0.0, 0.0005], - "clear": False} - } - ], - "trigger": True - }, - { - # frame 0, segment 0, line 1 - "dac_divider": 1, - "duration": 100, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [49.5, 1.0, -0.01], - "phase": [0.0, 0.05, 0.0005], - "clear": False} - } - ], - "trigger": False - }, - ], - [ - # frame 1 - { - # frame 1, segment 0, line 0 - "dac_divider": 1, - "duration": 100, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [100.0, 0.0, -0.01], - "phase": [0.0, 0.1, -0.0005], - "clear": False} - } - ], - "trigger": True - }, - { - # frame 1, segment 0, line 1 - "dac_divider": 1, - "duration": 100, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [50.5, -1.0, 0.01], - "phase": [0.0, 0.05, -0.0005], - "clear": False} - } - ], - "trigger": False - } - ], - [ - # frame 2 - { - # frame 2, segment 0, line 0 - "dac_divider": 1, - "duration": 84, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [100.0], - "phase": [0.0, 0.05], - "clear": False} - } - ], - "trigger": True - }, - { - # frame 2, segment 1, line 0 - "dac_divider": 1, - "duration": 116, - "channel_data": [ - { - # channel 0 - "dds": {"amplitude": [100.0], - "phase": [0.0, 0.05], - "clear": True} - } - ], - "trigger": True - } - ] - ] - - def setUp(self): - self.dev = compute_samples.Synthesizer(1, self.program) - self.t = list(range(600)) - - def drive(self): - s = self.dev - y = [] - for f in 0, 2, None, 1: - if f is not None: - s.select(f) - y += s.trigger()[0] - x = list(range(600)) - return x, y - - def test_run(self): - x, y = self.drive() - - @unittest.skip("manual/visual test") - def test_plot(self): - from matplotlib import pyplot as plt - x, y = self.drive() - plt.plot(x, y) - plt.show() diff --git a/artiq/wavesynth/__init__.py b/artiq/wavesynth/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/artiq/wavesynth/coefficients.py b/artiq/wavesynth/coefficients.py deleted file mode 100644 index e70a4080e..000000000 --- a/artiq/wavesynth/coefficients.py +++ /dev/null @@ -1,234 +0,0 @@ -# Copyright (C) 2014, 2015 Robert Jordens - -import numpy as np -from scipy.interpolate import splrep, splev, spalde - - -class UnivariateMultiSpline: - """Multidimensional wrapper around `scipy.interpolate.sp*` functions. - `scipy.inteprolate.splprep` is limited to 12 dimensions. - """ - def __init__(self, x, y, *, x0=None, order=4, **kwargs): - self.order = order - self.x = x - self.s = [] - for i, yi in enumerate(y): - if x0 is not None: - yi = self.upsample_knots(x0[i], yi, x) - self.s.append(splrep(x, yi, k=order - 1, **kwargs)) - - def upsample_knots(self, x0, y0, x): - return splev(x, splrep(x0, y0, k=self.order - 1)) - - def lev(self, x, *a, **k): - return np.array([splev(x, si) for si in self.s]) - - def alde(self, x): - u = np.array([spalde(x, si) for si in self.s]) - if len(x) == 1: - u = u[:, None, :] - return u - - def __call__(self, x, use_alde=True): - if use_alde: - u = self.alde(x)[:, :, :self.order] - s = (len(self.s), len(x), self.order) - assert u.shape == s, (u.shape, s) - return u.transpose(2, 0, 1) - else: - return np.array([self.lev(x, der=i) for i in range(self.order)]) - - -def pad_const(x, n, axis=0): - """Prefix and postfix the array `x` by `n` repetitions of the first and - last value along `axis`. - """ - a = np.repeat(x.take([0], axis), n, axis) - b = np.repeat(x.take([-1], axis), n, axis) - xp = np.concatenate([a, x, b], axis) - s = list(x.shape) - s[axis] += 2*n - assert xp.shape == tuple(s), (x.shape, s, xp.shape) - return xp - - -def build_segment(durations, coefficients, target="bias", - variable="amplitude", compress=True): - """Build a wavesynth-style segment from homogeneous duration and - coefficient data. - - :param durations: 1D sequence of line durations. - :param coefficients: 3D array with shape `(n, m, len(durations))`, - with `n` being the interpolation order + 1 and `m` the number of - channels. - :param target: The target component of the channel to affect. - :param variable: The variable within the target component. - :param compress: If `True`, skip zero high order coefficients. - """ - for dxi, yi in zip(durations, coefficients.transpose()): - cd = [] - for yij in yi: - cdj = [] - for yijk in reversed(yij): - if cdj or abs(yijk) or not compress: - cdj.append(float(yijk)) - cdj.reverse() - if not cdj: - cdj.append(float(yij[0])) - cd.append({target: {variable: cdj}}) - yield {"duration": int(dxi), "channel_data": cd} - - -class CoefficientSource: - def crop_x(self, start, stop, num=2): - """Return an array of valid sample positions. - - This method needs to be overloaded if this `CoefficientSource` - does not support sampling at arbitrary positions or at arbitrary - density. - - :param start: First sample position. - :param stop: Last sample position. - :param num: Number of samples between `start` and `stop`. - :return: Array of sample positions. `start` and `stop` should be - returned as the first and last value in the array respectively. - """ - return np.linspace(start, stop, num) - - def scale_x(self, x, scale): - # TODO: This could be moved to the the Driver/Mediator code as it is - # device-specific. - """Scale and round sample positions. - - The sample times may need to be changed and/or decimated if - incompatible with hardware requirements. - - :param x: Input sample positions in data space. - :param scale: Data space position to cycles conversion scale, - in units of x-units per clock cycle. - :return: `x_sample`, the rounded sample positions and `durations`, the - integer durations of the individual samples in cycles. - """ - t = np.rint(x/scale) - x_sample = t*scale - durations = np.diff(t).astype(int) - return x_sample, durations - - def __call__(self, x, **kwargs): - """Perform sampling and return coefficients. - - :param x: Sample positions. - :return: `y` the array of coefficients. `y.shape == (order, n, len(x))` - with `n` being the number of channels.""" - raise NotImplementedError - - def get_segment(self, start, stop, scale, *, cutoff=1e-12, - target="bias", variable="amplitude"): - """Build wavesynth segment. - - :param start: see `crop_x()`. - :param stop: see `crop_x()`. - :param scale: see `scale_x()`. - :param cutoff: coefficient cutoff towards zero to compress data. - """ - x = self.crop_x(start, stop) - x_sample, durations = self.scale_x(x, scale) - coefficients = self(x_sample) - if len(x_sample) == 1 and start == stop: - coefficients = coefficients[:1] - # rescale coefficients accordingly - coefficients *= (scale*np.sign(durations))**np.arange( - coefficients.shape[0])[:, None, None] - if cutoff: - coefficients[np.fabs(coefficients) < cutoff] = 0 - return build_segment(np.fabs(durations), coefficients, target=target, - variable=variable) - - def extend_segment(self, segment, *args, **kwargs): - """Extend a wavesynth segment. - - See `get_segment()` for arguments. - """ - for line in self.get_segment(*args, **kwargs): - segment.add_line(**line) - - -class SplineSource(CoefficientSource): - def __init__(self, x, y, order=4, pad_dx=1.): - """ - :param x: 1D sample positions. - :param y: 2D sample values. - """ - self.x = np.asanyarray(x) - assert self.x.ndim == 1 - self.y = np.asanyarray(y) - assert self.y.ndim == 2 - - if pad_dx is not None: - a = np.arange(-order, 0)*pad_dx + self.x[0] - b = self.x[-1] + np.arange(1, order + 1)*pad_dx - self.x = np.r_[a, self.x, b] - self.y = pad_const(self.y, order, axis=1) - - assert self.y.shape[1] == self.x.shape[0] - self.spline = UnivariateMultiSpline(self.x, self.y, order=order) - - def crop_x(self, start, stop): - ia, ib = np.searchsorted(self.x, (start, stop)) - if start > stop: - x = self.x[ia - 1:ib - 1:-1] - else: - x = self.x[ia:ib] - return np.r_[start, x, stop] - - def scale_x(self, x, scale, min_duration=1, min_length=20): - """Enforce, round, and scale x to device-dependent values. - - Due to minimum duration and/or minimum segment length constraints - this method may drop samples from `x_sample` to comply. - - :param min_duration: Minimum duration of a line. - :param min_length: Minimum segment length to space triggers. - """ - # We want to only sample a spline at t_knot + epsilon - # where the highest order derivative has just jumped - # and is valid at least up to the next knot after t_knot. - # - # To ensure that we are on the correct side of a knot: - # * only ever increase t when rounding (for increasing t) - # * or only ever decrease it (for decreasing t) - t = x/scale - inc = np.diff(t) >= 0 - inc = np.r_[inc, inc[-1]] - t = np.where(inc, np.ceil(t), np.floor(t)) - dt = np.diff(t.astype(int)) - - valid = np.absolute(dt) >= min_duration - if not np.any(valid): - valid[0] = True - dt[0] = max(dt[0], min_length) - dt = dt[valid] - x_sample = t[:-1][valid]*scale - return x_sample, dt - - def __call__(self, x): - return self.spline(x) - - -def discrete_compensate(c): - """Compensate spline coefficients for discrete accumulators - - Given continuous-time b-spline coefficients, this function - compensates for the effect of discrete time steps in the - target devices. - - The compensation is performed in-place. - """ - l = len(c) - if l > 2: - c[1] += c[2]/2. - if l > 3: - c[1] += c[3]/6. - c[2] += c[3] - if l > 4: - raise ValueError("only third-order splines supported") diff --git a/artiq/wavesynth/compute_samples.py b/artiq/wavesynth/compute_samples.py deleted file mode 100644 index 9b80f8298..000000000 --- a/artiq/wavesynth/compute_samples.py +++ /dev/null @@ -1,133 +0,0 @@ -# Copyright (C) 2014, 2015 M-Labs Limited -# Copyright (C) 2014, 2015 Robert Jordens - -from copy import copy -from math import cos, pi - -from artiq.wavesynth.coefficients import discrete_compensate - - -class Spline: - def __init__(self): - self.c = [0.0] - - def set_coefficients(self, c): - if not c: - c = [0.] - self.c = copy(c) - discrete_compensate(self.c) - - def next(self): - r = self.c[0] - for i in range(len(self.c) - 1): - self.c[i] += self.c[i + 1] - return r - - -class SplinePhase: - def __init__(self): - self.c = [0.0] - self.c0 = 0.0 - - def set_coefficients(self, c): - if not c: - c = [0.] - self.c0 = c[0] - c1p = c[1:] - discrete_compensate(c1p) - self.c[1:] = c1p - - def clear(self): - self.c[0] = 0.0 - - def next(self): - r = self.c[0] - for i in range(len(self.c) - 1): - self.c[i] += self.c[i + 1] - self.c[i] %= 1.0 - return r + self.c0 - - -class DDS: - def __init__(self): - self.amplitude = Spline() - self.phase = SplinePhase() - - def next(self): - return self.amplitude.next()*cos(2*pi*self.phase.next()) - - -class Channel: - def __init__(self): - self.bias = Spline() - self.dds = DDS() - self.v = 0. - self.silence = False - - def next(self): - v = self.bias.next() + self.dds.next() - if not self.silence: - self.v = v - return self.v - - def set_silence(self, s): - self.silence = s - - -class TriggerError(Exception): - pass - - -class Synthesizer: - def __init__(self, nchannels, program): - self.channels = [Channel() for _ in range(nchannels)] - self.program = program - # line_iter is None: "wait for segment selection" state - # otherwise: iterator on the current position in the frame - self.line_iter = None - - def select(self, selection): - if self.line_iter is not None: - raise TriggerError("a frame is already selected") - self.line_iter = iter(self.program[selection]) - self.line = next(self.line_iter) - - def trigger(self): - if self.line_iter is None: - raise TriggerError("no frame selected") - - line = self.line - if not line.get("trigger", False): - raise TriggerError("segment is not triggered") - - r = [[] for _ in self.channels] - while True: - for channel, channel_data in zip(self.channels, - line["channel_data"]): - channel.set_silence(channel_data.get("silence", False)) - if "bias" in channel_data: - channel.bias.set_coefficients( - channel_data["bias"]["amplitude"]) - if "dds" in channel_data: - channel.dds.amplitude.set_coefficients( - channel_data["dds"]["amplitude"]) - if "phase" in channel_data["dds"]: - channel.dds.phase.set_coefficients( - channel_data["dds"]["phase"]) - if channel_data["dds"].get("clear", False): - channel.dds.phase.clear() - - if line.get("dac_divider", 1) != 1: - raise NotImplementedError - - for channel, rc in zip(self.channels, r): - for i in range(line["duration"]): - rc.append(channel.next()) - - try: - self.line = line = next(self.line_iter) - if line.get("trigger", False): - return r - except StopIteration: - self.line_iter = None - return r