artiq/artiq/test/coredevice/test_analyzer.py

78 lines
2.6 KiB
Python
Raw Normal View History

from artiq.experiment import *
from artiq.coredevice.comm_analyzer import (decode_dump, StoppedMessage,
OutputMessage, InputMessage,
2017-02-27 19:19:46 +08:00
_extract_log_chars, get_analyzer_dump)
2015-12-24 19:25:29 +08:00
from artiq.test.hardware_testbench import ExperimentCase
class CreateTTLPulse(EnvExperiment):
def build(self):
self.setattr_device("core")
self.setattr_device("loop_in")
self.setattr_device("loop_out")
2015-12-24 19:25:29 +08:00
@kernel
def initialize_io(self):
2016-06-29 02:37:39 +08:00
self.core.reset()
self.loop_in.input()
2016-04-14 22:57:16 +08:00
self.loop_out.off()
@kernel
def run(self):
self.core.break_realtime()
with parallel:
with sequential:
delay_mu(100)
self.loop_out.pulse_mu(1000)
self.loop_in.count(self.loop_in.gate_both_mu(1200))
2015-12-24 19:25:29 +08:00
class WriteLog(EnvExperiment):
def build(self):
self.setattr_device("core")
@kernel
def run(self):
2016-06-29 02:37:39 +08:00
self.core.reset()
rtio_log("foo", 32)
2015-12-24 19:25:29 +08:00
class AnalyzerTest(ExperimentCase):
def test_ttl_pulse(self):
2017-05-22 15:45:45 +08:00
core_host = self.device_mgr.get_desc("core")["arguments"]["host"]
2015-12-24 19:25:29 +08:00
exp = self.create(CreateTTLPulse)
exp.initialize_io()
2017-02-27 19:19:46 +08:00
get_analyzer_dump(core_host) # clear analyzer buffer
2015-12-24 19:25:29 +08:00
exp.run()
2017-02-27 19:19:46 +08:00
dump = decode_dump(get_analyzer_dump(core_host))
self.assertIsInstance(dump.messages[-1], StoppedMessage)
output_messages = [msg for msg in dump.messages
if isinstance(msg, OutputMessage)
and msg.address == 0]
self.assertEqual(len(output_messages), 2)
2015-12-24 19:26:24 +08:00
self.assertEqual(
abs(output_messages[0].timestamp - output_messages[1].timestamp),
2015-12-24 19:26:24 +08:00
1000)
input_messages = [msg for msg in dump.messages
if isinstance(msg, InputMessage)]
self.assertEqual(len(input_messages), 2)
# on Kasli systems, this has to go through the isolated DIO card
self.assertAlmostEqual(
abs(input_messages[0].timestamp - input_messages[1].timestamp),
1000, delta=4)
def test_rtio_log(self):
2017-05-22 15:45:45 +08:00
core_host = self.device_mgr.get_desc("core")["arguments"]["host"]
exp = self.create(WriteLog)
2017-02-27 19:19:46 +08:00
get_analyzer_dump(core_host) # clear analyzer buffer
exp.run()
2017-02-27 19:19:46 +08:00
dump = decode_dump(get_analyzer_dump(core_host))
log = "".join([_extract_log_chars(msg.data)
for msg in dump.messages
if isinstance(msg, OutputMessage) and msg.channel == dump.log_channel])
2017-11-03 00:52:53 +08:00
self.assertEqual(log, "foo\x1E32\x1D")