sync_struct: support nested structures

This commit is contained in:
Sebastien Bourdeauducq 2015-01-13 17:31:58 +08:00
parent 893c18679f
commit f12e721974
3 changed files with 53 additions and 34 deletions

View File

@ -14,10 +14,10 @@ class FlatFileDB:
self.hooks = []
def save(self):
pyon.store_file(self.filename, self.data.backing_struct)
pyon.store_file(self.filename, self.data.read)
def request(self, name):
return self.data.backing_struct[name]
return self.data.read[name]
def set(self, name, value):
self.data[name] = value
@ -40,12 +40,12 @@ class SimpleHistory:
self.history = Notifier([])
def set(self, timestamp, name, value):
if len(self.history.backing_struct) >= self.depth:
if len(self.history.read) >= self.depth:
del self.history[0]
self.history.append((timestamp, name, value))
def delete(self, timestamp, name):
if len(self.history.backing_struct) >= self.depth:
if len(self.history.read) >= self.depth:
del self.history[0]
self.history.append((timestamp, name))

View File

@ -20,8 +20,8 @@ class Scheduler:
return r
def new_prid(self):
prids = set(range(len(self.periodic.backing_struct) + 1))
prids -= set(self.periodic.backing_struct.keys())
prids = set(range(len(self.periodic.read) + 1))
prids -= set(self.periodic.read.keys())
return next(iter(prids))
@asyncio.coroutine
@ -44,7 +44,7 @@ class Scheduler:
def cancel_once(self, rid):
idx = next(idx for idx, (qrid, _, _)
in enumerate(self.queue.backing_struct)
in enumerate(self.queue.read)
if qrid == rid)
if idx == 0:
# Cannot cancel when already running
@ -75,7 +75,7 @@ class Scheduler:
while True:
min_next_run = None
min_prid = None
for prid, params in self.periodic.backing_struct.items():
for prid, params in self.periodic.read.items():
if min_next_run is None or params[0] < min_next_run:
min_next_run = params[0]
min_prid = prid
@ -89,7 +89,7 @@ class Scheduler:
return min_next_run
next_run, run_params, timeout, period = \
self.periodic.backing_struct[min_prid]
self.periodic.read[min_prid]
self.periodic[min_prid] = now + period, run_params, timeout, period
rid = self.new_rid()
@ -101,8 +101,8 @@ class Scheduler:
def _schedule(self):
while True:
next_periodic = yield from self._run_periodic()
if self.queue.backing_struct:
rid, run_params, timeout = self.queue.backing_struct[0]
if self.queue.read:
rid, run_params, timeout = self.queue.read[0]
yield from self._run(rid, run_params, timeout)
del self.queue[0]
else:

View File

@ -1,4 +1,5 @@
import asyncio
from operator import getitem
from artiq.management import pyon
from artiq.management.tools import AsyncioServer
@ -52,56 +53,74 @@ class Subscriber:
if action == "init":
target = self.target_builder(obj["struct"])
elif action == "append":
target.append(obj["x"])
elif action == "insert":
target.insert(obj["i"], obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "setitem":
target.__setitem__(obj["key"], obj["value"])
elif action == "delitem":
target.__delitem__(obj["key"])
else:
for key in obj["path"]:
target = getitem(target, key)
if action == "append":
target.append(obj["x"])
elif action == "insert":
target.insert(obj["i"], obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "setitem":
target.__setitem__(obj["key"], obj["value"])
elif action == "delitem":
target.__delitem__(obj["key"])
if self.notify_cb is not None:
self.notify_cb()
class Notifier:
def __init__(self, backing_struct):
self.backing_struct = backing_struct
self.publisher = None
def __init__(self, backing_struct, publisher=None, path=[]):
self.read = backing_struct
self.publisher = publisher
self._backing_struct = backing_struct
self._path = path
# Backing struct modification methods.
# All modifications must go through them!
def append(self, x):
self.backing_struct.append(x)
self._backing_struct.append(x)
if self.publisher is not None:
self.publisher.publish(self, {"action": "append", "x": x})
self.publisher.publish(self, {"action": "append",
"path": self._path,
"x": x})
def insert(self, i, x):
self.backing_struct.insert(i, x)
self._backing_struct.insert(i, x)
if self.publisher is not None:
self.publisher.publish(self, {"action": "insert", "i": i, "x": x})
self.publisher.publish(self, {"action": "insert",
"path": self._path,
"i": i, "x": x})
def pop(self, i=-1):
r = self.backing_struct.pop(i)
r = self._backing_struct.pop(i)
if self.publisher is not None:
self.publisher.publish(self, {"action": "pop", "i": i})
self.publisher.publish(self, {"action": "pop",
"path": self._path,
"i": i})
return r
def __setitem__(self, key, value):
self.backing_struct.__setitem__(key, value)
self._backing_struct.__setitem__(key, value)
if self.publisher is not None:
self.publisher.publish(self, {"action": "setitem",
"path": self._path,
"key": key,
"value": value})
def __delitem__(self, key):
self.backing_struct.__delitem__(key)
self._backing_struct.__delitem__(key)
if self.publisher is not None:
self.publisher.publish(self, {"action": "delitem", "key": key})
self.publisher.publish(self, {"action": "delitem",
"path": self._path,
"key": key})
def __getitem__(self, key):
item = getitem(self._backing_struct, key)
return Notifier(item, self.publisher, self._path + [key])
class Publisher(AsyncioServer):
@ -131,7 +150,7 @@ class Publisher(AsyncioServer):
except KeyError:
return
obj = {"action": "init", "struct": notifier.backing_struct}
obj = {"action": "init", "struct": notifier.read}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())