From 60baed68b413e0e5a06b19b637eb9c6ea596e7b3 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Sat, 18 Apr 2015 01:12:24 -0600 Subject: [PATCH] wavesynth: get coefficients.py into useable state SplineSource() supports spline interpolating multi-channel tabular data, cropping it and generating wavesynth compatible segment data from it. ComposingSplineSource() needs some verification still. --- artiq/test/coefficients.py | 38 ++++ artiq/wavesynth/coefficients.py | 284 ++++++++++++++++++++++++----- artiq/wavesynth/compute_samples.py | 6 +- 3 files changed, 281 insertions(+), 47 deletions(-) create mode 100644 artiq/test/coefficients.py diff --git a/artiq/test/coefficients.py b/artiq/test/coefficients.py new file mode 100644 index 000000000..20fd9ba99 --- /dev/null +++ b/artiq/test/coefficients.py @@ -0,0 +1,38 @@ +import unittest + +import numpy as np + +from artiq.wavesynth import coefficients, compute_samples + + +class TestSplineCoef(unittest.TestCase): + def setUp(self): + self.x = np.arange(5.) + self.y = np.sin(2*np.pi*self.x/5) + np.arange(2)[:, None] + self.s = coefficients.SplineSource(self.x, self.y, order=4) + + def test_get_segment(self): + return list(self.s.get_segment_data(1.5, 3.2, 1/100.)) + + def test_synth(self): + d = self.test_get_segment() + d[0]["trigger"] = True + return compute_samples.Synthesizer(self.y.shape[0], [d, d + d]) + + def drive(self, s): + y = [] + for f in 0, 1, None, 0: + if f is not None: + s.select(f) + y += s.trigger()[0] + return y + + def test_run(self): + return self.drive(self.test_synth()) + + @unittest.skip("manual/visual test") + def test_plot(self): + import cairoplot + y = self.test_run() + x = list(range(len(y))) + cairoplot.scatter_plot("plot.png", [x, y]) diff --git a/artiq/wavesynth/coefficients.py b/artiq/wavesynth/coefficients.py index f0e66ab13..2a338bc8e 100644 --- a/artiq/wavesynth/coefficients.py +++ b/artiq/wavesynth/coefficients.py @@ -1,67 +1,263 @@ import numpy as np from scipy.interpolate import splrep, splev, spalde +from scipy.special import binom + + +class UnivariateMultiSpline: + """Multidimensional wrapper around `scipy.interpolate.sp*` functions. + `scipy.inteprolate.splprep` unfortunately does only up to 12 dimsions. + """ + def __init__(self, x, y, order=4): + self.order = order + self.s = [splrep(x, yi, k=order - 1) for yi in y] + + def lev(self, x, *a, **k): + return np.array([splev(x, si) for si in self.s]) + + def alde(self, x): + u = np.array([spalde(x, si) for si in self.s]) + if len(x) == 1: + u = u[:, None, :] + return u + + def __call__(self, x, use_alde=True): + if use_alde: + u = self.alde(x)[:, :, :self.order] + s = (len(self.s), len(x), self.order) + assert u.shape == s, (u.shape, s) + return u.transpose(2, 0, 1) + else: + return np.array([self.lev(x, der=i) for i in range(self.order)]) + + +class UnivariateMultiSparseSpline(UnivariateMultiSpline): + def __init__(self, d, x0=None, order=4): + self.order = order + self.n = sorted(set(n for x, n, y in d)) + self.s = [] + for n in self.n: + x, y = np.array([(x, y) for x, ni, y in d if n == ni]).T + if x0 is not None: + y0 = splev(x0, splrep(x, y, k=order - 1)) + x, y = x0, y0 + s = splrep(x, y, k=order - 1) + self.s.append(s) + + +def pad_const(x, n, axis=0): + """Prefix and postfix the array `x` by `n` repetitions of the first and + last vlaue along `axis`. + """ + a = np.repeat(x.take([0], axis), n, axis) + b = np.repeat(x.take([-1], axis), n, axis) + xp = np.concatenate([a, x, b], axis) + s = list(x.shape) + s[axis] += 2*n + assert xp.shape == tuple(s), (x.shape, s, xp.shape) + return xp + + +def build_segment(durations, coefficients, target="bias", + variable="amplitude"): + """Build a wavesynth-style segment from homogeneous duration and + coefficient data. + + :param durations: 1D sequence of line durations. + :param coefficients: 3D array with shape `(n, m, len(x))`, + with `n` being the interpolation order + 1 and `m` the number of + channels. + :param target: The target component of the channel to affect. + :param variable: The variable within the target component. + """ + for dxi, yi in zip(durations, coefficients.transpose()): + d = {"duration": int(dxi)} + d["channel_data"] = cd = [] + for yij in yi: + cdj = [] + for yijk in reversed(yij): + if cdj or abs(yijk): + cdj.append(float(yijk)) + cdj.reverse() + cd.append({target: {variable: cdj}}) + yield d class CoefficientSource: - def get_times(self, t, speed, clock): - pass + def crop_x(self, start, stop, num=2): + """Return an array of valid sample positions. - def get_coefficients(self, t, speed): - pass + This function needs to be implemented only if this + `CoefficientSource` does not support sampling at arbitrary + positions. - def get_program(self, dt, u): - pass + :param start: First sample position. + :param stop: Last sample position. + :param num: Number of samples between `start` and `stop`. + :return: Array of sample positions. `start` and `stop` should be + returned as the first and last value in the array respectively. + """ + return np.linspace(start, stop, num) + + def scale_x(self, x, scale): + """Scale and round sample positions. + + :param x: Input sample positions in data space. + :param scale: Data space position to cycles conversion scale, + in units of x-units per clock cycle. + :return: `x_sample`, the rounded sample positions and `durations`, the + integer durations of the individual samples in cycles. + """ + raise NotImplementedError + + def __call__(self, x, **kwargs): + """Perform sampling and return coefficients. + + :param x: Sample positions. + :return: `y` the array of coefficients. `y.shape == (order, n, len(x))` + with `n` being the number of channels.""" + raise NotImplementedError + + def get_segment_data(self, start, stop, scale, cutoff=1e-12, + min_duration=1, min_length=20, + target="bias", variable="amplitude"): + """Build wavesynth segment data. + + :param start: see `crop_x()`. + :param stop: see `crop_x()`. + :param scale: see `scale_x()`. + :param num: see `crop_x()`. + :param cutoff: coefficient cutoff towards zero to compress data. + :param min_duration: Minimum duration of a line. + :param min_length: Minimum segment length to space triggers. + """ + x = self.crop_x(start, stop) + x_sample, durations = self.scale_x(x, scale) + coefficients = self(x_sample) + np.clip(np.fabs(durations), min_duration, None, out=durations) + if len(durations) == 1: + durations[0] = max(durations[0], min_length) + if start == stop: + coefficients = coefficients[:1] + # rescale coefficients accordingly + coefficients *= (scale*np.sign(durations))**np.arange( + coefficients.shape[0])[:, None, None] + if cutoff: + coefficients[np.fabs(coefficients) < cutoff] = 0 + return build_segment(durations, coefficients, target=target, + variable=variable) + + def extend_segment(self, segment, *args, **kwargs): + """Extend a wavesynth segment. + + See `get_segment()` for arguments. + """ + for line in self.get_segment_data(*args, **kwargs): + segment.add_line(**line) -def _round_times(times, sample_times=None): - times = np.asanyarray(times) - if sample_times is None: - sample_times = np.rint(times) - duration = np.diff(sample_times) - sample_times = sample_times[:-1] - assert np.all(duration >= 0) - assert np.all(duration < (1 << 16)) - return times, sample_times, duration +class SplineSource(CoefficientSource): + def __init__(self, x, y, order=4, pad_dx=1.): + """ + :param x: 1D sample positions. + :param y: 2D sample values. + """ + self.x = np.asanyarray(x) + assert self.x.ndim == 1 + self.y = np.asanyarray(y) + assert self.y.ndim == 2 + + if pad_dx is not None: + a = np.arange(-order, 0)*pad_dx + self.x[0] + b = self.x[-1] + np.arange(1, order + 1)*pad_dx + self.x = np.r_[a, self.x, b] + self.y = pad_const(self.y, order, axis=1) + + assert self.y.shape[1] == self.x.shape[0] + self.spline = UnivariateMultiSpline(self.x, self.y, order) + + def crop_x(self, start, stop): + ia, ib = np.searchsorted(self.x, (start, stop)) + if start > stop: + x = self.x[ia - 1:ib - 1:-1] + else: + x = self.x[ia:ib] + return np.r_[start, x, stop] + + def scale_x(self, x, scale, nudge=1e-9): + # We want to only sample a spline at t_knot + epsilon + # where the highest order derivative has just jumped + # and is valid at least up to the next knot after t_knot. + # + # To ensure that we are on the right side of a knot: + # * only ever increase t when rounding (for increasing t) + # * or only ever decrease it (for decreasing t) + # + # The highest derivative is discontinuous at t + # and the correct value for a segment is obtained + # for t_int >= t_float == t_knot (and v.v. for t decreasing). + x = x/scale + inc = np.diff(x) >= 0 + inc = np.r_[inc, inc[-1]] + x = np.where(inc, np.ceil(x + nudge), np.floor(x - nudge)) + if len(x) > 1 and x[0] == x[1]: + x = x[1:] + if len(x) > 1 and x[-2] == x[-1]: + x = x[:-1] + x_sample = x[:-1]*scale + durations = np.diff(x.astype(np.int)) + return x_sample, durations + + def __call__(self, x): + return self.spline(x) -def _interpolate(time, data, sample_times, order=3): - # FIXME: this does not ensure that the spline does not clip - spline = splrep(time, data, k=order or 1) - # FIXME: this could be faster but needs k knots outside t_eval - # dv = np.array(spalde(t_eval, s)) - coeffs = np.array([splev(sample_times, spline, der=i, ext=0) - for i in range(order + 1)]).T - return coeffs +class ComposingSplineSource(SplineSource): + # TODO + def __init__(self, x, y, components, order=4, pad_dx=1.): + self.x = np.asanyarray(x) + assert self.x.ndim == 1 + self.y = np.asanyarray(y) + assert self.y.ndim == 3 + if pad_dx is not None: + a = np.arange(-order, 0)*pad_dx + self.x[0] + b = self.x[-1] + np.arange(1, order + 1)*pad_dx + self.x = np.r_[a, self.x, b] + self.y = pad_const(self.y, order, axis=2) + assert self.y.shape[2] == self.x.shape[0] + self.splines = [UnivariateMultiSpline(self.x, yi, order) + for yi in self.y] -def _zip_program(times, channels, target): - for tc in zip(times, *channels): - yield { - "duration": tc[0], - "channel_data": tc[1:], - } -# FIXME: this does not handle: -# `clear` (clearing the phase accumulator) -# `silence` (stopping the dac clock) + # need to resample/upsample the shim splines to the master spline knots + # shim knot spacings can span an master spline knot and thus would + # cross a highest order derivative boundary + self.components = UnivariateMultiSparseSpline( + components, self.x, order) + def __call__(self, t, gain={}, offset={}): + der = list((set(self.components.n) | set(offset)) & set(self.der)) + u = np.zeros((self.splines[0].order, len(self.splines[0].s), len(t))) + # der, order, ele, t + p = np.array([self.splines[i](t) for i in der]) + # order, der, None, t + s = self.components(t) + s_gain = np.array([gain.get(_, 1.) for _ in self.components.n]) + s = s[:, :, None, :]*s_gain[None, :, None, None] + for k, v in offset.items(): + if v and k in self.der: + u += v*p[self.der.index(k)] + ps = p[[self.der.index(_) for _ in self.shims.der]] + for i in range(u.shape[1]): + for j in range(i + 1): + u[i] += binom(i, j)*(s[j]*ps[:, i - j]).sum(0) + return u # (order, ele, t) -def interpolate_channels(times, data, sample_times=None, **kwargs): - if len(times) == 1: - return _zip_program(np.array([1]), data[:, :, None]) - data = np.asanyarray(data) - assert len(times) == len(data) - times, sample_times, duration = _round_times(times, sample_times) - channel_coeff = [_interpolate(sample_times, i, **kwargs) for i in data.T] - return _zip_program(duration, np.array(channel_coeff)) - # v = np.clip(v/self.max_out, -1, 1) - # - # def discrete_compensate(c): """Compensate spline coefficients for discrete accumulators - Given continuous time b-spline coefficients, this function + Given continuous-time b-spline coefficients, this function compensates for the effect of discrete time steps in the target devices. diff --git a/artiq/wavesynth/compute_samples.py b/artiq/wavesynth/compute_samples.py index c615c9dd1..0658f8bde 100644 --- a/artiq/wavesynth/compute_samples.py +++ b/artiq/wavesynth/compute_samples.py @@ -81,17 +81,17 @@ class Synthesizer: def select(self, selection): if self.line_iter is not None: - raise TriggerError + raise TriggerError("a frame is already selected") self.line_iter = iter(self.program[selection]) self.line = next(self.line_iter) def trigger(self): if self.line_iter is None: - raise TriggerError + raise TriggerError("no frame selected") line = self.line if not line.get("trigger", False): - raise TriggerError + raise TriggerError("segment is not triggered") r = [[] for _ in self.channels] while True: