test: split full_stack into coredevice and coredevice_vs_host

also adapt it to hardware_testbench
closes: #62
This commit is contained in:
Robert Jördens 2015-07-04 20:35:02 -06:00
parent 4cbf280f1a
commit 6faa8ecd51
2 changed files with 106 additions and 150 deletions

View File

@ -3,6 +3,7 @@ from math import sqrt
from artiq.language import * from artiq.language import *
from artiq.test.hardware_testbench import ExperimentCase from artiq.test.hardware_testbench import ExperimentCase
from artiq.coredevice.runtime_exceptions import RTIOUnderflow from artiq.coredevice.runtime_exceptions import RTIOUnderflow
from artiq.coredevice import runtime_exceptions
class RTT(Experiment, AutoDB): class RTT(Experiment, AutoDB):
@ -88,6 +89,64 @@ class PulseRate(Experiment, AutoDB):
break break
class Watchdog(Experiment, AutoDB):
class DBKeys:
core = Device()
@kernel
def run(self):
with watchdog(50*ms):
while True:
pass
class LoopbackCount(Experiment, AutoDB):
class DBKeys:
core = Device()
ttl_inout = Device()
npulses = Argument()
def report(self, n):
self.result = n
@kernel
def run(self):
self.ttl_inout.output()
delay(1*us)
with parallel:
self.ttl_inout.gate_rising(10*us)
with sequential:
for i in range(self.npulses):
delay(25*ns)
self.ttl_inout.pulse(25*ns)
self.report(self.ttl_inout.count())
class Underflow(Experiment, AutoDB):
class DBKeys:
core = Device()
ttl_out = Device()
@kernel
def run(self):
while True:
delay(25*ns)
self.ttl_out.pulse(25*ns)
class SequenceError(Experiment, AutoDB):
class DBKeys:
core = Device()
ttl_out = Device()
@kernel
def run(self):
t = now_mu()
self.ttl_out.pulse(25*us)
at_mu(t)
self.ttl_out.pulse(25*us)
class CoredeviceTest(ExperimentCase): class CoredeviceTest(ExperimentCase):
def test_rtt(self): def test_rtt(self):
self.execute(RTT) self.execute(RTT)
@ -115,6 +174,24 @@ class CoredeviceTest(ExperimentCase):
self.assertGreater(rate, 100*ns) self.assertGreater(rate, 100*ns)
self.assertLess(rate, 2500*ns) self.assertLess(rate, 2500*ns)
def test_loopback_count(self):
npulses = 2
r = self.execute(LoopbackCount, npulses=npulses)
self.assertEqual(r.result, npulses)
def test_underflow(self):
with self.assertRaises(runtime_exceptions.RTIOUnderflow):
self.execute(Underflow)
def test_sequence_error(self):
with self.assertRaises(runtime_exceptions.RTIOSequenceError):
self.execute(SequenceError)
def test_watchdog(self):
# watchdog only works on the device
with self.assertRaises(IOError):
self.execute(Watchdog)
class RPCTiming(Experiment, AutoDB): class RPCTiming(Experiment, AutoDB):
class DBKeys: class DBKeys:

View File

