2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-26 10:28:13 +08:00

coefficients: cleanup and refactor some code into CoefficientSource

This commit is contained in:
Robert Jördens 2015-04-18 21:21:11 -06:00
parent 904bcd247f
commit 0b8d496b62

View File

@ -46,7 +46,7 @@ class UnivariateMultiSparseSpline(UnivariateMultiSpline):
def pad_const(x, n, axis=0):
"""Prefix and postfix the array `x` by `n` repetitions of the first and
last vlaue along `axis`.
last value along `axis`.
"""
a = np.repeat(x.take([0], axis), n, axis)
b = np.repeat(x.take([-1], axis), n, axis)
@ -86,9 +86,9 @@ class CoefficientSource:
def crop_x(self, start, stop, num=2):
"""Return an array of valid sample positions.
This function needs to be implemented only if this
`CoefficientSource` does not support sampling at arbitrary
positions.
This method needs to be overloaded if this `CoefficientSource`
does not support sampling at arbitrary positions or at arbitrary
density.
:param start: First sample position.
:param stop: Last sample position.
@ -99,15 +99,23 @@ class CoefficientSource:
return np.linspace(start, stop, num)
def scale_x(self, x, scale):
# TODO: This could be moved to the the Driver/Mediator code as it is
# device-specific.
"""Scale and round sample positions.
The sample times may need to be changed and/or decimated if
incompatible with hardware requirements.
:param x: Input sample positions in data space.
:param scale: Data space position to cycles conversion scale,
in units of x-units per clock cycle.
:return: `x_sample`, the rounded sample positions and `durations`, the
integer durations of the individual samples in cycles.
"""
raise NotImplementedError
t = np.rint(x/scale)
x_sample = t*scale
durations = np.diff(t).astype(np.int)
return x_sample, durations
def __call__(self, x, **kwargs):
"""Perform sampling and return coefficients.
@ -118,25 +126,18 @@ class CoefficientSource:
raise NotImplementedError
def get_segment_data(self, start, stop, scale, cutoff=1e-12,
min_duration=1, min_length=20,
target="bias", variable="amplitude"):
"""Build wavesynth segment data.
:param start: see `crop_x()`.
:param stop: see `crop_x()`.
:param scale: see `scale_x()`.
:param num: see `crop_x()`.
:param cutoff: coefficient cutoff towards zero to compress data.
:param min_duration: Minimum duration of a line.
:param min_length: Minimum segment length to space triggers.
"""
x = self.crop_x(start, stop)
x_sample, durations = self.scale_x(x, scale)
coefficients = self(x_sample)
np.clip(np.fabs(durations), min_duration, None, out=durations)
if len(durations) == 1:
durations[0] = max(durations[0], min_length)
if start == stop:
if len(x_sample) == 1 and start == stop:
coefficients = coefficients[:1]
# rescale coefficients accordingly
coefficients *= (scale*np.sign(durations))**np.arange(
@ -146,12 +147,14 @@ class CoefficientSource:
return build_segment(durations, coefficients, target=target,
variable=variable)
def extend_segment(self, segment, *args, **kwargs):
def extend_segment(self, segment, trigger=True, *args, **kwargs):
"""Extend a wavesynth segment.
See `get_segment()` for arguments.
"""
for line in self.get_segment_data(*args, **kwargs):
for i, line in enumerate(self.get_segment_data(*args, **kwargs)):
if i == 0:
line["trigger"] = True
segment.add_line(**line)
@ -183,36 +186,42 @@ class SplineSource(CoefficientSource):
x = self.x[ia:ib]
return np.r_[start, x, stop]
def scale_x(self, x, scale, nudge=1e-9):
def scale_x(self, x, scale, min_duration=1, min_length=20):
"""
Due to minimum duration and/or minimum segment length constraints
this method may drop samples from `x_sample` or adjust `durations` to
comply. But `x_sample` and `durations` should be kept consistent.
:param min_duration: Minimum duration of a line.
:param min_length: Minimum segment length to space triggers.
"""
# We want to only sample a spline at t_knot + epsilon
# where the highest order derivative has just jumped
# and is valid at least up to the next knot after t_knot.
#
# To ensure that we are on the right side of a knot:
# To ensure that we are on the correct side of a knot:
# * only ever increase t when rounding (for increasing t)
# * or only ever decrease it (for decreasing t)
#
# The highest derivative is discontinuous at t
# and the correct value for a segment is obtained
# for t_int >= t_float == t_knot (and v.v. for t decreasing).
x = x/scale
inc = np.diff(x) >= 0
t = x/scale
inc = np.diff(t) >= 0
inc = np.r_[inc, inc[-1]]
x = np.where(inc, np.ceil(x + nudge), np.floor(x - nudge))
if len(x) > 1 and x[0] == x[1]:
x = x[1:]
if len(x) > 1 and x[-2] == x[-1]:
x = x[:-1]
x_sample = x[:-1]*scale
durations = np.diff(x.astype(np.int))
return x_sample, durations
t = np.where(inc, np.ceil(t), np.floor(t))
dt = np.diff(t.astype(np.int))
valid = np.absolute(dt) >= min_duration
dt = dt[valid]
t = t[np.r_[True, valid]]
if dt.shape[0] == 1:
dt[0] = max(dt[0], min_length)
x_sample = t[:-1]*scale
return x_sample, dt
def __call__(self, x):
return self.spline(x)
class ComposingSplineSource(SplineSource):
# TODO
# TODO: verify, test, document
def __init__(self, x, y, components, order=4, pad_dx=1.):
self.x = np.asanyarray(x)
assert self.x.ndim == 1
@ -236,18 +245,18 @@ class ComposingSplineSource(SplineSource):
components, self.x, order)
def __call__(self, t, gain={}, offset={}):
der = list((set(self.components.n) | set(offset)) & set(self.der))
der = list((set(self.components.n) | set(offset))
& set(range(len(self.splines))))
u = np.zeros((self.splines[0].order, len(self.splines[0].s), len(t)))
# der, order, ele, t
p = np.array([self.splines[i](t) for i in der])
# order, der, None, t
s = self.components(t)
s_gain = np.array([gain.get(_, 1.) for _ in self.components.n])
s = s[:, :, None, :]*s_gain[None, :, None, None]
# order, der, None, t
s = self.components(t)[:, :, None, :]*s_gain[None, :, None, None]
for k, v in offset.items():
if v and k in self.der:
u += v*p[self.der.index(k)]
ps = p[[self.der.index(_) for _ in self.shims.der]]
if v:
u += v*p[k]
ps = p[self.shims.n]
for i in range(u.shape[1]):
for j in range(i + 1):
u[i] += binom(i, j)*(s[j]*ps[:, i - j]).sum(0)