test: use new dataset API

This commit is contained in:
Sebastien Bourdeauducq 2015-10-12 19:20:04 +08:00
parent a83ffb3dce
commit a754d4b5f5
4 changed files with 45 additions and 39 deletions

View File

@ -15,7 +15,7 @@ class RTT(EnvExperiment):
self.setattr_device("ttl_inout")
def set_rtt(self, rtt):
self.set_result("rtt", rtt)
self.set_dataset("rtt", rtt)
@kernel
def run(self):
@ -39,7 +39,7 @@ class Loopback(EnvExperiment):
self.setattr_device("loop_out")
def set_rtt(self, rtt):
self.set_result("rtt", rtt)
self.set_dataset("rtt", rtt)
@kernel
def run(self):
@ -61,7 +61,7 @@ class ClockGeneratorLoopback(EnvExperiment):
self.setattr_device("loop_clock_out")
def set_count(self, count):
self.set_result("count", count)
self.set_dataset("count", count)
@kernel
def run(self):
@ -82,7 +82,7 @@ class PulseRate(EnvExperiment):
self.setattr_device("ttl_out")
def set_pulse_rate(self, pulse_rate):
self.set_result("pulse_rate", pulse_rate)
self.set_dataset("pulse_rate", pulse_rate)
@kernel
def run(self):
@ -118,7 +118,7 @@ class LoopbackCount(EnvExperiment):
self.setattr_argument("npulses")
def set_count(self, count):
self.set_result("count", count)
self.set_dataset("count", count)
@kernel
def run(self):
@ -176,7 +176,7 @@ class TimeKeepsRunning(EnvExperiment):
self.setattr_device("core")
def set_time_at_start(self, time_at_start):
self.set_result("time_at_start", time_at_start)
self.set_dataset("time_at_start", time_at_start)
@kernel
def run(self):
@ -193,34 +193,34 @@ class Handover(EnvExperiment):
def run(self):
self.get_now()
self.set_result("t1", self.time_at_start)
self.set_dataset("t1", self.time_at_start)
self.get_now()
self.set_result("t2", self.time_at_start)
self.set_dataset("t2", self.time_at_start)
class CoredeviceTest(ExperimentCase):
def test_rtt(self):
self.execute(RTT)
rtt = self.rdb.get("rtt")
rtt = self.dataset_mgr.get("rtt")
print(rtt)
self.assertGreater(rtt, 0*ns)
self.assertLess(rtt, 100*ns)
def test_loopback(self):
self.execute(Loopback)
rtt = self.rdb.get("rtt")
rtt = self.dataset_mgr.get("rtt")
print(rtt)
self.assertGreater(rtt, 0*ns)
self.assertLess(rtt, 50*ns)
def test_clock_generator_loopback(self):
self.execute(ClockGeneratorLoopback)
count = self.rdb.get("count")
count = self.dataset_mgr.get("count")
self.assertEqual(count, 10)
def test_pulse_rate(self):
self.execute(PulseRate)
rate = self.rdb.get("pulse_rate")
rate = self.dataset_mgr.get("pulse_rate")
print(rate)
self.assertGreater(rate, 100*ns)
self.assertLess(rate, 2500*ns)
@ -228,7 +228,7 @@ class CoredeviceTest(ExperimentCase):
def test_loopback_count(self):
npulses = 2
self.execute(LoopbackCount, npulses=npulses)
count = self.rdb.get("count")
count = self.dataset_mgr.get("count")
self.assertEqual(count, npulses)
def test_underflow(self):
@ -250,17 +250,18 @@ class CoredeviceTest(ExperimentCase):
def test_time_keeps_running(self):
self.execute(TimeKeepsRunning)
t1 = self.rdb.get("time_at_start")
t1 = self.dataset_mgr.get("time_at_start")
self.execute(TimeKeepsRunning)
t2 = self.rdb.get("time_at_start")
dead_time = mu_to_seconds(t2 - t1, self.dmgr.get("core"))
t2 = self.dataset_mgr.get("time_at_start")
dead_time = mu_to_seconds(t2 - t1, self.device_mgr.get("core"))
print(dead_time)
self.assertGreater(dead_time, 1*ms)
self.assertLess(dead_time, 300*ms)
def test_handover(self):
self.execute(Handover)
self.assertEqual(self.rdb.get("t1"), self.rdb.get("t2"))
self.assertEqual(self.dataset_mgr.get("t1"),
self.dataset_mgr.get("t2"))
class RPCTiming(EnvExperiment):
@ -283,14 +284,14 @@ class RPCTiming(EnvExperiment):
def run(self):
self.bench()
mean = sum(self.ts)/self.repeats
self.set_result("rpc_time_stddev", sqrt(
self.set_dataset("rpc_time_stddev", sqrt(
sum([(t - mean)**2 for t in self.ts])/self.repeats))
self.set_result("rpc_time_mean", mean)
self.set_dataset("rpc_time_mean", mean)
class RPCTest(ExperimentCase):
def test_rpc_timing(self):
self.execute(RPCTiming)
self.assertGreater(self.rdb.get("rpc_time_mean"), 100*ns)
self.assertLess(self.rdb.get("rpc_time_mean"), 15*ms)
self.assertLess(self.rdb.get("rpc_time_stddev"), 1*ms)
self.assertGreater(self.dataset_mgr.get("rpc_time_mean"), 100*ns)
self.assertLess(self.dataset_mgr.get("rpc_time_mean"), 15*ms)
self.assertLess(self.dataset_mgr.get("rpc_time_stddev"), 1*ms)

View File

@ -85,7 +85,7 @@ class _Pulses(EnvExperiment):
self.setattr_argument("output_list")
for name in "a", "b", "c", "d":
pl = _PulseLogger(*self.dbs(),
pl = _PulseLogger(*self.managers(),
output_list=self.output_list,
name=name)
setattr(self, name, pl)

View File

@ -1,3 +1,4 @@
# Copyright (C) 2015 M-Labs Limited
# Copyright (C) 2014, 2015 Robert Jordens <jordens@gmail.com>
import os
@ -6,8 +7,9 @@ import unittest
import logging
from artiq.language import *
from artiq.protocols.pyon import FlatFileDB
from artiq.master.worker_db import DeviceManager, ResultDB
from artiq.protocols import pyon
from artiq.master.databases import DeviceDB, DatasetDB
from artiq.master.worker_db import DeviceManager, DatasetManager
from artiq.frontend.artiq_run import DummyScheduler
@ -18,14 +20,14 @@ logger = logging.getLogger(__name__)
def get_from_ddb(*path, default="skip"):
if not artiq_root:
raise unittest.SkipTest("no ARTIQ_ROOT")
v = FlatFileDB(os.path.join(artiq_root, "ddb.pyon")).data
v = pyon.load_file(os.path.join(artiq_root, "device_db.pyon"))
try:
for p in path:
v = v[p]
return v.read
except KeyError:
if default == "skip":
raise unittest.SkipTest("ddb path {} not found".format(path))
raise unittest.SkipTest("device db path {} not found".format(path))
else:
return default
@ -33,11 +35,11 @@ def get_from_ddb(*path, default="skip"):
@unittest.skipUnless(artiq_root, "no ARTIQ_ROOT")
class ExperimentCase(unittest.TestCase):
def setUp(self):
self.ddb = FlatFileDB(os.path.join(artiq_root, "ddb.pyon"))
self.dmgr = DeviceManager(self.ddb,
self.device_db = DeviceDB(os.path.join(artiq_root, "device_db.pyon"))
self.dataset_db = DatasetDB(os.path.join(artiq_root, "dataset_db.pyon"))
self.device_mgr = DeviceManager(self.device_db,
virtual_devices={"scheduler": DummyScheduler()})
self.pdb = FlatFileDB(os.path.join(artiq_root, "pdb.pyon"))
self.rdb = ResultDB()
self.dataset_mgr = DatasetManager(self.dataset_db)
def execute(self, cls, **kwargs):
expid = {
@ -45,10 +47,10 @@ class ExperimentCase(unittest.TestCase):
"class_name": cls.__name__,
"arguments": kwargs
}
self.dmgr.virtual_devices["scheduler"].expid = expid
self.device_mgr.virtual_devices["scheduler"].expid = expid
try:
try:
exp = cls(self.dmgr, self.pdb, self.rdb, **kwargs)
exp = cls(self.device_mgr, self.dataset_mgr, **kwargs)
except KeyError as e:
# skip if ddb does not match requirements
raise unittest.SkipTest(*e.args)
@ -57,4 +59,4 @@ class ExperimentCase(unittest.TestCase):
exp.analyze()
return exp
finally:
self.dmgr.close_devices()
self.device_mgr.close_devices()

View File

@ -26,7 +26,8 @@ class BackgroundExperiment(EnvExperiment):
self.scheduler.pause()
sleep(0.2)
except TerminationRequested:
self.set_parameter("termination_ok", True)
self.set_dataset("termination_ok", True,
broadcast=True, save=False)
def _get_expid(name):
@ -108,13 +109,15 @@ class SchedulerCase(unittest.TestCase):
loop = self.loop
termination_ok = False
def check_termination(key, value):
def check_termination(mod):
nonlocal termination_ok
self.assertEqual(key, "termination_ok")
self.assertEqual(value, True)
self.assertEqual(
mod,
{"action": "setitem", "key": "termination_ok",
"value": (False, True), "path": []})
termination_ok = True
handlers = {
"set_parameter": check_termination
"update_dataset": check_termination
}
scheduler = Scheduler(0, handlers, None)