forked from M-Labs/artiq
test: add DMA test that checks the analyzer trace.
This commit is contained in:
parent
6caab4d10b
commit
7eb368fd5d
|
@ -8,6 +8,8 @@ from math import sqrt
|
||||||
from artiq.experiment import *
|
from artiq.experiment import *
|
||||||
from artiq.test.hardware_testbench import ExperimentCase
|
from artiq.test.hardware_testbench import ExperimentCase
|
||||||
from artiq.coredevice import exceptions
|
from artiq.coredevice import exceptions
|
||||||
|
from artiq.coredevice.comm_analyzer import (StoppedMessage, OutputMessage, InputMessage,
|
||||||
|
decode_dump, get_analyzer_dump)
|
||||||
|
|
||||||
|
|
||||||
artiq_low_latency = os.getenv("ARTIQ_LOW_LATENCY")
|
artiq_low_latency = os.getenv("ARTIQ_LOW_LATENCY")
|
||||||
|
@ -467,20 +469,21 @@ class _DMA(EnvExperiment):
|
||||||
def build(self, trace_name="foobar"):
|
def build(self, trace_name="foobar"):
|
||||||
self.setattr_device("core")
|
self.setattr_device("core")
|
||||||
self.setattr_device("core_dma")
|
self.setattr_device("core_dma")
|
||||||
self.setattr_device("ttl0")
|
self.setattr_device("ttl1")
|
||||||
self.trace_name = trace_name
|
self.trace_name = trace_name
|
||||||
|
|
||||||
@kernel
|
@kernel
|
||||||
def record(self):
|
def record(self):
|
||||||
with self.core_dma.record(self.trace_name):
|
with self.core_dma.record(self.trace_name):
|
||||||
delay(100*ns)
|
delay(100*ns)
|
||||||
self.ttl0.on()
|
self.ttl1.on()
|
||||||
delay(100*ns)
|
delay(100*ns)
|
||||||
self.ttl0.off()
|
self.ttl1.off()
|
||||||
|
|
||||||
@kernel
|
@kernel
|
||||||
def replay(self):
|
def replay(self):
|
||||||
self.core.break_realtime()
|
self.core.break_realtime()
|
||||||
|
delay(100*ms)
|
||||||
self.core_dma.replay(self.trace_name)
|
self.core_dma.replay(self.trace_name)
|
||||||
|
|
||||||
@kernel
|
@kernel
|
||||||
|
@ -509,4 +512,24 @@ class DMATest(ExperimentCase):
|
||||||
with self.assertRaises(exceptions.DMAError):
|
with self.assertRaises(exceptions.DMAError):
|
||||||
exp.nested()
|
exp.nested()
|
||||||
|
|
||||||
# TODO: check replay against analyzer
|
def test_dma_trace(self):
|
||||||
|
core_host = self.device_mgr.get_desc("comm")["arguments"]["host"]
|
||||||
|
|
||||||
|
exp = self.create(_DMA)
|
||||||
|
exp.record()
|
||||||
|
get_analyzer_dump(core_host) # clear analyzer buffer
|
||||||
|
exp.replay()
|
||||||
|
|
||||||
|
dump = decode_dump(get_analyzer_dump(core_host))
|
||||||
|
self.assertEqual(len(dump.messages), 3)
|
||||||
|
self.assertIsInstance(dump.messages[-1], StoppedMessage)
|
||||||
|
self.assertIsInstance(dump.messages[0], OutputMessage)
|
||||||
|
self.assertEqual(dump.messages[0].channel, 1)
|
||||||
|
self.assertEqual(dump.messages[0].address, 0)
|
||||||
|
self.assertEqual(dump.messages[0].data, 1)
|
||||||
|
self.assertIsInstance(dump.messages[1], OutputMessage)
|
||||||
|
self.assertEqual(dump.messages[1].channel, 1)
|
||||||
|
self.assertEqual(dump.messages[1].address, 0)
|
||||||
|
self.assertEqual(dump.messages[1].data, 0)
|
||||||
|
self.assertEqual(dump.messages[1].timestamp -
|
||||||
|
dump.messages[0].timestamp, 100)
|
||||||
|
|
Loading…
Reference in New Issue