1
0
forked from M-Labs/artiq
artiq/artiq/frontend/artiq_master.py
Sebastien Bourdeauducq d5795fd619 master: watchdog support
Introduces a watchdog context manager to use in the experiment code that
terminates the process with an error if it times out. The syntax is:

with self.scheduler.watchdog(20*s):
   ...

Watchdog timers are implemented by the master process (and the worker
communicates the necessary information about them) so that they can be
enforced even if the worker crashes. They can be nested arbitrarily.
During yields, all watchdog timers for the yielding worker are
suspended [TODO]. Setting up watchdogs is not supported in kernels;
however, a kernel can be called within watchdog contexts (and terminating
the worker will terminate the kernel [TODO]).

It is possible to implement a heartbeat mechanism using a watchdog, e.g.:

for i in range(...):
    with self.scheduler.watchdog(...):
        ....

Crashes/freezes within the iterator or the loop management would not be
detected, but they should be rare enough.
2015-03-11 16:43:14 +01:00

95 lines
2.9 KiB
Python
Executable File

#!/usr/bin/env python3
import asyncio
import argparse
import atexit
import os
from artiq.protocols.pc_rpc import Server
from artiq.protocols.sync_struct import Publisher
from artiq.protocols.file_db import FlatFileDB, SimpleHistory
from artiq.master.scheduler import Scheduler
from artiq.master.results import RTResults, get_last_rid
from artiq.master.repository import Repository
from artiq.tools import verbosity_args, init_logger
def get_argparser():
    """Build the command-line argument parser for the ARTIQ master.

    The parser gets a "network" option group holding ``--bind``,
    ``--port-notify`` and ``--port-control``, and is then handed to
    ``verbosity_args`` so that it can register its own options.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    argparser = argparse.ArgumentParser(description="ARTIQ master")

    network = argparser.add_argument_group("network")
    network.add_argument(
        "--bind",
        default="::1",
        help="hostname or IP address to bind to",
    )
    network.add_argument(
        "--port-notify",
        default=3250,
        type=int,
        help="TCP port to listen to for notifications (default: 3250)",
    )
    network.add_argument(
        "--port-control",
        default=3251,
        type=int,
        help="TCP port to listen to for control (default: 3251)",
    )

    verbosity_args(argparser)
    return argparser
def main():
    """Entry point of the ARTIQ master process.

    Parses the command line, initialises logging, opens the device and
    parameter databases ("ddb.pyon" / "pdb.pyon"), creates the scheduler,
    then starts a control ``Server`` and a notification ``Publisher`` on
    the address given by ``--bind`` before running the event loop forever.

    Cleanup is done through ``atexit`` handlers. ``atexit`` calls handlers
    in the reverse of their registration order, so the registration
    sequence below must be kept: the notification publisher is stopped
    first, then the control server, then the scheduler, and the event loop
    is closed last.
    """
    args = get_argparser().parse_args()
    init_logger(args)

    device_db = FlatFileDB("ddb.pyon")
    parameter_db = FlatFileDB("pdb.pyon")
    parameter_history = SimpleHistory(30)
    parameter_db.hooks.append(parameter_history)
    rt_results = RTResults()
    repository = Repository()

    # On Windows (os.name == "nt") a ProactorEventLoop is created and
    # installed; elsewhere the default event loop is used as-is.
    if os.name != "nt":
        loop = asyncio.get_event_loop()
    else:
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)
    # Registered first, hence executed last at interpreter exit.
    atexit.register(lambda: loop.close())

    def stop_at_exit(component):
        # component.stop() is only called (and its result run on the
        # loop) when the exit handler fires, not at registration time.
        atexit.register(lambda: loop.run_until_complete(component.stop()))

    def run_cb(rid, run_params):
        # rid is part of the callback signature but is not used here.
        rt_results.current_group = run_params["rtr_group"]

    scheduler = Scheduler(run_cb, get_last_rid() + 1)
    handlers = {
        "req_device": device_db.request,
        "req_parameter": parameter_db.request,
        "set_parameter": parameter_db.set,
        "init_rt_results": rt_results.init,
        "update_rt_results": rt_results.update,
        "scheduler_run_queued": scheduler.run_queued,
        "scheduler_cancel_queued": scheduler.cancel_queued,
        "scheduler_run_timed": scheduler.run_timed,
        "scheduler_cancel_timed": scheduler.cancel_timed,
    }
    scheduler.worker_handlers = handlers
    scheduler.start()
    stop_at_exit(scheduler)

    # Objects exposed through the control server.
    control_targets = {
        "master_ddb": device_db,
        "master_pdb": parameter_db,
        "master_schedule": scheduler,
        "master_repository": repository,
    }
    server_control = Server(control_targets)
    control_startup = server_control.start(args.bind, args.port_control)
    loop.run_until_complete(control_startup)
    stop_at_exit(server_control)

    # Structures handed to the notification publisher.
    notifiers = {
        "queue": scheduler.queue,
        "timed": scheduler.timed,
        "devices": device_db.data,
        "parameters": parameter_db.data,
        "parameters_simplehist": parameter_history.history,
        "rt_results": rt_results.groups,
        "explist": repository.explist
    }
    server_notify = Publisher(notifiers)
    notify_startup = server_notify.start(args.bind, args.port_notify)
    loop.run_until_complete(notify_startup)
    stop_at_exit(server_notify)

    loop.run_forever()
# Start the master only when this file is executed as a script; importing
# it as a module has no side effects.
if __name__ == "__main__":
    main()