forked from M-Labs/artiq
1
0
Fork 0

influxdb: use queue insertion time

This commit is contained in:
Robert Jördens 2016-04-24 13:11:46 +02:00
parent aadcf0fe98
commit 86681dccff
1 changed files with 5 additions and 3 deletions

View File

@ -6,6 +6,7 @@ import asyncio
import atexit import atexit
import fnmatch import fnmatch
from functools import partial from functools import partial
import time
import numpy as np import numpy as np
import aiohttp import aiohttp
@ -95,19 +96,20 @@ class DBWriter(TaskObject):
def update(self, k, v): def update(self, k, v):
try: try:
self._queue.put_nowait((k, v)) self._queue.put_nowait((k, v, time.time()))
except asyncio.QueueFull: except asyncio.QueueFull:
logger.warning("failed to update dataset '%s': " logger.warning("failed to update dataset '%s': "
"too many pending updates", k) "too many pending updates", k)
async def _do(self): async def _do(self):
while True: while True:
k, v = await self._queue.get() k, v, t = await self._queue.get()
url = self.base_url + "/write" url = self.base_url + "/write"
params = {"u": self.user, "p": self.password, "db": self.database, params = {"u": self.user, "p": self.password, "db": self.database,
"precision": "ms"} "precision": "ms"}
fmt_ty, fmt_v = format_influxdb(v) fmt_ty, fmt_v = format_influxdb(v)
data = "{},dataset={} {}={}".format(self.table, k, fmt_ty, fmt_v) data = "{},dataset={} {}={} {}".format(
self.table, k, fmt_ty, fmt_v, round(t*1e3))
try: try:
response = await aiohttp.request( response = await aiohttp.request(
"POST", url, params=params, data=data) "POST", url, params=params, data=data)