@ -1,33 +1,19 @@
import unittest
from operator import itemgetter from operator import itemgetter
import os
from fractions import Fraction from fractions import Fraction
from artiq import * from artiq import *
from artiq.coredevice import comm_tcp, core, runtime_exceptions, ttl
from artiq.sim import devices as sim_devices from artiq.sim import devices as sim_devices
from artiq.test.hardware_testbench import ExperimentCase
core_device = os.getenv("ARTIQ_CORE_DEVICE")
def _run_on_device(k_class, **parameters):
comm = comm_tcp.Comm(host=core_device)
try:
coredev = core.Core(comm=comm)
k_inst = k_class(core=coredev, **parameters)
k_inst.run()
finally:
comm.close()
def _run_on_host(k_class, **parameters): def _run_on_host(k_class, **parameters):
coredev = sim_devices.Core() coredev = sim_devices.Core()
k_inst = k_class(core=coredev, **parameters) k_inst = k_class(core=coredev, **parameters)
k_inst.run() k_inst.run()
return k_inst
class _Primes(AutoDB): class _Primes(Experiment, AutoDB):
class DBKeys: class DBKeys:
core = Device() core = Device()
output_list = Argument() output_list = Argument()
@ -47,7 +33,10 @@ class _Primes(AutoDB):
self.output_list.append(x) self.output_list.append(x)
class _Misc(AutoDB): class _Misc(Experiment, AutoDB):
class DBKeys:
core = Device()
def build(self): def build(self):
self.input = 84 self.input = 84
self.al = [1, 2, 3, 4, 5] self.al = [1, 2, 3, 4, 5]
@ -63,7 +52,7 @@ class _Misc(AutoDB):
self.list_copy_out = self.list_copy_in self.list_copy_out = self.list_copy_in
class _PulseLogger(AutoDB): class _PulseLogger(Experiment, AutoDB):
class DBKeys: class DBKeys:
core = Device() core = Device()
output_list = Argument() output_list = Argument()
@ -90,7 +79,7 @@ class _PulseLogger(AutoDB):
self.off(now_mu()) self.off(now_mu())
class _Pulses(AutoDB): class _Pulses(Experiment, AutoDB):
class DBKeys: class DBKeys:
core = Device() core = Device()
output_list = Argument() output_list = Argument()
@ -118,7 +107,7 @@ class _MyException(Exception):
pass pass
class _Exceptions(AutoDB): class _Exceptions(Experiment, AutoDB):
class DBKeys: class DBKeys:
core = Device() core = Device()
trace = Argument() trace = Argument()
@ -162,9 +151,10 @@ class _Exceptions(AutoDB):
self.trace.append(104) self.trace.append(104)
class _RPCExceptions(AutoDB): class _RPCExceptions(Experiment, AutoDB):
class DBKeys: class DBKeys:
core = Device() core = Device()
catch = Argument(False)
def build(self): def build(self):
self.success = False self.success = False
@ -172,53 +162,43 @@ class _RPCExceptions(AutoDB):
def exception_raiser(self): def exception_raiser(self):
raise _MyException raise _MyException
@kernel
def run(self):
if self.catch:
self.do_catch()
else:
self.do_not_catch()
@kernel @kernel
def do_not_catch(self): def do_not_catch(self):
self.exception_raiser() self.exception_raiser()
@kernel @kernel
def catch(self): def do_catch(self):
try: try:
self.exception_raiser() self.exception_raiser()
except _MyException: except _MyException:
self.success = True self.success = True
class _Watchdog(AutoDB): class HostVsDeviceCase(ExperimentCase):
class DBKeys:
core = Device()
@kernel
def run(self):
with watchdog(50*ms):
while True:
pass
@unittest.skipUnless(core_device, "no hardware")
class ExecutionCase(unittest.TestCase):
def test_primes(self): def test_primes(self):
l_device, l_host = [], [] l_device, l_host = [], []
_run_on_device(_Primes, maximum=100, output_list=l_device) self.execute(_Primes, maximum=100, output_list=l_device)
_run_on_host(_Primes, maximum=100, output_list=l_host) _run_on_host(_Primes, maximum=100, output_list=l_host)
self.assertEqual(l_device, l_host) self.assertEqual(l_device, l_host)
def test_misc(self): def test_misc(self):
comm = comm_tcp.Comm(host=core_device) for f in self.execute, _run_on_host:
try: uut = f(_Misc)
coredev = core.Core(comm=comm)
uut = _Misc(core=coredev)
uut.run()
self.assertEqual(uut.half_input, 42) self.assertEqual(uut.half_input, 42)
self.assertEqual(uut.decimal_fraction, Fraction("1.2")) self.assertEqual(uut.decimal_fraction, Fraction("1.2"))
self.assertEqual(uut.acc, sum(uut.al)) self.assertEqual(uut.acc, sum(uut.al))
self.assertEqual(uut.list_copy_in, uut.list_copy_out) self.assertEqual(uut.list_copy_in, uut.list_copy_out)
finally:
comm.close()
def test_pulses(self): def test_pulses(self):
l_device, l_host = [], [] l_device, l_host = [], []
_run_on_device(_Pulses, output_list=l_device) self.execute(_Pulses, output_list=l_device)
_run_on_host(_Pulses, output_list=l_host) _run_on_host(_Pulses, output_list=l_host)
l_host = sorted(l_host, key=itemgetter(1)) l_host = sorted(l_host, key=itemgetter(1))
for channel in "a", "b", "c", "d": for channel in "a", "b", "c", "d":
@ -229,115 +209,14 @@ class ExecutionCase(unittest.TestCase):
def test_exceptions(self): def test_exceptions(self):
t_device, t_host = [], [] t_device, t_host = [], []
with self.assertRaises(IndexError): with self.assertRaises(IndexError):
_run_on_device(_Exceptions, trace=t_device) self.execute(_Exceptions, trace=t_device)
with self.assertRaises(IndexError): with self.assertRaises(IndexError):
_run_on_host(_Exceptions, trace=t_host) _run_on_host(_Exceptions, trace=t_host)
self.assertEqual(t_device, t_host) self.assertEqual(t_device, t_host)
def test_rpc_exceptions(self): def test_rpc_exceptions(self):
comm = comm_tcp.Comm(host=core_device) for f in self.execute, _run_on_host:
try:
uut = _RPCExceptions(core=core.Core(comm=comm))
with self.assertRaises(_MyException): with self.assertRaises(_MyException):
uut.do_not_catch() f(_RPCExceptions, catch=False)
uut.catch() uut = self.execute(_RPCExceptions, catch=True)
self.assertTrue(uut.success) self.assertTrue(uut.success)
finally:
comm.close()
def test_watchdog(self):
with self.assertRaises(IOError):
_run_on_device(_Watchdog)
class _RTIOLoopback(AutoDB):
class DBKeys:
core = Device()
io = Device()
npulses = Argument()
def report(self, n):
self.result = n
@kernel
def run(self):
self.io.output()
delay(1*us)
with parallel:
self.io.gate_rising(10*us)
with sequential:
for i in range(self.npulses):
delay(25*ns)
self.io.pulse(25*ns)
self.report(self.io.count())
class _RTIOUnderflow(AutoDB):
class DBKeys:
core = Device()
o = Device()
@kernel
def run(self):
while True:
delay(25*ns)
self.o.pulse(25*ns)
class _RTIOSequenceError(AutoDB):
class DBKeys:
core = Device()
o = Device()
@kernel
def run(self):
t = now_mu()
self.o.pulse(25*us)
at_mu(t)
self.o.pulse(25*us)
@unittest.skipUnless(core_device, "no hardware")
class RTIOCase(unittest.TestCase):
# Connect channels 0 and 1 together for this test
# (C11 and C13 on Papilio Pro)
def test_loopback(self):
npulses = 4
comm = comm_tcp.Comm(host=core_device)
try:
coredev = core.Core(comm=comm)
uut = _RTIOLoopback(
core=coredev,
io=ttl.TTLInOut(core=coredev, channel=0),
npulses=npulses
)
uut.run()
self.assertEqual(uut.result, npulses)
finally:
comm.close()
def test_underflow(self):
comm = comm_tcp.Comm(host=core_device)
try:
coredev = core.Core(comm=comm)
uut = _RTIOUnderflow(
core=coredev,
o=ttl.TTLOut(core=coredev, channel=2)
)
with self.assertRaises(runtime_exceptions.RTIOUnderflow):
uut.run()
finally:
comm.close()
def test_sequence_error(self):
comm = comm_tcp.Comm(host=core_device)
try:
coredev = core.Core(comm=comm)
uut = _RTIOSequenceError(
core=coredev,
o=ttl.TTLOut(core=coredev, channel=2)
)
with self.assertRaises(runtime_exceptions.RTIOSequenceError):
uut.run()
finally:
comm.close()