influxdb: tag-based schema, better type support

This commit is contained in:
Sebastien Bourdeauducq 2015-08-18 14:49:23 +08:00
parent ed00ca1485
commit 600e8335f2
1 changed files with 24 additions and 20 deletions

View File

@ -59,6 +59,26 @@ def get_argparser():
return parser return parser
def influxdb_str(s):
return '"' + s.replace('"', '\\"') + '"'
def format_influxdb(v):
if isinstance(v, bool):
if v:
return "bool", "t"
else:
return "bool", "f"
elif np.issubdtype(type(v), int):
return "int", "{}i".format(v)
elif np.issubdtype(type(v), float):
return "float", "{}".format(v)
elif isinstance(v, str):
return "str", influxdb_str(v)
else:
return "pyon", influxdb_str(pyon.encode(v))
class DBWriter(TaskObject): class DBWriter(TaskObject):
def __init__(self, base_url, user, password, database, table): def __init__(self, base_url, user, password, database, table):
self.base_url = base_url self.base_url = base_url
@ -83,7 +103,8 @@ class DBWriter(TaskObject):
url = self.base_url + "/write" url = self.base_url + "/write"
params = {"u": self.user, "p": self.password, "db": self.database, params = {"u": self.user, "p": self.password, "db": self.database,
"consistency": "any", "precision": "n"} "consistency": "any", "precision": "n"}
data = "{} {}={}".format(self.table, k, v) fmt_ty, fmt_v = format_influxdb(v)
data = "{},parameter={} {}={}".format(self.table, k, fmt_ty, fmt_v)
try: try:
response = yield from aiohttp.request( response = yield from aiohttp.request(
"POST", url, params=params, data=data) "POST", url, params=params, data=data)
@ -101,31 +122,14 @@ class DBWriter(TaskObject):
response.close() response.close()
def format_influxdb(v):
if isinstance(v, bool):
if v:
return "t"
else:
return "f"
elif np.issubdtype(type(v), int):
return "{}i".format(v)
elif np.issubdtype(type(v), float):
return "{}".format(v)
elif isinstance(v, str):
return '"' + v.replace('"', '\\"') + '"'
else:
return None
class Parameters: class Parameters:
def __init__(self, filter_function, writer, init): def __init__(self, filter_function, writer, init):
self.filter_function = filter_function self.filter_function = filter_function
self.writer = writer self.writer = writer
def __setitem__(self, k, v): def __setitem__(self, k, v):
v_db = format_influxdb(v) if self.filter_function(k):
if v_db is not None and self.filter_function(k): self.writer.update(k, v)
self.writer.update(k, v_db)
def __delitem__(self, k): def __delitem__(self, k):
pass pass