diff --git a/artiq/management/db.py b/artiq/management/db.py index ddd675fbc..26e559e08 100644 --- a/artiq/management/db.py +++ b/artiq/management/db.py @@ -14,10 +14,10 @@ class FlatFileDB: self.hooks = [] def save(self): - pyon.store_file(self.filename, self.data.backing_struct) + pyon.store_file(self.filename, self.data.read) def request(self, name): - return self.data.backing_struct[name] + return self.data.read[name] def set(self, name, value): self.data[name] = value @@ -40,12 +40,12 @@ class SimpleHistory: self.history = Notifier([]) def set(self, timestamp, name, value): - if len(self.history.backing_struct) >= self.depth: + if len(self.history.read) >= self.depth: del self.history[0] self.history.append((timestamp, name, value)) def delete(self, timestamp, name): - if len(self.history.backing_struct) >= self.depth: + if len(self.history.read) >= self.depth: del self.history[0] self.history.append((timestamp, name)) diff --git a/artiq/management/scheduler.py b/artiq/management/scheduler.py index 9d9448783..61f88b24b 100644 --- a/artiq/management/scheduler.py +++ b/artiq/management/scheduler.py @@ -20,8 +20,8 @@ class Scheduler: return r def new_prid(self): - prids = set(range(len(self.periodic.backing_struct) + 1)) - prids -= set(self.periodic.backing_struct.keys()) + prids = set(range(len(self.periodic.read) + 1)) + prids -= set(self.periodic.read.keys()) return next(iter(prids)) @asyncio.coroutine @@ -44,7 +44,7 @@ class Scheduler: def cancel_once(self, rid): idx = next(idx for idx, (qrid, _, _) - in enumerate(self.queue.backing_struct) + in enumerate(self.queue.read) if qrid == rid) if idx == 0: # Cannot cancel when already running @@ -75,7 +75,7 @@ class Scheduler: while True: min_next_run = None min_prid = None - for prid, params in self.periodic.backing_struct.items(): + for prid, params in self.periodic.read.items(): if min_next_run is None or params[0] < min_next_run: min_next_run = params[0] min_prid = prid @@ -89,7 +89,7 @@ class Scheduler: return min_next_run next_run, run_params, timeout, period = \ - self.periodic.backing_struct[min_prid] + self.periodic.read[min_prid] self.periodic[min_prid] = now + period, run_params, timeout, period rid = self.new_rid() @@ -101,8 +101,8 @@ class Scheduler: def _schedule(self): while True: next_periodic = yield from self._run_periodic() - if self.queue.backing_struct: - rid, run_params, timeout = self.queue.backing_struct[0] + if self.queue.read: + rid, run_params, timeout = self.queue.read[0] yield from self._run(rid, run_params, timeout) del self.queue[0] else: diff --git a/artiq/management/sync_struct.py b/artiq/management/sync_struct.py index 97a83e2c1..a479f3c10 100644 --- a/artiq/management/sync_struct.py +++ b/artiq/management/sync_struct.py @@ -1,4 +1,5 @@ import asyncio +from operator import getitem from artiq.management import pyon from artiq.management.tools import AsyncioServer @@ -52,56 +53,74 @@ class Subscriber: if action == "init": target = self.target_builder(obj["struct"]) - elif action == "append": - target.append(obj["x"]) - elif action == "insert": - target.insert(obj["i"], obj["x"]) - elif action == "pop": - target.pop(obj["i"]) - elif action == "setitem": - target.__setitem__(obj["key"], obj["value"]) - elif action == "delitem": - target.__delitem__(obj["key"]) + else: + for key in obj["path"]: + target = getitem(target, key) + if action == "append": + target.append(obj["x"]) + elif action == "insert": + target.insert(obj["i"], obj["x"]) + elif action == "pop": + target.pop(obj["i"]) + elif action == "setitem": + target.__setitem__(obj["key"], obj["value"]) + elif action == "delitem": + target.__delitem__(obj["key"]) if self.notify_cb is not None: self.notify_cb() class Notifier: - def __init__(self, backing_struct): - self.backing_struct = backing_struct - self.publisher = None + def __init__(self, backing_struct, publisher=None, path=[]): + self.read = backing_struct + self.publisher = publisher + self._backing_struct = backing_struct + self._path = path # Backing struct modification methods. # All modifications must go through them! def append(self, x): - self.backing_struct.append(x) + self._backing_struct.append(x) if self.publisher is not None: - self.publisher.publish(self, {"action": "append", "x": x}) + self.publisher.publish(self, {"action": "append", + "path": self._path, + "x": x}) def insert(self, i, x): - self.backing_struct.insert(i, x) + self._backing_struct.insert(i, x) if self.publisher is not None: - self.publisher.publish(self, {"action": "insert", "i": i, "x": x}) + self.publisher.publish(self, {"action": "insert", + "path": self._path, + "i": i, "x": x}) def pop(self, i=-1): - r = self.backing_struct.pop(i) + r = self._backing_struct.pop(i) if self.publisher is not None: - self.publisher.publish(self, {"action": "pop", "i": i}) + self.publisher.publish(self, {"action": "pop", + "path": self._path, + "i": i}) return r def __setitem__(self, key, value): - self.backing_struct.__setitem__(key, value) + self._backing_struct.__setitem__(key, value) if self.publisher is not None: self.publisher.publish(self, {"action": "setitem", + "path": self._path, "key": key, "value": value}) def __delitem__(self, key): - self.backing_struct.__delitem__(key) + self._backing_struct.__delitem__(key) if self.publisher is not None: - self.publisher.publish(self, {"action": "delitem", "key": key}) + self.publisher.publish(self, {"action": "delitem", + "path": self._path, + "key": key}) + + def __getitem__(self, key): + item = getitem(self._backing_struct, key) + return Notifier(item, self.publisher, self._path + [key]) class Publisher(AsyncioServer): @@ -131,7 +150,7 @@ class Publisher(AsyncioServer): except KeyError: return - obj = {"action": "init", "struct": notifier.backing_struct} + obj = {"action": "init", "struct": notifier.read} line = pyon.encode(obj) + "\n" writer.write(line.encode())