2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-09 10:33:35 +08:00
artiq/artiq/coredevice/comm_moninj.py
2022-02-04 13:51:19 +08:00

101 lines
3.2 KiB
Python

import asyncio
import logging
import struct
from enum import Enum
from .comm import set_keepalive
__all__ = ["TTLProbe", "TTLOverride", "CommMonInj"]
logger = logging.getLogger(__name__)
class TTLProbe(Enum):
level = 0
oe = 1
class TTLOverride(Enum):
en = 0
level = 1
oe = 2
class CommMonInj:
def __init__(self, monitor_cb, injection_status_cb, disconnect_cb=None):
self.monitor_cb = monitor_cb
self.injection_status_cb = injection_status_cb
self.disconnect_cb = disconnect_cb
async def connect(self, host, port=1383):
self._reader, self._writer = await asyncio.open_connection(host, port)
set_keepalive(self._writer.transport.get_extra_info('socket'), 1, 1, 3)
try:
self._writer.write(b"ARTIQ moninj\n")
# get device endian
endian = await self._reader.read(1)
if endian == b"e":
self.endian = "<"
elif endian == b"E":
self.endian = ">"
else:
raise IOError("Incorrect reply from device: expected e/E.")
self._receive_task = asyncio.ensure_future(self._receive_cr())
except:
self._writer.close()
del self._reader
del self._writer
raise
async def close(self):
self.disconnect_cb = None
try:
self._receive_task.cancel()
try:
await asyncio.wait_for(self._receive_task, None)
except asyncio.CancelledError:
pass
finally:
self._writer.close()
del self._reader
del self._writer
def monitor_probe(self, enable, channel, probe):
packet = struct.pack(self.endian + "bblb", 0, enable, channel, probe)
self._writer.write(packet)
def monitor_injection(self, enable, channel, overrd):
packet = struct.pack(self.endian + "bblb", 3, enable, channel, overrd)
self._writer.write(packet)
def inject(self, channel, override, value):
packet = struct.pack(self.endian + "blbb", 1, channel, override, value)
self._writer.write(packet)
def get_injection_status(self, channel, override):
packet = struct.pack(self.endian + "blb", 2, channel, override)
self._writer.write(packet)
async def _receive_cr(self):
try:
while True:
ty = await self._reader.read(1)
if not ty:
return
if ty == b"\x00":
payload = await self._reader.readexactly(9)
channel, probe, value = struct.unpack(
self.endian + "lbl", payload)
self.monitor_cb(channel, probe, value)
elif ty == b"\x01":
payload = await self._reader.readexactly(6)
channel, override, value = struct.unpack(
self.endian + "lbb", payload)
self.injection_status_cb(channel, override, value)
else:
raise ValueError("Unknown packet type", ty)
finally:
if self.disconnect_cb is not None:
self.disconnect_cb()