forked from M-Labs/artiq
1
0
Fork 0

applets: Allow wildcard subscription to all datasets matching prefix via IPC

This allows ndscan v0.3+ to use the IPC interface for efficiency;
previously, the non-upstreamed RID dataset namespace feature allowed
the applets to somewhat efficient subscribe directly to the master
process via the socket interface.
This commit is contained in:
David Nadlinger 2022-06-16 19:18:15 +01:00 committed by Sébastien Bourdeauducq
parent 4c42f65909
commit 2d6fc154db
2 changed files with 29 additions and 7 deletions

View File

@ -64,9 +64,10 @@ class AppletIPCClient(AsyncioChildComm):
exc_info=True)
self.close_cb()
def subscribe(self, datasets, init_cb, mod_cb):
def subscribe(self, datasets, init_cb, mod_cb, dataset_prefixes=[]):
self.write_pyon({"action": "subscribe",
"datasets": datasets})
"datasets": datasets,
"dataset_prefixes": dataset_prefixes})
self.init_cb = init_cb
self.mod_cb = mod_cb
asyncio.ensure_future(self.listen())
@ -113,6 +114,9 @@ class SimpleApplet:
self.embed = os.getenv("ARTIQ_APPLET_EMBED")
self.datasets = {getattr(self.args, arg.replace("-", "_"))
for arg in self.dataset_args}
# Optional prefixes (dataset sub-trees) to match subscriptions against;
# currently only used by out-of-tree subclasses (ndscan).
self.dataset_prefixes = []
def qasync_init(self):
app = QtWidgets.QApplication([])
@ -162,6 +166,14 @@ class SimpleApplet:
self.data = data
return data
def is_dataset_subscribed(self, key):
if key in self.datasets:
return True
for prefix in self.dataset_prefixes:
if key.startswith(prefix):
return True
return False
def filter_mod(self, mod):
if self.embed is not None:
# the parent already filters for us
@ -170,9 +182,9 @@ class SimpleApplet:
if mod["action"] == "init":
return True
if mod["path"]:
return mod["path"][0] in self.datasets
return self.is_dataset_subscribed(mod["path"][0])
elif mod["action"] in {"setitem", "delitem"}:
return mod["key"] in self.datasets
return self.is_dataset_subscribed(mod["key"])
else:
return False

View File

@ -25,6 +25,7 @@ class AppletIPCServer(AsyncioParentComm):
AsyncioParentComm.__init__(self)
self.datasets_sub = datasets_sub
self.datasets = set()
self.dataset_prefixes = []
def write_pyon(self, obj):
self.write(pyon.encode(obj).encode() + b"\n")
@ -33,8 +34,16 @@ class AppletIPCServer(AsyncioParentComm):
line = await self.readline()
return pyon.decode(line.decode())
def _is_dataset_subscribed(self, key):
if key in self.datasets:
return True
for prefix in self.dataset_prefixes:
if key.startswith(prefix):
return True
return False
def _synthesize_init(self, data):
struct = {k: v for k, v in data.items() if k in self.datasets}
struct = {k: v for k, v in data.items() if self._is_dataset_subscribed(k)}
return {"action": "init",
"struct": struct}
@ -43,10 +52,10 @@ class AppletIPCServer(AsyncioParentComm):
mod = self._synthesize_init(mod["struct"])
else:
if mod["path"]:
if mod["path"][0] not in self.datasets:
if not self._is_dataset_subscribed(mod["path"][0]):
return
elif mod["action"] in {"setitem", "delitem"}:
if mod["key"] not in self.datasets:
if not self._is_dataset_subscribed(mod["key"]):
return
self.write_pyon({"action": "mod", "mod": mod})
@ -64,6 +73,7 @@ class AppletIPCServer(AsyncioParentComm):
fix_initial_size_cb()
elif action == "subscribe":
self.datasets = obj["datasets"]
self.dataset_prefixes = obj["dataset_prefixes"]
if self.datasets_sub.model is not None:
mod = self._synthesize_init(
self.datasets_sub.model.backing_store)