artiq_influxdb: use aiohttp.ClientSession. Closes #829

This commit is contained in:
Sebastien Bourdeauducq 2017-11-22 17:31:09 +08:00
parent 8ebca38323
commit f83cf8d1bb

View File

@ -94,26 +94,26 @@ class DBWriter(TaskObject):
"too many pending updates", k) "too many pending updates", k)
async def _do(self): async def _do(self):
while True: async with aiohttp.ClientSession() as session:
k, v, t = await self._queue.get() while True:
url = self.base_url + "/write" k, v, t = await self._queue.get()
params = {"u": self.user, "p": self.password, "db": self.database, url = self.base_url + "/write"
"precision": "ms"} params = {"u": self.user, "p": self.password, "db": self.database,
data = "{},dataset={} {} {}".format( "precision": "ms"}
self.table, k, format_influxdb(v), round(t*1e3)) data = "{},dataset={} {} {}".format(
try: self.table, k, format_influxdb(v), round(t*1e3))
response = await aiohttp.request( try:
"POST", url, params=params, data=data) response = await session.post(url, params=params, data=data)
except: except:
logger.warning("got exception trying to update '%s'", logger.warning("got exception trying to update '%s'",
k, exc_info=True) k, exc_info=True)
else: else:
if response.status not in (200, 204): if response.status not in (200, 204):
content = (await response.content.read()).decode().strip() content = (await response.content.read()).decode().strip()
logger.warning("got HTTP status %d " logger.warning("got HTTP status %d "
"trying to update '%s': %s", "trying to update '%s': %s",
response.status, k, content) response.status, k, content)
response.close() response.close()
class _Mock: class _Mock: