2017-02-27 15:56:58 +08:00
|
|
|
import asyncio
|
|
|
|
|
2017-02-27 18:44:32 +08:00
|
|
|
from artiq.coredevice.comm_moninj import *
|
2017-02-27 15:56:58 +08:00
|
|
|
from artiq.test.hardware_testbench import ExperimentCase
|
|
|
|
|
|
|
|
|
|
|
|
class MonInjTest(ExperimentCase):
    """Hardware test for the ``CommMonInj`` monitor/injection client."""

    def test_moninj(self):
        """Inject levels on ``loop_out`` and check the reported events.

        The test connects a ``CommMonInj`` to the core device host, enables
        monitoring of the ``loop_in`` level probe, toggles the ``loop_out``
        level override 0 -> 1 -> 0 while the override is enabled, and
        queries the override-enable status before and after enabling it.
        It then asserts on the exact sequence of monitor notifications and
        injection statuses collected by the callbacks.

        NOTE(review): presumably ``loop_out`` is physically wired to
        ``loop_in`` on the test bench -- confirm against the device database.
        """
        core_host = self.device_mgr.get_desc("core")["arguments"]["host"]
        loop_out_channel = (
            self.device_mgr.get_desc("loop_out")["arguments"]["channel"])
        loop_in_channel = (
            self.device_mgr.get_desc("loop_in")["arguments"]["channel"])

        # Events are recorded in arrival order as plain tuples so they can
        # be compared against the expected sequences below.
        notifications = []
        injection_statuses = []

        def monitor_cb(channel, probe, value):
            notifications.append((channel, probe, value))

        def injection_status_cb(channel, override, value):
            injection_statuses.append((channel, override, value))

        loop = asyncio.get_event_loop()
        try:
            moninj_comm = CommMonInj(monitor_cb, injection_status_cb)
            loop.run_until_complete(moninj_comm.connect(core_host))
            try:
                # Status query before the override is enabled.
                moninj_comm.get_injection_status(loop_out_channel,
                                                 TTLOverride.en.value)
                moninj_comm.monitor(True, loop_in_channel,
                                    TTLProbe.level.value)
                moninj_comm.inject(loop_out_channel,
                                   TTLOverride.level.value, 0)
                # Use the ``en`` member directly: reaching it through another
                # member (``TTLOverride.level.en``) is not portable across
                # Python versions.
                moninj_comm.inject(loop_out_channel,
                                   TTLOverride.en.value, 1)
                # The sleeps give the device time to report each change
                # before the next one is requested.
                loop.run_until_complete(asyncio.sleep(0.5))
                # Status query after the override is enabled.
                moninj_comm.get_injection_status(loop_out_channel,
                                                 TTLOverride.en.value)
                moninj_comm.inject(loop_out_channel,
                                   TTLOverride.level.value, 1)
                loop.run_until_complete(asyncio.sleep(0.5))
                moninj_comm.inject(loop_out_channel,
                                   TTLOverride.level.value, 0)
                loop.run_until_complete(asyncio.sleep(0.5))
                # Release the override again so the channel is left as found.
                moninj_comm.inject(loop_out_channel,
                                   TTLOverride.en.value, 0)
                # Make sure the last command is actually sent before closing.
                loop.run_until_complete(moninj_comm._writer.drain())
            finally:
                loop.run_until_complete(moninj_comm.close())
        finally:
            loop.close()

        # A leading notification with value 1 is tolerated and dropped
        # (presumably the input level seen before the override forced it
        # low -- TODO confirm). The emptiness guard lets a run with no
        # notifications fail in assertEqual below instead of IndexError.
        if notifications and notifications[0][2] == 1:
            notifications = notifications[1:]
        self.assertEqual(notifications, [
            (loop_in_channel, TTLProbe.level.value, 0),
            (loop_in_channel, TTLProbe.level.value, 1),
            (loop_in_channel, TTLProbe.level.value, 0)
        ])
        self.assertEqual(injection_statuses, [
            (loop_out_channel, TTLOverride.en.value, 0),
            (loop_out_channel, TTLOverride.en.value, 1)
        ])
|