forked from M-Labs/artiq
influxdb: better filtering
This commit is contained in:
parent
c625f2e7c9
commit
66b5ca99d8
|
@ -52,8 +52,8 @@ def get_argparser():
|
||||||
"--bind-port", default=3248, type=int,
|
"--bind-port", default=3248, type=int,
|
||||||
help="TCP port to listen to for control (default: %(default)d)")
|
help="TCP port to listen to for control (default: %(default)d)")
|
||||||
group.add_argument(
|
group.add_argument(
|
||||||
"--filter-file", default="influxdb_filter.pyon",
|
"--pattern-file", default="influxdb_patterns.pyon",
|
||||||
help="file to save the filter in (default: %(default)s)")
|
help="file to save the patterns in (default: %(default)s)")
|
||||||
verbosity_args(parser)
|
verbosity_args(parser)
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
@ -143,37 +143,57 @@ class MasterReader(TaskObject):
|
||||||
|
|
||||||
|
|
||||||
class Filter:
|
class Filter:
|
||||||
def __init__(self, filter_file):
|
def __init__(self, pattern_file):
|
||||||
self.filter_file = filter_file
|
self.pattern_file = pattern_file
|
||||||
self.filter = []
|
self.patterns = []
|
||||||
try:
|
try:
|
||||||
self.filter = pyon.load_file(self.filter_file)
|
self.patterns = pyon.load_file(self.pattern_file)
|
||||||
except FileNotFoundError:
|
except FileNotFoundError:
|
||||||
logger.info("no filter file found, using empty filter")
|
logger.info("no pattern file found, logging everything")
|
||||||
|
|
||||||
def _save(self):
|
def _save(self):
|
||||||
pyon.store_file(self.filter_file, self.filter)
|
pyon.store_file(self.pattern_file, self.patterns)
|
||||||
|
|
||||||
|
# Privatize so that it is not shown in artiq_rpctool list-methods.
|
||||||
def _filter(self, k):
|
def _filter(self, k):
|
||||||
for pattern in self.filter:
|
take = "+"
|
||||||
|
for pattern in self.patterns:
|
||||||
|
sign = "-"
|
||||||
|
if pattern[0] in "+-":
|
||||||
|
sign, pattern = pattern[0], pattern[1:]
|
||||||
if fnmatch.fnmatchcase(k, pattern):
|
if fnmatch.fnmatchcase(k, pattern):
|
||||||
return False
|
take = sign
|
||||||
return True
|
return take == "+"
|
||||||
|
|
||||||
def add_pattern(self, pattern):
|
def add_pattern(self, pattern, index=None):
|
||||||
"""Add a name pattern to ignore."""
|
"""Add a pattern.
|
||||||
if pattern not in self.filter:
|
|
||||||
self.filter.append(pattern)
|
Optional + and - pattern prefixes specify whether to ignore or log
|
||||||
|
keys matching the rest of the pattern.
|
||||||
|
Default (in the absence of prefix) is to ignore. Keys that match no
|
||||||
|
pattern are logged. Last matched pattern takes precedence.
|
||||||
|
|
||||||
|
The optional index parameter specifies where to insert the pattern.
|
||||||
|
By default, patterns are added at the end. If index is an integer, it
|
||||||
|
specifies the index where the pattern is inserted. If it is a string,
|
||||||
|
that string must match an existing pattern and the new pattern is
|
||||||
|
inserted immediately after it."""
|
||||||
|
if pattern not in self.patterns:
|
||||||
|
if index is None:
|
||||||
|
index = len(self.patterns)
|
||||||
|
if isinstance(index, str):
|
||||||
|
index = self.patterns.index(index) + 1
|
||||||
|
self.patterns.insert(index, pattern)
|
||||||
self._save()
|
self._save()
|
||||||
|
|
||||||
def remove_pattern(self, pattern):
|
def remove_pattern(self, pattern):
|
||||||
"""Remove a pattern name to ignore."""
|
"""Remove a pattern."""
|
||||||
self.pattern.remove(pattern)
|
self.patterns.remove(pattern)
|
||||||
self._save()
|
self._save()
|
||||||
|
|
||||||
def get_patterns(self):
|
def get_patterns(self):
|
||||||
"""Show ignore patterns."""
|
"""Show existing patterns."""
|
||||||
return self.filter
|
return self.patterns
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
@ -189,7 +209,7 @@ def main():
|
||||||
writer.start()
|
writer.start()
|
||||||
atexit.register(lambda: loop.run_until_complete(writer.stop()))
|
atexit.register(lambda: loop.run_until_complete(writer.stop()))
|
||||||
|
|
||||||
filter = Filter(args.filter_file)
|
filter = Filter(args.pattern_file)
|
||||||
rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True)
|
rpc_server = Server({"influxdb_filter": filter}, builtin_terminate=True)
|
||||||
loop.run_until_complete(rpc_server.start(args.bind, args.bind_port))
|
loop.run_until_complete(rpc_server.start(args.bind, args.bind_port))
|
||||||
atexit.register(lambda: loop.run_until_complete(rpc_server.stop()))
|
atexit.register(lambda: loop.run_until_complete(rpc_server.stop()))
|
||||||
|
|
Loading…
Reference in New Issue