sawg: spline knot packing/conversion, unittest

This commit is contained in:
Robert Jördens 2016-11-29 14:49:07 +01:00
parent 41779367b5
commit 23fd225947
2 changed files with 139 additions and 39 deletions

View File

@ -1,5 +1,5 @@
from numpy import int32, int64
from artiq.language.core import kernel, now_mu, portable
from artiq.language.core import kernel, now_mu, portable, delay
from artiq.coredevice.rtio import rtio_output, rtio_output_list
from artiq.language.types import TInt32, TInt64, TFloat, TList
@ -12,22 +12,22 @@ class Spline:
self.core = core_device
self.channel = channel
self.width = width
self.scale = (1 << width) / scale
self.scale = (1 << width) * scale
self.time_width = time_width
self.time_scale = (1 << time_width) / core_device.coarse_ref_period
self.time_scale = (1 << time_width) * core_device.coarse_ref_period
@portable(flags=["fast-math"])
@portable(flags={"fast-math"})
def to_mu(self, value: TFloat) -> TInt32:
return int(round(value*self.scale))
@portable(flags=["fast-math"])
@portable(flags={"fast-math"})
def from_mu(self, value: TInt32) -> TFloat:
return value/self.scale
@portable(flags=["fast-math"])
@portable(flags={"fast-math"})
def to_mu64(self, value: TFloat) -> TList(TInt32):
v = int64(round(value*self.scale))
return [int32(v), int32(v >> 32)]
return [int32(v), int32((v >> 32) & 0xffffffff)]
@kernel
def set_mu(self, value: TInt32):
@ -61,20 +61,45 @@ class Spline:
"""
rtio_output_list(now_mu(), self.channel, 0, value)
@portable(flags=["fast-math"])
def coeff_to_mu(self, value: TList(TFloat)) -> TList(TInt32):
l = len(value)
w = l*self.width + (l - 1)*l//2*self.time_width
v = [0] * ((w + 31)//32)
j = 0
for i, vi in enumerate(value):
w = self.width + i*self.time_width
vi = int64(round(vi*(self.scale*self.time_scale**i)))
for k in range(0, w, 16):
wi = (vi >> k) & 0xffff
v[j//2] += wi << (16 * ((j + 1)//2 - j//2))
j += 1
return v
@portable(flags={"fast-math"})
def pack_coeff_mu(self, coeff: TList(TInt64)) -> TList(TInt32):
n = len(coeff)
width = n*self.width + (n - 1)*n//2*self.time_width
packed = [int32(0)] * ((width + 31)//32)
pos = 0
for i in range(n):
wi = self.width + i*self.time_width
ci = coeff[i]
while wi:
j = pos//32
used = pos - 32*j
avail = 32 - used
if avail > wi:
avail = wi
packed[j] |= (ci & ((1 << avail) - 1)) << used
ci >>= avail
wi -= avail
pos += avail
return packed
@portable(flags={"fast-math"})
def coeff_to_mu(self, coeff: TList(TFloat)) -> TList(TInt32):
n = len(coeff)
coeff64 = [int64(0)] * n
for i in range(n):
vi = coeff[i] * self.scale
for j in range(i):
vi *= self.time_scale
vi = int(round(vi))
coeff64[i] = vi
# artiq.wavesynth.coefficients.discrete_compensate:
continue
if i == 2:
coeff64[1] += vi >> (self.time_width + 1)
elif i == 3:
coeff64[2] += vi >> self.time_width
coeff64[1] += (vi // 3) >> (2*self.time_width + 1)
return self.pack_coeff_mu(coeff64)
@kernel
def set_list(self, value: TList(TFloat)):
@ -85,6 +110,42 @@ class Spline:
"""
self.set_list_mu(self.coeff_to_mu(value))
@kernel(flags={"fast-math"})
def smooth(self, start, stop, duration, order):
"""Initiate an interpolated value change.
The third order interpolation is constrained to have zero first
order derivative at both start and stop.
For zeroth order (step) interpolation, the step is at duration/2.
For first order and third order interpolation (linear and cubic)
the interpolator needs to be stopped (or fed a new spline knot)
explicitly at the stop time.
This method advances the timeline by `duration`.
:param start: Initial value of the change.
:param stop: Final value of the change.
:param duration: Duration of the interpolation.
:param order: Order of the interpolation. Only 0, 1,
and 3 are valid: step, linear, cubic.
"""
if order == 0:
delay(duration/2)
self.set_list([stop])
delay(duration/2)
elif order == 1:
self.set_list([start, (stop - start)/duration])
delay(duration)
elif order == 3:
v2 = 6*(stop - start)/(duration*duration)
self.set_list([start, 0., v2, -2*v2/duration])
delay(duration)
else:
raise ValueError("Invalid interpolation order. "
"Supported orders are: 0, 1, 3.")
class SAWG:
"""Smart arbitrary waveform generator channel.
@ -119,21 +180,21 @@ class SAWG:
cordic_gain = 1.646760258057163 # Cordic(width=16, guard=None).gain
# cfg: channel_base
self.offset = Spline(width, time_width, channel_base + 1,
self.core, 2)
self.core, 1/2)
self.amplitude1 = Spline(width, time_width, channel_base + 2,
self.core, 2*cordic_gain**2)
self.core, 1/(2*cordic_gain**2))
self.frequency1 = Spline(3*width, time_width, channel_base + 3,
self.core, self.core.coarse_ref_period)
self.core, 1/self.core.coarse_ref_period)
self.phase1 = Spline(width, time_width, channel_base + 4,
self.core, 1.)
self.amplitude2 = Spline(width, time_width, channel_base + 5,
self.core, 2*cordic_gain**2)
self.core, 1/(2*cordic_gain**2))
self.frequency2 = Spline(3*width, time_width, channel_base + 6,
self.core, self.core.coarse_ref_period)
self.core, 1/self.core.coarse_ref_period)
self.phase2 = Spline(width, time_width, channel_base + 7,
self.core, 1.)
self.frequency0 = Spline(2*width, time_width, channel_base + 8,
self.core,
parallelism/self.core.coarse_ref_period)
self.core.coarse_ref_period/parallelism)
self.phase0 = Spline(width, time_width, channel_base + 9,
self.core, 1.)

View File

@ -1,5 +1,4 @@
import unittest
import numpy as np
from numpy import int32, int64
import migen as mg
@ -21,7 +20,7 @@ class RTIOManager:
self.rtio_output(*args, **kwargs)
def patch(self, mod):
assert not getattr(mod, "_saved", None)
assert not hasattr(mod, "_saved")
mod._saved = {}
for name in "rtio_output rtio_output_list".split():
mod._saved[name] = getattr(mod, name, None)
@ -63,7 +62,8 @@ class SAWGTest(unittest.TestCase):
(1 << self.driver.offset.width - 1)*.9))),
(2*8, 8, 0, [int(round(
(1 << self.driver.frequency0.width) /
self.channel.parallelism*.1)), 0]),
self.channel.parallelism*d.core.coarse_ref_period*.1)),
0]),
(4*8, 1, 0, 0),
])
@ -79,7 +79,7 @@ class SAWGTest(unittest.TestCase):
yield phy.rtlink.o.stb.eq(0)
rt = dut.phys[channel].rtlink.o
if isinstance(data, list):
data = sum(d << i*32 for i, d in enumerate(data))
data = sum(int(d) << (i*32) for i, d in enumerate(data))
yield rt.data.eq(int(data))
yield rt.stb.eq(1)
assert not (yield rt.busy)
@ -98,10 +98,9 @@ class SAWGTest(unittest.TestCase):
vcd_name="dds.vcd")
return sum(data, [])
def test_channel(self):
def test_run_channel(self):
self.test_make_events()
out = self.run_channel(self.rtio_manager.outputs)
print(out)
def test_coeff(self):
import struct
@ -111,10 +110,11 @@ class SAWGTest(unittest.TestCase):
p = ch.coeff_to_mu(v)
t = ch.time_width
w = ch.width
p0 = [struct.pack("<" + "_hiqq"[(w + i*t)//16],
int(round(vi*ch.scale*ch.time_scale**i))
)[:(w + i*t)//8]
p0 = [int(round(vi*ch.scale*ch.time_scale**i))
for i, vi in enumerate(v)]
p0 = [struct.pack("<" + "_bhiiqqqq"[(w + i*t)//8], vi
)[:(w + i*t)//8]
for i, vi in enumerate(p0)]
p0 = b"".join(p0)
if len(p0) % 4:
p0 += b"\x00"*(4 - len(p0) % 4)
@ -129,6 +129,45 @@ class SAWGTest(unittest.TestCase):
d.offset.set_list([0])
delay_mu(1*8)
out = self.run_channel(self.rtio_manager.outputs)
self.assertEqual(
out, sum(([100 + i*10]*self.channel.parallelism
for i in range(11)), []))
for i in range(len(out)//2):
with self.subTest(i):
v = 100 + i*10
self.assertEqual(out[2*i], v)
self.assertEqual(out[2*i+1], v)
def test_pack(self):
ch = self.driver.offset
self.assertEqual(ch.pack_coeff_mu([1]), [1])
self.assertEqual(ch.pack_coeff_mu([1, 1 << 16]), [1, 1])
self.assertEqual(ch.pack_coeff_mu([1, 1 << 32]), [1, 0])
self.assertEqual(ch.pack_coeff_mu([0x1234, 0xa5a5a5a5]),
[0xa5a51234, 0xa5a5])
self.assertEqual(ch.pack_coeff_mu([1, 2, 3, 4]),
[0x20001, 0x30000, 0, 4, 0])
self.assertEqual(ch.pack_coeff_mu([-1, -2, -3, -4]),
[0xfffeffff, 0xfffdffff, 0xffffffff,
0xfffffffc, 0xffffffff])
self.assertEqual(ch.pack_coeff_mu([0, -1, 0, -1]),
[0xffff0000, 0x0000ffff, 0,
0xffffffff, 0xffffffff])
def test_smooth_linear(self):
ch = self.driver.offset
ch.smooth(.1, .2, 13*ch.core.coarse_ref_period, 1)
ch.set(.2)
delay_mu(1*8)
out = self.run_channel(self.rtio_manager.outputs)
a = int(round(.1*ch.scale))
da = a//13
for i in range(len(out)//2):
with self.subTest(i):
v = a + i*da
self.assertEqual(out[2*i], v)
self.assertEqual(out[2*i+1], v)
def test_smooth_cubic(self):
ch = self.driver.offset
ch.smooth(.1, .2, 13*ch.core.coarse_ref_period, 3)
ch.set(.2)
delay_mu(1*8)
out = self.run_channel(self.rtio_manager.outputs)