wavesynth: cleanup, refactor

This commit is contained in:
Robert Jördens 2015-05-05 17:25:09 +02:00
parent e94e7c6a75
commit 07a58dc0da
2 changed files with 29 additions and 34 deletions

View File

@ -12,7 +12,7 @@ class TestSplineCoef(unittest.TestCase):
self.s = coefficients.SplineSource(self.x, self.y, order=4) self.s = coefficients.SplineSource(self.x, self.y, order=4)
def test_get_segment(self): def test_get_segment(self):
return list(self.s.get_segment_data(1.5, 3.2, 1/100.)) return list(self.s.get_segment_data(start=1.5, stop=3.2, scale=.01))
def test_synth(self): def test_synth(self):
d = self.test_get_segment() d = self.test_get_segment()
@ -32,7 +32,7 @@ class TestSplineCoef(unittest.TestCase):
@unittest.skip("manual/visual test") @unittest.skip("manual/visual test")
def test_plot(self): def test_plot(self):
import cairoplot import matplotlib.pyplot as plt
y = self.test_run() y = self.test_run()
x = list(range(len(y))) plt.step(np.arange(len(y)), y)
cairoplot.scatter_plot("plot.png", [x, y]) plt.show()

View File

@ -5,11 +5,19 @@ from scipy.special import binom
class UnivariateMultiSpline: class UnivariateMultiSpline:
"""Multidimensional wrapper around `scipy.interpolate.sp*` functions. """Multidimensional wrapper around `scipy.interpolate.sp*` functions.
`scipy.inteprolate.splprep` unfortunately does only up to 12 dimsions. `scipy.inteprolate.splprep` is limited to 12 dimensions.
""" """
def __init__(self, x, y, order=4): def __init__(self, x, y, *, x0=None, order=4, **kwargs):
self.order = order self.order = order
self.s = [splrep(x, yi, k=order - 1) for yi in y] self.x = x
self.s = []
for i, yi in enumerate(y):
if x0 is not None:
yi = self.upsample_knots(x0[i], yi, x)
self.s.append(splrep(x, yi, k=order - 1, **kwargs))
def upsample_knots(self, x0, y0, x):
return splev(x, splrep(x0, y0, k=self.order - 1))
def lev(self, x, *a, **k): def lev(self, x, *a, **k):
return np.array([splev(x, si) for si in self.s]) return np.array([splev(x, si) for si in self.s])
@ -30,20 +38,6 @@ class UnivariateMultiSpline:
return np.array([self.lev(x, der=i) for i in range(self.order)]) return np.array([self.lev(x, der=i) for i in range(self.order)])
class UnivariateMultiSparseSpline(UnivariateMultiSpline):
def __init__(self, d, x0=None, order=4):
self.order = order
self.n = sorted(set(n for x, n, y in d))
self.s = []
for n in self.n:
x, y = np.array([(x, y) for x, ni, y in d if n == ni]).T
if x0 is not None:
y0 = splev(x0, splrep(x, y, k=order - 1))
x, y = x0, y0
s = splrep(x, y, k=order - 1)
self.s.append(s)
def pad_const(x, n, axis=0): def pad_const(x, n, axis=0):
"""Prefix and postfix the array `x` by `n` repetitions of the first and """Prefix and postfix the array `x` by `n` repetitions of the first and
last value along `axis`. last value along `axis`.
@ -58,28 +52,28 @@ def pad_const(x, n, axis=0):
def build_segment(durations, coefficients, target="bias", def build_segment(durations, coefficients, target="bias",
variable="amplitude"): variable="amplitude", compress=True):
"""Build a wavesynth-style segment from homogeneous duration and """Build a wavesynth-style segment from homogeneous duration and
coefficient data. coefficient data.
:param durations: 1D sequence of line durations. :param durations: 1D sequence of line durations.
:param coefficients: 3D array with shape `(n, m, len(x))`, :param coefficients: 3D array with shape `(n, m, len(durations))`,
with `n` being the interpolation order + 1 and `m` the number of with `n` being the interpolation order + 1 and `m` the number of
channels. channels.
:param target: The target component of the channel to affect. :param target: The target component of the channel to affect.
:param variable: The variable within the target component. :param variable: The variable within the target component.
:param compress: If `True`, skip zero high order coefficients.
""" """
for dxi, yi in zip(durations, coefficients.transpose()): for dxi, yi in zip(durations, coefficients.transpose()):
d = {"duration": int(dxi)} cd = []
d["channel_data"] = cd = []
for yij in yi: for yij in yi:
cdj = [] cdj = []
for yijk in reversed(yij): for yijk in reversed(yij):
if cdj or abs(yijk): if cdj or abs(yijk) or not compress:
cdj.append(float(yijk)) cdj.append(float(yijk))
cdj.reverse() cdj.reverse()
cd.append({target: {variable: cdj}}) cd.append({target: {variable: cdj}})
yield d yield {"duration": int(dxi), "channel_data": cd}
class CoefficientSource: class CoefficientSource:
@ -125,7 +119,7 @@ class CoefficientSource:
with `n` being the number of channels.""" with `n` being the number of channels."""
raise NotImplementedError raise NotImplementedError
def get_segment_data(self, start, stop, scale, cutoff=1e-12, def get_segment_data(self, start, stop, scale, *, cutoff=1e-12,
target="bias", variable="amplitude"): target="bias", variable="amplitude"):
"""Build wavesynth segment data. """Build wavesynth segment data.
@ -154,7 +148,7 @@ class CoefficientSource:
""" """
for i, line in enumerate(self.get_segment_data(*args, **kwargs)): for i, line in enumerate(self.get_segment_data(*args, **kwargs)):
if i == 0: if i == 0:
line["trigger"] = True line["trigger"] = trigger
segment.add_line(**line) segment.add_line(**line)
@ -176,7 +170,7 @@ class SplineSource(CoefficientSource):
self.y = pad_const(self.y, order, axis=1) self.y = pad_const(self.y, order, axis=1)
assert self.y.shape[1] == self.x.shape[0] assert self.y.shape[1] == self.x.shape[0]
self.spline = UnivariateMultiSpline(self.x, self.y, order) self.spline = UnivariateMultiSpline(self.x, self.y, order=order)
def crop_x(self, start, stop): def crop_x(self, start, stop):
ia, ib = np.searchsorted(self.x, (start, stop)) ia, ib = np.searchsorted(self.x, (start, stop))
@ -187,7 +181,8 @@ class SplineSource(CoefficientSource):
return np.r_[start, x, stop] return np.r_[start, x, stop]
def scale_x(self, x, scale, min_duration=1, min_length=20): def scale_x(self, x, scale, min_duration=1, min_length=20):
""" """Enforce, round, and scale x to device-dependent values.
Due to minimum duration and/or minimum segment length constraints Due to minimum duration and/or minimum segment length constraints
this method may drop samples from `x_sample` or adjust `durations` to this method may drop samples from `x_sample` or adjust `durations` to
comply. But `x_sample` and `durations` should be kept consistent. comply. But `x_sample` and `durations` should be kept consistent.
@ -235,14 +230,14 @@ class ComposingSplineSource(SplineSource):
self.y = pad_const(self.y, order, axis=2) self.y = pad_const(self.y, order, axis=2)
assert self.y.shape[2] == self.x.shape[0] assert self.y.shape[2] == self.x.shape[0]
self.splines = [UnivariateMultiSpline(self.x, yi, order) self.splines = [UnivariateMultiSpline(self.x, yi, order=order)
for yi in self.y] for yi in self.y]
# need to resample/upsample the shim splines to the master spline knots # need to resample/upsample the shim splines to the master spline knots
# shim knot spacings can span an master spline knot and thus would # shim knot spacings can span an master spline knot and thus would
# cross a highest order derivative boundary # cross a highest order derivative boundary
self.components = UnivariateMultiSparseSpline( y0, x0 = zip(*components)
components, self.x, order) self.components = UnivariateMultiSpline(self.x, y0, x0=x0, order=order)
def __call__(self, t, gain={}, offset={}): def __call__(self, t, gain={}, offset={}):
der = list((set(self.components.n) | set(offset)) der = list((set(self.components.n) | set(offset))