diff --git a/examples/coredev_test.py b/examples/coredev_test.py deleted file mode 100644 index 9512e689d..000000000 --- a/examples/coredev_test.py +++ /dev/null @@ -1,38 +0,0 @@ -from artiq.language.core import AutoContext, kernel -from artiq.devices import corecom_serial, core, gpio_core - - -class CompilerTest(AutoContext): - parameters = "led" - - def output(self, n): - print("Received: "+str(n)) - - def get_max(self): - return int(input("Maximum: ")) - - @kernel - def run(self): - self.led.set(1) - m = self.get_max() - for x in range(1, m): - d = 2 - prime = True - while d*d <= x: - if x % d == 0: - prime = False - break - d += 1 - if prime: - self.output(x) - self.led.set(0) - - -if __name__ == "__main__": - with corecom_serial.CoreCom() as com: - coredev = core.Core(com) - exp = CompilerTest( - core=coredev, - led=gpio_core.GPIOOut(core=coredev, channel=0) - ) - exp.run() diff --git a/examples/rtio_loopback.py b/examples/rtio_loopback.py deleted file mode 100644 index f2d009ee9..000000000 --- a/examples/rtio_loopback.py +++ /dev/null @@ -1,31 +0,0 @@ -from artiq.language.core import * -from artiq.language.units import * -from artiq.devices import corecom_serial, core, rtio_core - - -class RTIOLoopback(AutoContext): - parameters = "i o" - - def report(self, n): - print(n) - - @kernel - def run(self): - with parallel: - with sequential: - for i in range(4): - delay(25*ns) - self.o.pulse(25*ns) - self.i.count_rising(1*us) - self.report(self.i.sync()) - - -if __name__ == "__main__": - with corecom_serial.CoreCom() as com: - coredev = core.Core(com) - exp = RTIOLoopback( - core=coredev, - i=rtio_core.RTIOCounter(core=coredev, channel=0), - o=rtio_core.RTIOOut(core=coredev, channel=1) - ) - exp.run() diff --git a/examples/time_test.py b/examples/time_test.py deleted file mode 100644 index 0d760abe9..000000000 --- a/examples/time_test.py +++ /dev/null @@ -1,49 +0,0 @@ -from artiq.language.units import * -from artiq.language.core import * -from artiq.devices import corecom_serial, core - - -class DummyPulse(AutoContext): - parameters = "name" - - def print_on(self, t, f): - print("{} ON:{:4} @{}".format(self.name, f, t)) - - def print_off(self, t): - print("{} OFF @{}".format(self.name, t)) - - @kernel - def pulse(self, f, duration): - self.print_on(int(now()), f) - delay(duration) - self.print_off(int(now())) - - -class TimeTest(AutoContext): - parameters = "a b c d" - - @kernel - def run(self): - i = 0 - while i < 3: - with parallel: - with sequential: - self.a.pulse(100+i, 20*us) - self.b.pulse(200+i, 20*us) - with sequential: - self.c.pulse(300+i, 10*us) - self.d.pulse(400+i, 20*us) - i += 1 - - -if __name__ == "__main__": - with corecom_serial.CoreCom() as com: - coredev = core.Core(com) - exp = TimeTest( - core=coredev, - a=DummyPulse(core=coredev, name="a"), - b=DummyPulse(core=coredev, name="b"), - c=DummyPulse(core=coredev, name="c"), - d=DummyPulse(core=coredev, name="d"), - ) - exp.run() diff --git a/test/full_stack.py b/test/full_stack.py new file mode 100644 index 000000000..43407fe3a --- /dev/null +++ b/test/full_stack.py @@ -0,0 +1,115 @@ +import unittest + +from artiq.language.core import * +from artiq.language.units import * +from artiq.devices import corecom_serial, core, rtio_core +from artiq.sim import devices as sim_devices + + +def _run_on_device(k_class, **parameters): + with corecom_serial.CoreCom() as com: + coredev = core.Core(com) + k_inst = k_class(core=coredev, **parameters) + k_inst.run() + + +def _run_on_host(k_class, **parameters): + coredev = sim_devices.Core() + k_inst = k_class(core=coredev, **parameters) + k_inst.run() + + +class _Primes(AutoContext): + parameters = "output_list max" + + @kernel + def run(self): + for x in range(1, self.max): + d = 2 + prime = True + while d*d <= x: + if x % d == 0: + prime = False + break + d += 1 + if prime: + self.output_list.append(x) + + +class _PulseLogger(AutoContext): + parameters = "name" + + def print_on(self, t, f): + print("{} ON:{:4} @{}".format(self.name, f, t)) + + def print_off(self, t): + print("{} OFF @{}".format(self.name, t)) + + @kernel + def pulse(self, f, duration): + self.print_on(int(now()), f) + delay(duration) + self.print_off(int(now())) + + +class _PulseTest(AutoContext): + def build(self): + for name in "a", "b", "c", "d": + pl = _PulseLogger(self, name=name) + setattr(self, name, pl) + + @kernel + def run(self): + for i in range(3): + with parallel: + with sequential: + self.a.pulse(100+i, 20*us) + self.b.pulse(200+i, 20*us) + with sequential: + self.c.pulse(300+i, 10*us) + self.d.pulse(400+i, 20*us) + + +class SimCompareCase(unittest.TestCase): + def test_primes(self): + l_device, l_host = [], [] + _run_on_device(_Primes, max=100, output_list=l_device) + _run_on_host(_Primes, max=100, output_list=l_host) + self.assertEqual(l_device, l_host) + + def test_pulses(self): + # TODO: compare results on host and device + # (this requires better unit management in the compiler) + _run_on_device(_PulseTest) + + +class _RTIOLoopback(AutoContext): + parameters = "i o npulses" + + def report(self, n): + self.result = n + + @kernel + def run(self): + with parallel: + with sequential: + for i in range(self.npulses): + delay(25*ns) + self.o.pulse(25*ns) + self.i.count_rising(10*us) + self.report(self.i.sync()) + + +class RTIOCase(unittest.TestCase): + def test_loopback(self): + npulses = 4 + with corecom_serial.CoreCom() as com: + coredev = core.Core(com) + lb = _RTIOLoopback( + core=coredev, + i=rtio_core.RTIOCounter(core=coredev, channel=0), + o=rtio_core.RTIOOut(core=coredev, channel=1), + npulses=npulses + ) + lb.run() + self.assertEqual(lb.result, npulses)