From bbb2c751947386e2cf3ba913c0fc41903e89e6be Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 18 Mar 2022 17:01:51 +0800 Subject: [PATCH] add aqctl_moninj_proxy --- artiq/frontend/aqctl_moninj_proxy.py | 225 +++++++++++++++++++++++++++ artiq/test/test_frontends.py | 2 +- doc/manual/default_network_ports.rst | 4 +- doc/manual/utilities.rst | 8 + setup.py | 1 + 5 files changed, 238 insertions(+), 2 deletions(-) create mode 100755 artiq/frontend/aqctl_moninj_proxy.py diff --git a/artiq/frontend/aqctl_moninj_proxy.py b/artiq/frontend/aqctl_moninj_proxy.py new file mode 100755 index 000000000..760be7020 --- /dev/null +++ b/artiq/frontend/aqctl_moninj_proxy.py @@ -0,0 +1,225 @@ +#!/usr/bin/env python3 + +import argparse +import logging +import asyncio +import struct +from enum import Enum + +from sipyco.asyncio_tools import AsyncioServer +from sipyco.pc_rpc import Server +from sipyco import common_args + +from artiq.coredevice.comm_moninj import CommMonInj + + +logger = logging.getLogger(__name__) + + +class EventType(Enum): + PROBE = 0 + INJECTION = 1 + + +class MonitorMux: + def __init__(self): + self.listeners = dict() + self.comm_moninj = None + + def _monitor(self, listener, event): + try: + listeners = self.listeners[event] + except KeyError: + listeners = [] + self.listeners[event] = listeners + if event[0] == EventType.PROBE: + logger.debug("starting monitoring channel %d probe %d", event[1], event[2]) + self.comm_moninj.monitor_probe(True, event[1], event[2]) + elif event[0] == EventType.INJECTION: + logger.debug("starting monitoring channel %d injection %d", event[1], event[2]) + self.comm_moninj.monitor_injection(True, event[1], event[2]) + else: + raise ValueError + if listener in listeners: + logger.warning("listener trying to subscribe twice to %s", event) + else: + listeners.append(listener) + + def _unmonitor(self, listener, event): + try: + listeners = self.listeners[event] + except KeyError: + listeners = [] + try: + listeners.remove(listener) + except ValueError: + logger.warning("listener trying to unsubscribe from %s, but was not subscribed", event) + return + if not listeners: + del self.listeners[event] + if event[0] == EventType.PROBE: + logger.debug("stopped monitoring channel %d probe %d", event[1], event[2]) + self.comm_moninj.monitor_probe(False, event[1], event[2]) + elif event[0] == EventType.INJECTION: + logger.debug("stopped monitoring channel %d injection %d", event[1], event[2]) + self.comm_moninj.monitor_injection(False, event[1], event[2]) + else: + raise ValueError + + def monitor_probe(self, listener, enable, channel, probe): + if enable: + self._monitor(listener, (EventType.PROBE, channel, probe)) + else: + self._unmonitor(listener, (EventType.PROBE, channel, probe)) + + def monitor_injection(self, listener, enable, channel, overrd): + if enable: + self._monitor(listener, (EventType.INJECTION, channel, overrd)) + else: + self._unmonitor(listener, (EventType.INJECTION, channel, overrd)) + + def _event_cb(self, event, value): + try: + listeners = self.listeners[event] + except KeyError: + # We may still receive buffered events shortly after an unsubscription. They can be ignored. + logger.debug("received event %s but no listener", event) + listeners = [] + for listener in listeners: + if event[0] == EventType.PROBE: + listener.monitor_cb(event[1], event[2], value) + elif event[1] == EventType.MONITOR: + listener.injection_status_cb(event[1], event[2], value) + else: + raise ValueError + + def monitor_cb(self, channel, probe, value): + self._event_cb((EventType.PROBE, channel, probe), value) + + def injection_status_cb(self, channel, override, value): + self._event_cb((EventType.INJECTION, channel, override), value) + + def remove_listener(self, listener): + for event, listeners in list(self.listeners.items()): + try: + listeners.remove(listener) + except ValueError: + pass + if not listeners: + del self.listeners[event] + if event[0] == EventType.PROBE: + logger.debug("stopped monitoring channel %d probe %d", event[1], event[2]) + self.comm_moninj.monitor_probe(False, event[1], event[2]) + elif event[0] == EventType.INJECTION: + logger.debug("stopped monitoring channel %d injection %d", event[1], event[2]) + self.comm_moninj.monitor_injection(False, event[1], event[2]) + else: + raise ValueError + + +class ProxyConnection: + def __init__(self, monitor_mux, reader, writer): + self.monitor_mux = monitor_mux + self.reader = reader + self.writer = writer + + async def handle(self): + try: + while True: + ty = await self.reader.read(1) + if not ty: + return + if ty == "\x00": # MonitorProbe + packet = await self.reader.readexactly(6) + enable, channel, probe = struct.unpack("