artiq/artiq/test/sync_struct.py

83 lines
2.6 KiB
Python

import unittest
import asyncio
import numpy as np
from artiq.protocols import sync_struct
# Endpoint shared by the publisher (server side) and the subscriber (client
# side) in this test: the IPv6 loopback address and a fixed port number.
test_address = "::1"
test_port = 7777
async def write_test_data(test_dict):
    """Apply a fixed sequence of mutations to *test_dict*.

    The sequence covers item assignment with string, integer and float keys,
    values of several types (numbers, ``None``, booleans, a nested dict
    holding a numpy array, tuples, a string with a newline and quotes),
    list ``append``/``insert`` on a nested value, overwriting a key, and
    removal via both ``pop`` and ``del``.  The final write sets the
    ``"finished"`` key to ``True`` so observers can tell the sequence is over.

    Declared with ``async def`` because the ``@asyncio.coroutine`` decorator
    was removed in Python 3.11; the body never awaits, so all mutations are
    applied in a single step once the coroutine is driven.

    :param test_dict: mutable mapping to modify in place.  The test passes
        both a plain ``dict`` and the object produced by
        ``sync_struct.Notifier(dict())``.
    """
    test_values = [5, 2.1, None, True, False,
                   {"a": 5, 2: np.linspace(0, 10, 1)},
                   (4, 5), (10,), "ab\nx\"'"]
    # String keys "0".."9" mapped to the matching integers.
    for i in range(10):
        test_dict[str(i)] = i
    # Integer keys 0..8 mapped to values of assorted types.
    for key, value in enumerate(test_values):
        test_dict[key] = value
    test_dict[1.5] = 1.5
    # Mutations of a nested list, reached through the mapping each time.
    test_dict["array"] = []
    test_dict["array"].append(42)
    test_dict["array"].insert(1, 1)
    # Overwrite of an existing key.
    test_dict[100] = 0
    test_dict[100] = 1
    # Removal through pop() ...
    test_dict[101] = 1
    test_dict.pop(101)
    # ... and through del.
    test_dict[102] = 1
    del test_dict[102]
    # Sentinel: must stay the last write.
    test_dict["finished"] = True
async def start_server(publisher_future, test_dict_future):
    """Start a publisher for one notifier-wrapped dict and report it back.

    Creates ``sync_struct.Notifier(dict())``, registers it with a
    ``sync_struct.Publisher`` under the key ``"test"``, starts the publisher
    on ``test_address``/``test_port`` and then resolves the two futures.

    Declared with ``async def`` because the ``@asyncio.coroutine`` decorator
    was removed in Python 3.11.

    :param publisher_future: future that receives the started publisher.
    :param test_dict_future: future that receives the notifier-wrapped dict.
    :raises Exception: whatever ``publisher.start()`` raises; the same
        exception is also set on both futures.
    """
    test_dict = sync_struct.Notifier(dict())
    publisher = sync_struct.Publisher({"test": test_dict})
    try:
        # NOTE(review): assumes publisher.start() returns an awaitable
        # (it was previously consumed with ``yield from``) - confirm.
        await publisher.start(test_address, test_port)
    except Exception as exc:
        # If start-up fails, a caller blocked on either future would wait
        # forever; hand the failure over before propagating it.
        for fut in (publisher_future, test_dict_future):
            if not fut.done():
                fut.set_exception(exc)
        raise
    publisher_future.set_result(publisher)
    test_dict_future.set_result(test_dict)
class SyncStructCase(unittest.TestCase):
    """Check that a subscriber ends up with the same data as was published.

    A publisher is started for a notifier-wrapped dict, ``write_test_data``
    mutates that dict, and a ``sync_struct.Subscriber`` connected to the
    publisher is expected to hold a structure equal to a plain dict that
    went through the same mutations.
    """

    def init_test_dict(self, init):
        """Remember the structure handed over by the subscriber.

        Passed to ``sync_struct.Subscriber`` as its second argument.

        :param init: the structure to keep; stored as ``self.test_dict``.
        :returns: *init*, unchanged.
        """
        self.test_dict = init
        return init

    def notify(self, mod):
        """Set ``receiving_done`` once the ``"finished"`` key has shown up.

        Passed to ``sync_struct.Subscriber`` as its third argument.  The key
        is recognised either inside the ``"struct"`` of an ``"init"``
        modification or as the ``"key"`` of a ``"setitem"`` modification,
        so the test completes whether the writer ran before or after the
        subscriber connected.

        :param mod: modification description, indexed by ``"action"`` and,
            depending on the action, ``"struct"`` or ``"key"``.
        """
        if ((mod["action"] == "init" and "finished" in mod["struct"])
                or (mod["action"] == "setitem" and mod["key"] == "finished")):
            self.receiving_done.set()

    def setUp(self):
        """Create a fresh event loop and install it as the current one."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)

    def test_recv(self):
        """Publish the test data and compare the subscriber's copy to it."""
        loop = self.loop
        self.receiving_done = asyncio.Event()
        publisher = asyncio.Future(loop=loop)
        test_dict = asyncio.Future(loop=loop)

        # asyncio.async() cannot even be parsed on Python >= 3.7 ("async" is
        # a keyword); asyncio.ensure_future() is its direct replacement.
        # The event loop only keeps weak references to tasks, so hold on to
        # them for the duration of the test.
        background = [
            asyncio.ensure_future(start_server(publisher, test_dict),
                                  loop=loop),
        ]
        loop.run_until_complete(publisher)
        loop.run_until_complete(test_dict)
        self.publisher = publisher.result()
        try:
            test_dict = test_dict.result()

            # Reference result: the same mutations applied to a plain dict.
            test_vector = dict()
            loop.run_until_complete(write_test_data(test_vector))

            # Mutate the published structure in the background while the
            # subscriber connects.
            background.append(
                asyncio.ensure_future(write_test_data(test_dict), loop=loop))

            self.subscriber = sync_struct.Subscriber(
                "test", self.init_test_dict, self.notify)
            loop.run_until_complete(
                self.subscriber.connect(test_address, test_port))
            try:
                loop.run_until_complete(self.receiving_done.wait())
                self.assertEqual(self.test_dict, test_vector)
            finally:
                # Close the subscriber even when the comparison fails.
                loop.run_until_complete(self.subscriber.close())
        finally:
            # Stop the publisher on every path so the port is released
            # before tearDown() closes the loop.
            loop.run_until_complete(self.publisher.stop())

    def tearDown(self):
        """Close the event loop created in ``setUp``."""
        self.loop.close()