From 86681dccffc7cea966fdd1214a928978510fa9c1 Mon Sep 17 00:00:00 2001 From: Robert Jordens Date: Sun, 24 Apr 2016 13:11:46 +0200 Subject: [PATCH] influxdb: use queue insertion time --- artiq/frontend/artiq_influxdb.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py index c12384e11..143c09797 100755 --- a/artiq/frontend/artiq_influxdb.py +++ b/artiq/frontend/artiq_influxdb.py @@ -6,6 +6,7 @@ import asyncio import atexit import fnmatch from functools import partial +import time import numpy as np import aiohttp @@ -95,19 +96,20 @@ class DBWriter(TaskObject): def update(self, k, v): try: - self._queue.put_nowait((k, v)) + self._queue.put_nowait((k, v, time.time())) except asyncio.QueueFull: logger.warning("failed to update dataset '%s': " "too many pending updates", k) async def _do(self): while True: - k, v = await self._queue.get() + k, v, t = await self._queue.get() url = self.base_url + "/write" params = {"u": self.user, "p": self.password, "db": self.database, "precision": "ms"} fmt_ty, fmt_v = format_influxdb(v) - data = "{},dataset={} {}={}".format(self.table, k, fmt_ty, fmt_v) + data = "{},dataset={} {}={} {}".format( + self.table, k, fmt_ty, fmt_v, round(t*1e3)) try: response = await aiohttp.request( "POST", url, params=params, data=data)