diff --git a/artiq/coredevice/sawg.py b/artiq/coredevice/sawg.py index 6823bcf6d..da4c9a5be 100644 --- a/artiq/coredevice/sawg.py +++ b/artiq/coredevice/sawg.py @@ -1,5 +1,5 @@ from numpy import int32, int64 -from artiq.language.core import kernel, now_mu, portable +from artiq.language.core import kernel, now_mu, portable, delay from artiq.coredevice.rtio import rtio_output, rtio_output_list from artiq.language.types import TInt32, TInt64, TFloat, TList @@ -12,22 +12,22 @@ class Spline: self.core = core_device self.channel = channel self.width = width - self.scale = (1 << width) / scale + self.scale = (1 << width) * scale self.time_width = time_width - self.time_scale = (1 << time_width) / core_device.coarse_ref_period + self.time_scale = (1 << time_width) * core_device.coarse_ref_period - @portable(flags=["fast-math"]) + @portable(flags={"fast-math"}) def to_mu(self, value: TFloat) -> TInt32: return int(round(value*self.scale)) - @portable(flags=["fast-math"]) + @portable(flags={"fast-math"}) def from_mu(self, value: TInt32) -> TFloat: return value/self.scale - @portable(flags=["fast-math"]) + @portable(flags={"fast-math"}) def to_mu64(self, value: TFloat) -> TList(TInt32): v = int64(round(value*self.scale)) - return [int32(v), int32(v >> 32)] + return [int32(v), int32((v >> 32) & 0xffffffff)] @kernel def set_mu(self, value: TInt32): @@ -61,20 +61,45 @@ class Spline: """ rtio_output_list(now_mu(), self.channel, 0, value) - @portable(flags=["fast-math"]) - def coeff_to_mu(self, value: TList(TFloat)) -> TList(TInt32): - l = len(value) - w = l*self.width + (l - 1)*l//2*self.time_width - v = [0] * ((w + 31)//32) - j = 0 - for i, vi in enumerate(value): - w = self.width + i*self.time_width - vi = int64(round(vi*(self.scale*self.time_scale**i))) - for k in range(0, w, 16): - wi = (vi >> k) & 0xffff - v[j//2] += wi << (16 * ((j + 1)//2 - j//2)) - j += 1 - return v + @portable(flags={"fast-math"}) + def pack_coeff_mu(self, coeff: TList(TInt64)) -> TList(TInt32): + n = len(coeff) + width = n*self.width + (n - 1)*n//2*self.time_width + packed = [int32(0)] * ((width + 31)//32) + pos = 0 + for i in range(n): + wi = self.width + i*self.time_width + ci = coeff[i] + while wi: + j = pos//32 + used = pos - 32*j + avail = 32 - used + if avail > wi: + avail = wi + packed[j] |= (ci & ((1 << avail) - 1)) << used + ci >>= avail + wi -= avail + pos += avail + return packed + + @portable(flags={"fast-math"}) + def coeff_to_mu(self, coeff: TList(TFloat)) -> TList(TInt32): + n = len(coeff) + coeff64 = [int64(0)] * n + for i in range(n): + vi = coeff[i] * self.scale + for j in range(i): + vi *= self.time_scale + vi = int(round(vi)) + coeff64[i] = vi + # artiq.wavesynth.coefficients.discrete_compensate: + continue + if i == 2: + coeff64[1] += vi >> (self.time_width + 1) + elif i == 3: + coeff64[2] += vi >> self.time_width + coeff64[1] += (vi // 3) >> (2*self.time_width + 1) + return self.pack_coeff_mu(coeff64) @kernel def set_list(self, value: TList(TFloat)): @@ -85,6 +110,42 @@ class Spline: """ self.set_list_mu(self.coeff_to_mu(value)) + @kernel(flags={"fast-math"}) + def smooth(self, start, stop, duration, order): + """Initiate an interpolated value change. + + The third order interpolation is constrained to have zero first + order derivative at both start and stop. + + For zeroth order (step) interpolation, the step is at duration/2. + + For first order and third order interpolation (linear and cubic) + the interpolator needs to be stopped (or fed a new spline knot) + explicitly at the stop time. + + This method advances the timeline by `duration`. + + :param start: Initial value of the change. + :param stop: Final value of the change. + :param duration: Duration of the interpolation. + :param order: Order of the interpolation. Only 0, 1, + and 3 are valid: step, linear, cubic. + """ + if order == 0: + delay(duration/2) + self.set_list([stop]) + delay(duration/2) + elif order == 1: + self.set_list([start, (stop - start)/duration]) + delay(duration) + elif order == 3: + v2 = 6*(stop - start)/(duration*duration) + self.set_list([start, 0., v2, -2*v2/duration]) + delay(duration) + else: + raise ValueError("Invalid interpolation order. " + "Supported orders are: 0, 1, 3.") + class SAWG: """Smart arbitrary waveform generator channel. @@ -119,21 +180,21 @@ class SAWG: cordic_gain = 1.646760258057163 # Cordic(width=16, guard=None).gain # cfg: channel_base self.offset = Spline(width, time_width, channel_base + 1, - self.core, 2) + self.core, 1/2) self.amplitude1 = Spline(width, time_width, channel_base + 2, - self.core, 2*cordic_gain**2) + self.core, 1/(2*cordic_gain**2)) self.frequency1 = Spline(3*width, time_width, channel_base + 3, - self.core, self.core.coarse_ref_period) + self.core, 1/self.core.coarse_ref_period) self.phase1 = Spline(width, time_width, channel_base + 4, self.core, 1.) self.amplitude2 = Spline(width, time_width, channel_base + 5, - self.core, 2*cordic_gain**2) + self.core, 1/(2*cordic_gain**2)) self.frequency2 = Spline(3*width, time_width, channel_base + 6, - self.core, self.core.coarse_ref_period) + self.core, 1/self.core.coarse_ref_period) self.phase2 = Spline(width, time_width, channel_base + 7, self.core, 1.) self.frequency0 = Spline(2*width, time_width, channel_base + 8, self.core, - parallelism/self.core.coarse_ref_period) + self.core.coarse_ref_period/parallelism) self.phase0 = Spline(width, time_width, channel_base + 9, self.core, 1.) diff --git a/artiq/test/gateware/test_sawg_fe.py b/artiq/test/gateware/test_sawg_fe.py index bb1d03478..a93f95daa 100644 --- a/artiq/test/gateware/test_sawg_fe.py +++ b/artiq/test/gateware/test_sawg_fe.py @@ -1,5 +1,4 @@ import unittest -import numpy as np from numpy import int32, int64 import migen as mg @@ -21,7 +20,7 @@ class RTIOManager: self.rtio_output(*args, **kwargs) def patch(self, mod): - assert not getattr(mod, "_saved", None) + assert not hasattr(mod, "_saved") mod._saved = {} for name in "rtio_output rtio_output_list".split(): mod._saved[name] = getattr(mod, name, None) @@ -63,7 +62,8 @@ class SAWGTest(unittest.TestCase): (1 << self.driver.offset.width - 1)*.9))), (2*8, 8, 0, [int(round( (1 << self.driver.frequency0.width) / - self.channel.parallelism*.1)), 0]), + self.channel.parallelism*d.core.coarse_ref_period*.1)), + 0]), (4*8, 1, 0, 0), ]) @@ -79,7 +79,7 @@ class SAWGTest(unittest.TestCase): yield phy.rtlink.o.stb.eq(0) rt = dut.phys[channel].rtlink.o if isinstance(data, list): - data = sum(d << i*32 for i, d in enumerate(data)) + data = sum(int(d) << (i*32) for i, d in enumerate(data)) yield rt.data.eq(int(data)) yield rt.stb.eq(1) assert not (yield rt.busy) @@ -98,10 +98,9 @@ class SAWGTest(unittest.TestCase): vcd_name="dds.vcd") return sum(data, []) - def test_channel(self): + def test_run_channel(self): self.test_make_events() out = self.run_channel(self.rtio_manager.outputs) - print(out) def test_coeff(self): import struct @@ -111,10 +110,11 @@ class SAWGTest(unittest.TestCase): p = ch.coeff_to_mu(v) t = ch.time_width w = ch.width - p0 = [struct.pack("<" + "_hiqq"[(w + i*t)//16], - int(round(vi*ch.scale*ch.time_scale**i)) - )[:(w + i*t)//8] + p0 = [int(round(vi*ch.scale*ch.time_scale**i)) for i, vi in enumerate(v)] + p0 = [struct.pack("<" + "_bhiiqqqq"[(w + i*t)//8], vi + )[:(w + i*t)//8] + for i, vi in enumerate(p0)] p0 = b"".join(p0) if len(p0) % 4: p0 += b"\x00"*(4 - len(p0) % 4) @@ -129,6 +129,45 @@ class SAWGTest(unittest.TestCase): d.offset.set_list([0]) delay_mu(1*8) out = self.run_channel(self.rtio_manager.outputs) - self.assertEqual( - out, sum(([100 + i*10]*self.channel.parallelism - for i in range(11)), [])) + for i in range(len(out)//2): + with self.subTest(i): + v = 100 + i*10 + self.assertEqual(out[2*i], v) + self.assertEqual(out[2*i+1], v) + + def test_pack(self): + ch = self.driver.offset + self.assertEqual(ch.pack_coeff_mu([1]), [1]) + self.assertEqual(ch.pack_coeff_mu([1, 1 << 16]), [1, 1]) + self.assertEqual(ch.pack_coeff_mu([1, 1 << 32]), [1, 0]) + self.assertEqual(ch.pack_coeff_mu([0x1234, 0xa5a5a5a5]), + [0xa5a51234, 0xa5a5]) + self.assertEqual(ch.pack_coeff_mu([1, 2, 3, 4]), + [0x20001, 0x30000, 0, 4, 0]) + self.assertEqual(ch.pack_coeff_mu([-1, -2, -3, -4]), + [0xfffeffff, 0xfffdffff, 0xffffffff, + 0xfffffffc, 0xffffffff]) + self.assertEqual(ch.pack_coeff_mu([0, -1, 0, -1]), + [0xffff0000, 0x0000ffff, 0, + 0xffffffff, 0xffffffff]) + + def test_smooth_linear(self): + ch = self.driver.offset + ch.smooth(.1, .2, 13*ch.core.coarse_ref_period, 1) + ch.set(.2) + delay_mu(1*8) + out = self.run_channel(self.rtio_manager.outputs) + a = int(round(.1*ch.scale)) + da = a//13 + for i in range(len(out)//2): + with self.subTest(i): + v = a + i*da + self.assertEqual(out[2*i], v) + self.assertEqual(out[2*i+1], v) + + def test_smooth_cubic(self): + ch = self.driver.offset + ch.smooth(.1, .2, 13*ch.core.coarse_ref_period, 3) + ch.set(.2) + delay_mu(1*8) + out = self.run_channel(self.rtio_manager.outputs)