forked from M-Labs/artiq
master: TCP server for remote logging
This commit is contained in:
parent 9e2e233fef
commit cbda753f44
@@ -5,9 +5,10 @@ import argparse
 import atexit
 import os

-from artiq.protocols.pc_rpc import Server
+from artiq.protocols.pc_rpc import Server as RPCServer
 from artiq.protocols.sync_struct import Publisher
-from artiq.master.log import log_args, init_log
+from artiq.protocols.logging import Server as LoggingServer
+from artiq.master.log import log_args, init_log, log_worker
 from artiq.master.databases import DeviceDB, DatasetDB
 from artiq.master.scheduler import Scheduler
 from artiq.master.worker_db import get_last_rid
@ -27,6 +28,9 @@ def get_argparser():
|
|||||||
group.add_argument(
|
group.add_argument(
|
||||||
"--port-control", default=3251, type=int,
|
"--port-control", default=3251, type=int,
|
||||||
help="TCP port to listen to for control (default: %(default)d)")
|
help="TCP port to listen to for control (default: %(default)d)")
|
||||||
|
group.add_argument(
|
||||||
|
"--port-logging", default=1066, type=int,
|
||||||
|
help="TCP port to listen to for remote logging (default: %(default)d)")
|
||||||
|
|
||||||
group = parser.add_argument_group("databases")
|
group = parser.add_argument_group("databases")
|
||||||
group.add_argument("--device-db", default="device_db.pyon",
|
group.add_argument("--device-db", default="device_db.pyon",
|
||||||
@@ -49,7 +53,7 @@ def get_argparser():

 def main():
     args = get_argparser().parse_args()
-    log_buffer, log_forwarder = init_log(args)
+    log_buffer = init_log(args)
     if os.name == "nt":
         loop = asyncio.ProactorEventLoop()
         asyncio.set_event_loop(loop)
@@ -67,7 +71,7 @@ def main():
     else:
         repo_backend = FilesystemBackend(args.repository)
     repository = Repository(repo_backend, device_db.get_device_db,
-                            log_forwarder.log_worker)
+                            log_worker)
     atexit.register(repository.close)
     repository.scan_async()

@@ -76,14 +80,14 @@ def main():
         "get_device": device_db.get,
         "get_dataset": dataset_db.get,
         "update_dataset": dataset_db.update,
-        "log": log_forwarder.log_worker
+        "log": log_worker
     }
     scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
     worker_handlers["scheduler_submit"] = scheduler.submit
     scheduler.start()
     atexit.register(lambda: loop.run_until_complete(scheduler.stop()))

-    server_control = Server({
+    server_control = RPCServer({
         "master_device_db": device_db,
         "master_dataset_db": dataset_db,
         "master_schedule": scheduler,
@@ -104,6 +108,11 @@ def main():
         args.bind, args.port_notify))
     atexit.register(lambda: loop.run_until_complete(server_notify.stop()))

+    server_logging = LoggingServer()
+    loop.run_until_complete(server_logging.start(
+        args.bind, args.port_logging))
+    atexit.register(lambda: loop.run_until_complete(server_logging.stop()))
+
     loop.run_forever()

 if __name__ == "__main__":
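With these changes the master opens an additional TCP endpoint for remote logging, next to its existing control and notify servers. Illustrative invocation (the --port-logging option is the one added above; 1066 is already its default, so passing it is optional, and the artiq_master command name is assumed here rather than taken from this diff):

    artiq_master --port-logging 1066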
@@ -2,6 +2,7 @@ import logging
 import logging.handlers

 from artiq.protocols.sync_struct import Notifier
+from artiq.protocols.logging import parse_log_message, log_with_name


 class LogBuffer:
@@ -25,38 +26,11 @@ class LogBufferHandler(logging.Handler):
         self.log_buffer.log(record.levelno, record.source, record.created, message)


-name_to_level = {
-    "CRITICAL": logging.CRITICAL,
-    "ERROR": logging.ERROR,
-    "WARN": logging.WARNING,
-    "WARNING": logging.WARNING,
-    "INFO": logging.INFO,
-    "DEBUG": logging.DEBUG,
-}
-
-
-def parse_log_message(msg):
-    for name, level in name_to_level.items():
-        if msg.startswith(name + ":"):
-            remainder = msg[len(name) + 1:]
-            try:
-                idx = remainder.index(":")
-            except:
-                continue
-            return level, remainder[:idx], remainder[idx+1:]
-    return logging.INFO, "print", msg
-
-
-fwd_logger = logging.getLogger("fwd")
-
-
-class LogForwarder:
-    def log_worker(self, rid, message):
-        level, name, message = parse_log_message(message)
-        fwd_logger.name = name
-        fwd_logger.log(level, message,
-                       extra={"source": "worker({})".format(rid)})
-    log_worker.worker_pass_rid = True
+def log_worker(rid, message):
+    level, name, message = parse_log_message(message)
+    log_with_name(name, level, message,
+                  extra={"source": "worker({})".format(rid)})
+log_worker.worker_pass_rid = True


 class SourceFilter:
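For context, a sketch of what the simplified log_worker now does with a line printed by a worker; the RID and experiment name are invented for illustration:

    # "WARNING:my_exp:temperature drifting" printed by the worker for RID 42
    # is parsed and forwarded as a logging record tagged with its source:
    log_worker(42, "WARNING:my_exp:temperature drifting")
    # equivalent to:
    # log_with_name("my_exp", logging.WARNING, "temperature drifting",
    #               extra={"source": "worker(42)"})
    # A line without a recognized level prefix falls back to INFO under
    # the "print" logger name (see parse_log_message in the new file below).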
@@ -120,6 +94,4 @@ def init_log(args):
    handler.addFilter(flt)
    root_logger.addHandler(handler)

-    log_forwarder = LogForwarder()
-
-    return log_buffer, log_forwarder
+    return log_buffer
artiq/protocols/logging.py (new file, 67 lines)
@@ -0,0 +1,67 @@
+import asyncio
+import logging
+
+from artiq.protocols.asyncio_server import AsyncioServer
+
+
+_fwd_logger = logging.getLogger("fwd")
+
+
+def log_with_name(name, *args, **kwargs):
+    _fwd_logger.name = name
+    _fwd_logger.log(*args, **kwargs)
+
+
+_name_to_level = {
+    "CRITICAL": logging.CRITICAL,
+    "ERROR": logging.ERROR,
+    "WARN": logging.WARNING,
+    "WARNING": logging.WARNING,
+    "INFO": logging.INFO,
+    "DEBUG": logging.DEBUG,
+}
+
+
+def parse_log_message(msg):
+    for name, level in _name_to_level.items():
+        if msg.startswith(name + ":"):
+            remainder = msg[len(name) + 1:]
+            try:
+                idx = remainder.index(":")
+            except:
+                continue
+            return level, remainder[:idx], remainder[idx+1:]
+    return logging.INFO, "print", msg
+
+
+_init_string = b"ARTIQ logging\n"
+
+
+class Server(AsyncioServer):
+    async def _handle_connection_cr(self, reader, writer):
+        try:
+            line = await reader.readline()
+            if line != _init_string:
+                return
+
+            while True:
+                line = await reader.readline()
+                if not line:
+                    break
+                try:
+                    line = line.decode()
+                except:
+                    return
+                line = line[:-1]
+                linesplit = line.split(":", 4)
+                if len(linesplit) != 4:
+                    return
+                source, levelname, name, message = linesplit
+                try:
+                    level = _name_to_level[levelname]
+                except KeyError:
+                    return
+                log_with_name(name, level, message,
+                              extra={"source": source})
+        finally:
+            writer.close()
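The new Server class defines a simple line-oriented wire protocol: a client connects, sends the init string b"ARTIQ logging\n", then one newline-terminated record per line in the form source:levelname:name:message. A minimal client sketch follows (not part of this commit; the host, port and record fields are placeholders). Note that, given the server's split(":", 4) length check, the message itself may not contain a colon:

    import asyncio

    async def send_remote_log(host, port, source, levelname, name, message):
        # Connect to the master's logging port and forward one record.
        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b"ARTIQ logging\n")
        writer.write("{}:{}:{}:{}\n".format(
            source, levelname, name, message).encode())
        await writer.drain()
        writer.close()

    # e.g. forward a warning to a master listening on the default port 1066:
    # loop.run_until_complete(send_remote_log(
    #     "::1", 1066, "my_controller", "WARNING",
    #     "my_controller", "fan speed low"))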
@@ -8,6 +8,8 @@ Default network ports
 +--------------------------+--------------+
 | Core device (mon/inj)    | 3250 (UDP)   |
 +--------------------------+--------------+
+| Master (logging)         | 1066         |
++--------------------------+--------------+
 | InfluxDB bridge          | 3248         |
 +--------------------------+--------------+
 | Controller manager       | 3249         |