forked from M-Labs/artiq
applets: Allow wildcard subscription to all datasets matching prefix via IPC
This allows ndscan v0.3+ to use the IPC interface for efficiency; previously, the non-upstreamed RID dataset namespace feature allowed the applets to somewhat efficient subscribe directly to the master process via the socket interface.
This commit is contained in:
parent
4c42f65909
commit
2d6fc154db
|
@ -64,9 +64,10 @@ class AppletIPCClient(AsyncioChildComm):
|
|||
exc_info=True)
|
||||
self.close_cb()
|
||||
|
||||
def subscribe(self, datasets, init_cb, mod_cb):
|
||||
def subscribe(self, datasets, init_cb, mod_cb, dataset_prefixes=[]):
|
||||
self.write_pyon({"action": "subscribe",
|
||||
"datasets": datasets})
|
||||
"datasets": datasets,
|
||||
"dataset_prefixes": dataset_prefixes})
|
||||
self.init_cb = init_cb
|
||||
self.mod_cb = mod_cb
|
||||
asyncio.ensure_future(self.listen())
|
||||
|
@ -113,6 +114,9 @@ class SimpleApplet:
|
|||
self.embed = os.getenv("ARTIQ_APPLET_EMBED")
|
||||
self.datasets = {getattr(self.args, arg.replace("-", "_"))
|
||||
for arg in self.dataset_args}
|
||||
# Optional prefixes (dataset sub-trees) to match subscriptions against;
|
||||
# currently only used by out-of-tree subclasses (ndscan).
|
||||
self.dataset_prefixes = []
|
||||
|
||||
def qasync_init(self):
|
||||
app = QtWidgets.QApplication([])
|
||||
|
@ -162,6 +166,14 @@ class SimpleApplet:
|
|||
self.data = data
|
||||
return data
|
||||
|
||||
def is_dataset_subscribed(self, key):
|
||||
if key in self.datasets:
|
||||
return True
|
||||
for prefix in self.dataset_prefixes:
|
||||
if key.startswith(prefix):
|
||||
return True
|
||||
return False
|
||||
|
||||
def filter_mod(self, mod):
|
||||
if self.embed is not None:
|
||||
# the parent already filters for us
|
||||
|
@ -170,9 +182,9 @@ class SimpleApplet:
|
|||
if mod["action"] == "init":
|
||||
return True
|
||||
if mod["path"]:
|
||||
return mod["path"][0] in self.datasets
|
||||
return self.is_dataset_subscribed(mod["path"][0])
|
||||
elif mod["action"] in {"setitem", "delitem"}:
|
||||
return mod["key"] in self.datasets
|
||||
return self.is_dataset_subscribed(mod["key"])
|
||||
else:
|
||||
return False
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ class AppletIPCServer(AsyncioParentComm):
|
|||
AsyncioParentComm.__init__(self)
|
||||
self.datasets_sub = datasets_sub
|
||||
self.datasets = set()
|
||||
self.dataset_prefixes = []
|
||||
|
||||
def write_pyon(self, obj):
|
||||
self.write(pyon.encode(obj).encode() + b"\n")
|
||||
|
@ -33,8 +34,16 @@ class AppletIPCServer(AsyncioParentComm):
|
|||
line = await self.readline()
|
||||
return pyon.decode(line.decode())
|
||||
|
||||
def _is_dataset_subscribed(self, key):
|
||||
if key in self.datasets:
|
||||
return True
|
||||
for prefix in self.dataset_prefixes:
|
||||
if key.startswith(prefix):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _synthesize_init(self, data):
|
||||
struct = {k: v for k, v in data.items() if k in self.datasets}
|
||||
struct = {k: v for k, v in data.items() if self._is_dataset_subscribed(k)}
|
||||
return {"action": "init",
|
||||
"struct": struct}
|
||||
|
||||
|
@ -43,10 +52,10 @@ class AppletIPCServer(AsyncioParentComm):
|
|||
mod = self._synthesize_init(mod["struct"])
|
||||
else:
|
||||
if mod["path"]:
|
||||
if mod["path"][0] not in self.datasets:
|
||||
if not self._is_dataset_subscribed(mod["path"][0]):
|
||||
return
|
||||
elif mod["action"] in {"setitem", "delitem"}:
|
||||
if mod["key"] not in self.datasets:
|
||||
if not self._is_dataset_subscribed(mod["key"]):
|
||||
return
|
||||
self.write_pyon({"action": "mod", "mod": mod})
|
||||
|
||||
|
@ -64,6 +73,7 @@ class AppletIPCServer(AsyncioParentComm):
|
|||
fix_initial_size_cb()
|
||||
elif action == "subscribe":
|
||||
self.datasets = obj["datasets"]
|
||||
self.dataset_prefixes = obj["dataset_prefixes"]
|
||||
if self.datasets_sub.model is not None:
|
||||
mod = self._synthesize_init(
|
||||
self.datasets_sub.model.backing_store)
|
||||
|
|
Loading…
Reference in New Issue