artiq/artiq/frontend/artiq_master.py

180 lines
6.5 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
import asyncio
2014-10-25 11:34:52 +08:00
import argparse
2014-12-31 17:41:22 +08:00
import atexit
2016-02-17 01:07:13 +08:00
import logging
2023-04-18 15:03:06 +08:00
from types import SimpleNamespace
2019-11-10 15:55:17 +08:00
from sipyco.pc_rpc import Server as RPCServer
from sipyco.sync_struct import Publisher
from sipyco.logging_tools import Server as LoggingServer
from sipyco.broadcast import Broadcaster
from sipyco import common_args
from sipyco.asyncio_tools import atexit_register_coroutine, SignalHandler
2019-11-10 15:55:17 +08:00
from artiq import __version__ as artiq_version
from artiq.master.log import log_args, init_log
from artiq.master.databases import (DeviceDB, DatasetDB,
InteractiveArgDB)
2015-01-14 12:16:49 +08:00
from artiq.master.scheduler import Scheduler
from artiq.master.rid_counter import RIDCounter
2016-02-17 01:07:13 +08:00
from artiq.master.experiments import (FilesystemBackend, GitBackend,
ExperimentDB)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
2015-01-23 00:52:13 +08:00
def get_argparser():
    """Build the command-line parser for the ARTIQ master.

    The parser provides:

    * ``--version``;
    * network options for the ``notify``, ``control``, ``logging`` and
      ``broadcast`` services (added by ``common_args.simple_network_args``);
    * a "databases" group (``--device-db``, ``--dataset-db``);
    * a "repository" group (``-g/--git``, ``-r/--repository``,
      ``--experiment-subdir``);
    * the logging options added by ``log_args``;
    * ``--name`` and ``--log-submissions``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    parser = argparse.ArgumentParser(description="ARTIQ master")
    parser.add_argument(
        "--version",
        action="version",
        version=f"ARTIQ v{artiq_version}",
        help="print the ARTIQ version number",
    )

    # One (name, description, default port) entry per network service.
    network_services = [
        ("notify", "notifications", 3250),
        ("control", "control", 3251),
        ("logging", "remote logging", 1066),
        ("broadcast", "broadcasts", 1067),
    ]
    common_args.simple_network_args(parser, network_services)

    db_options = parser.add_argument_group("databases")
    db_options.add_argument(
        "--device-db",
        default="device_db.py",
        help="device database file (default: '%(default)s')",
    )
    db_options.add_argument(
        "--dataset-db",
        default="dataset_db.mdb",
        help="dataset file (default: '%(default)s')",
    )

    repo_options = parser.add_argument_group("repository")
    repo_options.add_argument(
        "-g", "--git",
        default=False,
        action="store_true",
        help="use the Git repository backend",
    )
    repo_options.add_argument(
        "-r", "--repository",
        default="repository",
        help="path to the repository (default: '%(default)s')",
    )
    repo_options.add_argument(
        "--experiment-subdir",
        default="",
        help=("path to the experiment folder from the repository root "
              "(default: '%(default)s')"),
    )

    log_args(parser)

    parser.add_argument(
        "--name",
        help="friendly name, displayed in dashboards "
             "to identify master instead of server address",
    )
    parser.add_argument(
        "--log-submissions",
        default=None,
        help="log experiment submissions to specified file",
    )

    return parser
2015-07-22 05:13:50 +08:00
def main():
    """Start the ARTIQ master and run it until termination is requested.

    Parses the command line, creates the event loop and signal handler,
    then brings up, in this order: the broadcast server, the device,
    dataset and interactive-argument databases, the experiment database,
    the scheduler, and finally the control (RPC), notification
    (sync_struct) and logging servers. Each component's cleanup is
    registered with ``atexit`` right after it is created, so the
    registration order matches the creation order.

    The function blocks on the signal handler's termination task and
    returns when that task completes or is cancelled (cancellation is
    what ``master_management.terminate`` triggers).
    """
    args = get_argparser().parse_args()
    log_forwarder = init_log(args)

    # Dedicated event loop for the master; closed at interpreter exit.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    atexit.register(loop.close)

    signal_handler = SignalHandler()
    signal_handler.setup()
    atexit.register(signal_handler.teardown)

    bind = common_args.bind_address_from_args(args)

    def start_server(server, port):
        # Start ``server`` on ``bind``/``port`` and schedule its stop at exit.
        loop.run_until_complete(server.start(bind, port))
        atexit_register_coroutine(server.stop, loop=loop)

    # --- broadcasts (log messages and CCB requests) ---
    server_broadcast = Broadcaster()
    start_server(server_broadcast, args.port_broadcast)

    def broadcast_log(msg):
        return server_broadcast.broadcast("log", msg)

    log_forwarder.callback = broadcast_log

    def ccb_issue(service, *call_args, **call_kwargs):
        # Package the request and publish it on the "ccb" broadcast channel.
        server_broadcast.broadcast("ccb", {
            "service": service,
            "args": call_args,
            "kwargs": call_kwargs,
        })

    # --- databases ---
    device_db = DeviceDB(args.device_db)
    dataset_db = DatasetDB(args.dataset_db)
    atexit.register(dataset_db.close_db)
    dataset_db.start(loop=loop)
    atexit_register_coroutine(dataset_db.stop, loop=loop)
    interactive_arg_db = InteractiveArgDB()

    # Filled in further down, once the scheduler exists; the same dict
    # object is handed to ExperimentDB and Scheduler before that.
    worker_handlers = {}

    # --- experiment repository ---
    backend_cls = GitBackend if args.git else FilesystemBackend
    repo_backend = backend_cls(args.repository)
    experiment_db = ExperimentDB(
        repo_backend, worker_handlers, args.experiment_subdir)
    atexit.register(experiment_db.close)

    # --- scheduler ---
    scheduler = Scheduler(
        RIDCounter(), worker_handlers, experiment_db, args.log_submissions)
    scheduler.start(loop=loop)
    atexit_register_coroutine(scheduler.stop, loop=loop)

    # Python doesn't allow writing attributes to bound methods, so wrap
    # interactive_arg_db.get in a plain function that can carry the flag.
    def get_interactive_arguments(*call_args, **call_kwargs):
        return interactive_arg_db.get(*call_args, **call_kwargs)
    get_interactive_arguments._worker_pass_rid = True

    handlers = {
        "get_device_db": device_db.get_device_db,
        "get_device": device_db.get,
        "get_dataset": dataset_db.get,
        "get_dataset_metadata": dataset_db.get_metadata,
        "update_dataset": dataset_db.update,
        "get_interactive_arguments": get_interactive_arguments,
        "scheduler_submit": scheduler.submit,
        "scheduler_delete": scheduler.delete,
        "scheduler_request_termination": scheduler.request_termination,
        "scheduler_get_status": scheduler.get_status,
        "scheduler_check_pause": scheduler.check_pause,
        "scheduler_check_termination": scheduler.check_termination,
        "ccb_issue": ccb_issue,
    }
    worker_handlers.update(handlers)
    experiment_db.scan_repository_async(loop=loop)

    # Task that main() finally waits on; cancelling it shuts the master down.
    signal_handler_task = loop.create_task(signal_handler.wait_terminate())
    master_management = SimpleNamespace(
        get_name=lambda: args.name,
        terminate=lambda: signal_handler_task.cancel(),
    )

    # --- control (RPC) server ---
    control_targets = {
        "master_management": master_management,
        "device_db": device_db,
        "dataset_db": dataset_db,
        "interactive_arg_db": interactive_arg_db,
        "schedule": scheduler,
        "experiment_db": experiment_db,
    }
    server_control = RPCServer(control_targets, allow_parallel=True)
    start_server(server_control, args.port_control)

    # --- notification (sync_struct) server ---
    notifiers = {
        "schedule": scheduler.notifier,
        "devices": device_db.data,
        "datasets": dataset_db.data,
        "interactive_args": interactive_arg_db.pending,
        "explist": experiment_db.explist,
        "explist_status": experiment_db.status,
    }
    server_notify = Publisher(notifiers)
    start_server(server_notify, args.port_notify)

    # --- remote logging server ---
    server_logging = LoggingServer()
    start_server(server_logging, args.port_logging)

    print("ARTIQ master is now ready.")

    try:
        loop.run_until_complete(signal_handler_task)
    except asyncio.CancelledError:
        # Raised when the task was cancelled through
        # master_management.terminate; treated as a normal shutdown.
        pass
# Run the master only when this file is executed as a script.
if __name__ == "__main__":
    main()