diff --git a/artiq/test/coefficients.py b/artiq/test/coefficients.py index 20fd9ba99..35bb3f35f 100644 --- a/artiq/test/coefficients.py +++ b/artiq/test/coefficients.py @@ -12,7 +12,7 @@ class TestSplineCoef(unittest.TestCase): self.s = coefficients.SplineSource(self.x, self.y, order=4) def test_get_segment(self): - return list(self.s.get_segment_data(1.5, 3.2, 1/100.)) + return list(self.s.get_segment_data(start=1.5, stop=3.2, scale=.01)) def test_synth(self): d = self.test_get_segment() @@ -32,7 +32,7 @@ class TestSplineCoef(unittest.TestCase): @unittest.skip("manual/visual test") def test_plot(self): - import cairoplot + import matplotlib.pyplot as plt y = self.test_run() - x = list(range(len(y))) - cairoplot.scatter_plot("plot.png", [x, y]) + plt.step(np.arange(len(y)), y) + plt.show() diff --git a/artiq/wavesynth/coefficients.py b/artiq/wavesynth/coefficients.py index f0f489359..ee9efb351 100644 --- a/artiq/wavesynth/coefficients.py +++ b/artiq/wavesynth/coefficients.py @@ -5,11 +5,19 @@ from scipy.special import binom class UnivariateMultiSpline: """Multidimensional wrapper around `scipy.interpolate.sp*` functions. - `scipy.inteprolate.splprep` unfortunately does only up to 12 dimsions. + `scipy.inteprolate.splprep` is limited to 12 dimensions. """ - def __init__(self, x, y, order=4): + def __init__(self, x, y, *, x0=None, order=4, **kwargs): self.order = order - self.s = [splrep(x, yi, k=order - 1) for yi in y] + self.x = x + self.s = [] + for i, yi in enumerate(y): + if x0 is not None: + yi = self.upsample_knots(x0[i], yi, x) + self.s.append(splrep(x, yi, k=order - 1, **kwargs)) + + def upsample_knots(self, x0, y0, x): + return splev(x, splrep(x0, y0, k=self.order - 1)) def lev(self, x, *a, **k): return np.array([splev(x, si) for si in self.s]) @@ -30,20 +38,6 @@ class UnivariateMultiSpline: return np.array([self.lev(x, der=i) for i in range(self.order)]) -class UnivariateMultiSparseSpline(UnivariateMultiSpline): - def __init__(self, d, x0=None, order=4): - self.order = order - self.n = sorted(set(n for x, n, y in d)) - self.s = [] - for n in self.n: - x, y = np.array([(x, y) for x, ni, y in d if n == ni]).T - if x0 is not None: - y0 = splev(x0, splrep(x, y, k=order - 1)) - x, y = x0, y0 - s = splrep(x, y, k=order - 1) - self.s.append(s) - - def pad_const(x, n, axis=0): """Prefix and postfix the array `x` by `n` repetitions of the first and last value along `axis`. @@ -58,28 +52,28 @@ def pad_const(x, n, axis=0): def build_segment(durations, coefficients, target="bias", - variable="amplitude"): + variable="amplitude", compress=True): """Build a wavesynth-style segment from homogeneous duration and coefficient data. :param durations: 1D sequence of line durations. - :param coefficients: 3D array with shape `(n, m, len(x))`, + :param coefficients: 3D array with shape `(n, m, len(durations))`, with `n` being the interpolation order + 1 and `m` the number of channels. :param target: The target component of the channel to affect. :param variable: The variable within the target component. + :param compress: If `True`, skip zero high order coefficients. """ for dxi, yi in zip(durations, coefficients.transpose()): - d = {"duration": int(dxi)} - d["channel_data"] = cd = [] + cd = [] for yij in yi: cdj = [] for yijk in reversed(yij): - if cdj or abs(yijk): + if cdj or abs(yijk) or not compress: cdj.append(float(yijk)) cdj.reverse() cd.append({target: {variable: cdj}}) - yield d + yield {"duration": int(dxi), "channel_data": cd} class CoefficientSource: @@ -125,7 +119,7 @@ class CoefficientSource: with `n` being the number of channels.""" raise NotImplementedError - def get_segment_data(self, start, stop, scale, cutoff=1e-12, + def get_segment_data(self, start, stop, scale, *, cutoff=1e-12, target="bias", variable="amplitude"): """Build wavesynth segment data. @@ -154,7 +148,7 @@ class CoefficientSource: """ for i, line in enumerate(self.get_segment_data(*args, **kwargs)): if i == 0: - line["trigger"] = True + line["trigger"] = trigger segment.add_line(**line) @@ -176,7 +170,7 @@ class SplineSource(CoefficientSource): self.y = pad_const(self.y, order, axis=1) assert self.y.shape[1] == self.x.shape[0] - self.spline = UnivariateMultiSpline(self.x, self.y, order) + self.spline = UnivariateMultiSpline(self.x, self.y, order=order) def crop_x(self, start, stop): ia, ib = np.searchsorted(self.x, (start, stop)) @@ -187,7 +181,8 @@ class SplineSource(CoefficientSource): return np.r_[start, x, stop] def scale_x(self, x, scale, min_duration=1, min_length=20): - """ + """Enforce, round, and scale x to device-dependent values. + Due to minimum duration and/or minimum segment length constraints this method may drop samples from `x_sample` or adjust `durations` to comply. But `x_sample` and `durations` should be kept consistent. @@ -235,14 +230,14 @@ class ComposingSplineSource(SplineSource): self.y = pad_const(self.y, order, axis=2) assert self.y.shape[2] == self.x.shape[0] - self.splines = [UnivariateMultiSpline(self.x, yi, order) + self.splines = [UnivariateMultiSpline(self.x, yi, order=order) for yi in self.y] # need to resample/upsample the shim splines to the master spline knots # shim knot spacings can span an master spline knot and thus would # cross a highest order derivative boundary - self.components = UnivariateMultiSparseSpline( - components, self.x, order) + y0, x0 = zip(*components) + self.components = UnivariateMultiSpline(self.x, y0, x0=x0, order=order) def __call__(self, t, gain={}, offset={}): der = list((set(self.components.n) | set(offset))