From 0b8d496b62162767ac3298ac27f1718ab650f7aa Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Sat, 18 Apr 2015 21:21:11 -0600 Subject: [PATCH] coefficients: cleanup and refactor some code into CoefficientSource --- artiq/wavesynth/coefficients.py | 87 ++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 39 deletions(-) diff --git a/artiq/wavesynth/coefficients.py b/artiq/wavesynth/coefficients.py index 2a338bc8e..f0f489359 100644 --- a/artiq/wavesynth/coefficients.py +++ b/artiq/wavesynth/coefficients.py @@ -46,7 +46,7 @@ class UnivariateMultiSparseSpline(UnivariateMultiSpline): def pad_const(x, n, axis=0): """Prefix and postfix the array `x` by `n` repetitions of the first and - last vlaue along `axis`. + last value along `axis`. """ a = np.repeat(x.take([0], axis), n, axis) b = np.repeat(x.take([-1], axis), n, axis) @@ -86,9 +86,9 @@ class CoefficientSource: def crop_x(self, start, stop, num=2): """Return an array of valid sample positions. - This function needs to be implemented only if this - `CoefficientSource` does not support sampling at arbitrary - positions. + This method needs to be overloaded if this `CoefficientSource` + does not support sampling at arbitrary positions or at arbitrary + density. :param start: First sample position. :param stop: Last sample position. @@ -99,15 +99,23 @@ class CoefficientSource: return np.linspace(start, stop, num) def scale_x(self, x, scale): + # TODO: This could be moved to the the Driver/Mediator code as it is + # device-specific. """Scale and round sample positions. + The sample times may need to be changed and/or decimated if + incompatible with hardware requirements. + :param x: Input sample positions in data space. :param scale: Data space position to cycles conversion scale, in units of x-units per clock cycle. :return: `x_sample`, the rounded sample positions and `durations`, the integer durations of the individual samples in cycles. """ - raise NotImplementedError + t = np.rint(x/scale) + x_sample = t*scale + durations = np.diff(t).astype(np.int) + return x_sample, durations def __call__(self, x, **kwargs): """Perform sampling and return coefficients. @@ -118,25 +126,18 @@ class CoefficientSource: raise NotImplementedError def get_segment_data(self, start, stop, scale, cutoff=1e-12, - min_duration=1, min_length=20, target="bias", variable="amplitude"): """Build wavesynth segment data. :param start: see `crop_x()`. :param stop: see `crop_x()`. :param scale: see `scale_x()`. - :param num: see `crop_x()`. :param cutoff: coefficient cutoff towards zero to compress data. - :param min_duration: Minimum duration of a line. - :param min_length: Minimum segment length to space triggers. """ x = self.crop_x(start, stop) x_sample, durations = self.scale_x(x, scale) coefficients = self(x_sample) - np.clip(np.fabs(durations), min_duration, None, out=durations) - if len(durations) == 1: - durations[0] = max(durations[0], min_length) - if start == stop: + if len(x_sample) == 1 and start == stop: coefficients = coefficients[:1] # rescale coefficients accordingly coefficients *= (scale*np.sign(durations))**np.arange( @@ -146,12 +147,14 @@ class CoefficientSource: return build_segment(durations, coefficients, target=target, variable=variable) - def extend_segment(self, segment, *args, **kwargs): + def extend_segment(self, segment, trigger=True, *args, **kwargs): """Extend a wavesynth segment. See `get_segment()` for arguments. """ - for line in self.get_segment_data(*args, **kwargs): + for i, line in enumerate(self.get_segment_data(*args, **kwargs)): + if i == 0: + line["trigger"] = True segment.add_line(**line) @@ -183,36 +186,42 @@ class SplineSource(CoefficientSource): x = self.x[ia:ib] return np.r_[start, x, stop] - def scale_x(self, x, scale, nudge=1e-9): + def scale_x(self, x, scale, min_duration=1, min_length=20): + """ + Due to minimum duration and/or minimum segment length constraints + this method may drop samples from `x_sample` or adjust `durations` to + comply. But `x_sample` and `durations` should be kept consistent. + + :param min_duration: Minimum duration of a line. + :param min_length: Minimum segment length to space triggers. + """ # We want to only sample a spline at t_knot + epsilon # where the highest order derivative has just jumped # and is valid at least up to the next knot after t_knot. # - # To ensure that we are on the right side of a knot: + # To ensure that we are on the correct side of a knot: # * only ever increase t when rounding (for increasing t) # * or only ever decrease it (for decreasing t) - # - # The highest derivative is discontinuous at t - # and the correct value for a segment is obtained - # for t_int >= t_float == t_knot (and v.v. for t decreasing). - x = x/scale - inc = np.diff(x) >= 0 + t = x/scale + inc = np.diff(t) >= 0 inc = np.r_[inc, inc[-1]] - x = np.where(inc, np.ceil(x + nudge), np.floor(x - nudge)) - if len(x) > 1 and x[0] == x[1]: - x = x[1:] - if len(x) > 1 and x[-2] == x[-1]: - x = x[:-1] - x_sample = x[:-1]*scale - durations = np.diff(x.astype(np.int)) - return x_sample, durations + t = np.where(inc, np.ceil(t), np.floor(t)) + dt = np.diff(t.astype(np.int)) + + valid = np.absolute(dt) >= min_duration + dt = dt[valid] + t = t[np.r_[True, valid]] + if dt.shape[0] == 1: + dt[0] = max(dt[0], min_length) + x_sample = t[:-1]*scale + return x_sample, dt def __call__(self, x): return self.spline(x) class ComposingSplineSource(SplineSource): - # TODO + # TODO: verify, test, document def __init__(self, x, y, components, order=4, pad_dx=1.): self.x = np.asanyarray(x) assert self.x.ndim == 1 @@ -236,18 +245,18 @@ class ComposingSplineSource(SplineSource): components, self.x, order) def __call__(self, t, gain={}, offset={}): - der = list((set(self.components.n) | set(offset)) & set(self.der)) + der = list((set(self.components.n) | set(offset)) + & set(range(len(self.splines)))) u = np.zeros((self.splines[0].order, len(self.splines[0].s), len(t))) # der, order, ele, t p = np.array([self.splines[i](t) for i in der]) - # order, der, None, t - s = self.components(t) s_gain = np.array([gain.get(_, 1.) for _ in self.components.n]) - s = s[:, :, None, :]*s_gain[None, :, None, None] + # order, der, None, t + s = self.components(t)[:, :, None, :]*s_gain[None, :, None, None] for k, v in offset.items(): - if v and k in self.der: - u += v*p[self.der.index(k)] - ps = p[[self.der.index(_) for _ in self.shims.der]] + if v: + u += v*p[k] + ps = p[self.shims.n] for i in range(u.shape[1]): for j in range(i + 1): u[i] += binom(i, j)*(s[j]*ps[:, i - j]).sum(0)