1
0
forked from M-Labs/artiq
artiq/artiq/test/test_rpctool.py

40 lines
1.1 KiB
Python

import os
import sys
import asyncio
import unittest
from artiq.protocols.pc_rpc import Server
class Target:
def output_value(self):
return 4125380
class TestRPCTool(unittest.TestCase):
async def check_value(self):
proc = await asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.frontend.artiq_rpctool", "::1", "7777", "call", "output_value",
stdout = asyncio.subprocess.PIPE)
(value, err) = await proc.communicate()
self.assertEqual(value.decode('ascii').rstrip(), '4125380')
await proc.wait()
async def do_test(self):
server = Server({"target": Target()})
await server.start("::1", 7777)
await self.check_value()
await server.stop()
def test_rpc(self):
if os.name == "nt":
loop = asyncio.ProactorEventLoop()
else:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self.do_test())
finally:
loop.close()