From cee8f288de87d44b3c4d94a2eccf1cf0b4edf2db Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Fri, 16 Oct 2015 18:35:02 +0800 Subject: [PATCH] protocols/logging: add LogForwarder --- artiq/protocols/logging.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/artiq/protocols/logging.py b/artiq/protocols/logging.py index 6c3fdded3..44376da7c 100644 --- a/artiq/protocols/logging.py +++ b/artiq/protocols/logging.py @@ -2,6 +2,7 @@ import asyncio import logging from artiq.protocols.asyncio_server import AsyncioServer +from artiq.tools import TaskObject _fwd_logger = logging.getLogger("fwd") @@ -70,3 +71,39 @@ class Server(AsyncioServer): extra={"source": source}) finally: writer.close() + + +class LogForwarder(logging.Handler, TaskObject): + def __init__(self, host, port, reconnect_timer=5.0, queue_size=1000, + **kwargs): + logging.Handler.__init__(self, **kwargs) + self.host = host + self.port = port + self.setFormatter(logging.Formatter( + "%(source)s:%(levelno)d:%(name)s:%(message)s")) + self._queue = asyncio.Queue(queue_size) + self.reconnect_timer = reconnect_timer + + def emit(self, record): + message = self.format(record) + try: + self._queue.put_nowait(message) + except asyncio.QueueFull: + pass + + async def _do(self): + while True: + try: + reader, writer = await asyncio.open_connection(self.host, + self.port) + writer.write(_init_string) + while True: + message = await self._queue.get() + "\n" + writer.write(message.encode()) + await writer.drain() + except asyncio.CancelledError: + return + except: + await asyncio.sleep(self.reconnect_timer) + finally: + writer.close()