From d16073fcaaa2d1024dfa6153f36a65b57a2216ff Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Mon, 27 Feb 2017 15:56:58 +0800 Subject: [PATCH] test: add moninj unittest --- artiq/test/coredevice/test_moninj.py | 54 ++++++++++++++++++++++++++++ 1 file changed, 54 insertions(+) create mode 100644 artiq/test/coredevice/test_moninj.py diff --git a/artiq/test/coredevice/test_moninj.py b/artiq/test/coredevice/test_moninj.py new file mode 100644 index 000000000..23e27e098 --- /dev/null +++ b/artiq/test/coredevice/test_moninj.py @@ -0,0 +1,54 @@ +import asyncio + +from artiq.coredevice.moninj import * +from artiq.test.hardware_testbench import ExperimentCase + + +class MonInjTest(ExperimentCase): + def test_moninj(self): + core_host = self.device_mgr.get_desc("comm")["arguments"]["host"] + loop_out_channel = self.device_mgr.get_desc("loop_out")["arguments"]["channel"] + loop_in_channel = self.device_mgr.get_desc("loop_in")["arguments"]["channel"] + + notifications = [] + injection_statuses = [] + + def monitor_cb(channel, probe, value): + notifications.append((channel, probe, value)) + + def injection_status_cb(channel, override, value): + injection_statuses.append((channel, override, value)) + + loop = asyncio.get_event_loop() + try: + moninj_comm = MonInjComm(monitor_cb, injection_status_cb) + loop.run_until_complete(moninj_comm.connect(core_host)) + try: + moninj_comm.get_injection_status(loop_out_channel, TTLOverride.en.value) + moninj_comm.monitor(True, loop_in_channel, TTLProbe.level.value) + moninj_comm.inject(loop_out_channel, TTLOverride.level.value, 0) + moninj_comm.inject(loop_out_channel, TTLOverride.level.en.value, 1) + loop.run_until_complete(asyncio.sleep(0.5)) + moninj_comm.get_injection_status(loop_out_channel, TTLOverride.en.value) + moninj_comm.inject(loop_out_channel, TTLOverride.level.value, 1) + loop.run_until_complete(asyncio.sleep(0.5)) + moninj_comm.inject(loop_out_channel, TTLOverride.level.value, 0) + loop.run_until_complete(asyncio.sleep(0.5)) + moninj_comm.inject(loop_out_channel, TTLOverride.level.en.value, 0) + loop.run_until_complete(moninj_comm._writer.drain()) + finally: + loop.run_until_complete(moninj_comm.close()) + finally: + loop.close() + + if notifications[0][2] == 1: + notifications = notifications[1:] + self.assertEqual(notifications, [ + (loop_in_channel, TTLProbe.level.value, 0), + (loop_in_channel, TTLProbe.level.value, 1), + (loop_in_channel, TTLProbe.level.value, 0) + ]) + self.assertEqual(injection_statuses, [ + (loop_out_channel, TTLOverride.en.value, 0), + (loop_out_channel, TTLOverride.en.value, 1) + ])