From 7ed8fe57fa197f5f5e3190d8f89c552c7908ce9e Mon Sep 17 00:00:00 2001
From: Sebastien Bourdeauducq
Date: Fri, 7 Aug 2015 15:51:56 +0800
Subject: [PATCH] Git support

---
 artiq/frontend/artiq_client.py | 25 +++++++---
 artiq/frontend/artiq_master.py | 21 ++++++--
 artiq/gui/explorer.py          |  1 +
 artiq/gui/schedule.py          | 10 +++-
 artiq/master/repository.py     | 88 +++++++++++++++++++++++++++++-----
 artiq/master/scheduler.py      | 34 +++++++++----
 artiq/master/worker.py         |  3 +-
 artiq/master/worker_impl.py    | 15 ++++--
 artiq/test/scheduler.py        |  6 +--
 artiq/test/worker.py           |  2 +-
 setup.py                       |  5 +-
 11 files changed, 167 insertions(+), 43 deletions(-)

diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py
index 8277ba318..5650e5f09 100755
--- a/artiq/frontend/artiq_client.py
+++ b/artiq/frontend/artiq_client.py
@@ -42,6 +42,12 @@ def get_argparser():
     parser_add.add_argument("-f", "--flush", default=False, action="store_true",
                             help="flush the pipeline before preparing "
                                  "the experiment")
+    parser_add.add_argument("-R", "--repository", default=False,
+                            action="store_true",
+                            help="use the experiment repository")
+    parser_add.add_argument("-r", "--revision", default=None,
+                            help="use a specific repository revision "
+                                 "(defaults to head, ignored without -R)")
     parser_add.add_argument("-c", "--class-name", default=None,
                             help="name of the class to run")
     parser_add.add_argument("file",
@@ -81,8 +87,8 @@ def get_argparser():
         "what",
         help="select object to show: schedule/devices/parameters")
 
-    parser_scan_repository = subparsers.add_parser(
-        "scan-repository", help="rescan repository")
+    subparsers.add_parser("scan-repository",
+                          help="trigger a repository rescan")
 
     return parser
 
@@ -107,6 +113,8 @@ def _action_submit(remote, args):
         "class_name": args.class_name,
         "arguments": arguments,
     }
+    if args.repository:
+        expid["repo_rev"] = args.revision
     if args.timed is None:
         due_date = None
     else:
@@ -148,7 +156,7 @@ def _show_schedule(schedule):
                               x[1]["due_date"] or 0,
                               x[0]))
         table = PrettyTable(["RID", "Pipeline", " Status ", "Prio",
-                             "Due date", "File", "Class name"])
+                             "Due date", "Revision", "File", "Class name"])
         for rid, v in l:
             row = [rid, v["pipeline"], v["status"], v["priority"]]
             if v["due_date"] is None:
@@ -156,11 +164,16 @@ def _show_schedule(schedule):
             else:
                 row.append(time.strftime("%m/%d %H:%M:%S",
                                          time.localtime(v["due_date"])))
-            row.append(v["expid"]["file"])
-            if v["expid"]["class_name"] is None:
+            expid = v["expid"]
+            if "repo_rev" in expid:
+                row.append(expid["repo_rev"])
+            else:
+                row.append("Outside repo.")
+            row.append(expid["file"])
+            if expid["class_name"] is None:
                 row.append("")
             else:
-                row.append(v["expid"]["class_name"])
+                row.append(expid["class_name"])
             table.add_row(row)
         print(table)
     else:
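
With -R, the client includes an optional "repo_rev" key in the expid it
submits; a value of None asks the master to resolve it to the repository head
at submission time. A minimal sketch of the two payload shapes this produces
(the file and class names below are made-up examples, not from this patch):

    # expid as _action_submit builds it; values are illustrative only
    expid_outside_repo = {
        "file": "/home/user/experiment.py",  # plain filesystem path
        "class_name": "MyExperiment",
        "arguments": {},
    }
    expid_in_repo = {
        "repo_rev": None,         # -R without -r: master fills in the head
        "file": "experiment.py",  # relative to the requested checkout
        "class_name": "MyExperiment",
        "arguments": {},
    }
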
diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py
index 01c3fb081..939b67659 100755
--- a/artiq/frontend/artiq_master.py
+++ b/artiq/frontend/artiq_master.py
@@ -10,7 +10,7 @@ from artiq.protocols.sync_struct import Notifier, Publisher, process_mod
 from artiq.protocols.file_db import FlatFileDB
 from artiq.master.scheduler import Scheduler
 from artiq.master.worker_db import get_last_rid
-from artiq.master.repository import Repository
+from artiq.master.repository import FilesystemBackend, GitBackend, Repository
 from artiq.tools import verbosity_args, init_logger
 
 
@@ -26,6 +26,13 @@ def get_argparser():
     group.add_argument(
         "--port-control", default=3251, type=int,
         help="TCP port to listen to for control (default: %(default)d)")
+    group = parser.add_argument_group("repository")
+    group.add_argument(
+        "-g", "--git", default=False, action="store_true",
+        help="use the Git repository backend")
+    group.add_argument(
+        "-r", "--repository", default="repository",
+        help="path to the repository (default: '%(default)s')")
     verbosity_args(parser)
     return parser
 
@@ -57,6 +64,13 @@ def main():
     rtr = Notifier(dict())
     log = Log(1000)
 
+    if args.git:
+        repo_backend = GitBackend(args.repository)
+    else:
+        repo_backend = FilesystemBackend(args.repository)
+    repository = Repository(repo_backend, log.log)
+    repository.scan_async()
+
     worker_handlers = {
         "get_device": ddb.get,
         "get_parameter": pdb.get,
@@ -64,14 +78,11 @@ def main():
         "update_rt_results": lambda mod: process_mod(rtr, mod),
         "log": log.log
     }
-    scheduler = Scheduler(get_last_rid() + 1, worker_handlers)
+    scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
     worker_handlers["scheduler_submit"] = scheduler.submit
     scheduler.start()
     atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
 
-    repository = Repository(log.log)
-    repository.scan_async()
-
     server_control = Server({
         "master_ddb": ddb,
         "master_pdb": pdb,
diff --git a/artiq/gui/explorer.py b/artiq/gui/explorer.py
index 4a4bcfe99..4412320a6 100644
--- a/artiq/gui/explorer.py
+++ b/artiq/gui/explorer.py
@@ -256,6 +256,7 @@ class ExplorerDock(dockarea.Dock):
     def submit(self, pipeline_name, file, class_name, arguments,
                priority, due_date, flush):
         expid = {
+            "repo_rev": None,
             "file": file,
             "class_name": class_name,
             "arguments": arguments,
diff --git a/artiq/gui/schedule.py b/artiq/gui/schedule.py
index 65bcdc0cb..69324e07a 100644
--- a/artiq/gui/schedule.py
+++ b/artiq/gui/schedule.py
@@ -12,7 +12,7 @@ class _ScheduleModel(DictSyncModel):
     def __init__(self, parent, init):
         DictSyncModel.__init__(self,
             ["RID", "Pipeline", "Status", "Prio", "Due date",
-             "File", "Class name"],
+             "Revision", "File", "Class name"],
             parent, init)
 
     def sort_key(self, k, v):
@@ -35,8 +35,14 @@ class _ScheduleModel(DictSyncModel):
             return time.strftime("%m/%d %H:%M:%S",
                                  time.localtime(v["due_date"]))
         elif column == 5:
-            return v["expid"]["file"]
+            expid = v["expid"]
+            if "repo_rev" in expid:
+                return expid["repo_rev"]
+            else:
+                return "Outside repo."
         elif column == 6:
+            return v["expid"]["file"]
+        elif column == 7:
             if v["expid"]["class_name"] is None:
                 return ""
             else:
diff --git a/artiq/master/repository.py b/artiq/master/repository.py
index 465ce6f85..12e3f8a68 100644
--- a/artiq/master/repository.py
+++ b/artiq/master/repository.py
@@ -1,6 +1,8 @@
-import os
-import logging
 import asyncio
+import os
+import tempfile
+import shutil
+import logging
 
 from artiq.protocols.sync_struct import Notifier
 from artiq.master.worker import Worker
@@ -10,15 +12,14 @@ logger = logging.getLogger(__name__)
 
 
 @asyncio.coroutine
-def _scan_experiments(log):
+def _scan_experiments(wd, log):
     r = dict()
-    for f in os.listdir("repository"):
+    for f in os.listdir(wd):
         if f.endswith(".py"):
             try:
-                full_name = os.path.join("repository", f)
                 worker = Worker({"log": lambda message: log("scan", message)})
                 try:
-                    description = yield from worker.examine(full_name)
+                    description = yield from worker.examine(os.path.join(wd, f))
                 finally:
                     yield from worker.close()
                 for class_name, class_desc in description.items():
@@ -32,7 +33,7 @@ def _scan_experiments(log):
                         name = basename + str(i)
                         i += 1
                     entry = {
-                        "file": full_name,
+                        "file": f,
                         "class_name": class_name,
                         "arguments": arguments
                     }
@@ -52,19 +53,84 @@ def _sync_explist(target, source):
 
 
 class Repository:
-    def __init__(self, log_fn):
-        self.explist = Notifier(dict())
-        self._scanning = False
+    def __init__(self, backend, log_fn):
+        self.backend = backend
         self.log_fn = log_fn
+        self.head_rev = self.backend.get_head_rev()
+        self.backend.request_rev(self.head_rev)
+
+        self.explist = Notifier(dict())
+
+        self._scanning = False
 
     @asyncio.coroutine
     def scan(self):
         if self._scanning:
             return
         self._scanning = True
-        new_explist = yield from _scan_experiments(self.log_fn)
+
+        new_head_rev = self.backend.get_head_rev()
+        wd = self.backend.request_rev(new_head_rev)
+        self.backend.release_rev(self.head_rev)
+        self.head_rev = new_head_rev
+        new_explist = yield from _scan_experiments(wd, self.log_fn)
+
         _sync_explist(self.explist, new_explist)
         self._scanning = False
 
     def scan_async(self):
         asyncio.async(self.scan())
+
+
+class FilesystemBackend:
+    def __init__(self, root):
+        self.root = os.path.abspath(root)
+
+    def get_head_rev(self):
+        return "N/A"
+
+    def request_rev(self, rev):
+        return self.root
+
+    def release_rev(self, rev):
+        pass
+
+
+class _GitCheckout:
+    def __init__(self, git, rev):
+        self.path = tempfile.mkdtemp()
+        git.checkout_tree(git.get(rev), directory=self.path)
+        self.ref_count = 1
+        logger.info("checked out revision %s into %s", rev, self.path)
+
+    def dispose(self):
+        logger.info("disposing of checkout in folder %s", self.path)
+        shutil.rmtree(self.path)
+
+
+class GitBackend:
+    def __init__(self, root):
+        # lazy import - make dependency optional
+        import pygit2
+
+        self.git = pygit2.Repository(root)
+        self.checkouts = dict()
+
+    def get_head_rev(self):
+        return str(self.git.head.target)
+
+    def request_rev(self, rev):
+        if rev in self.checkouts:
+            co = self.checkouts[rev]
+            co.ref_count += 1
+        else:
+            co = _GitCheckout(self.git, rev)
+            self.checkouts[rev] = co
+        return co.path
+
+    def release_rev(self, rev):
+        co = self.checkouts[rev]
+        co.ref_count -= 1
+        if not co.ref_count:
+            co.dispose()
+            del self.checkouts[rev]
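
A backend only has to provide the three methods used above: get_head_rev(),
request_rev(rev) returning a working directory, and release_rev(rev) ending
that directory's lifetime. A sketch of the intended calling pattern, using
the patch's own FilesystemBackend (the directory name is an example):

    from artiq.master.repository import FilesystemBackend

    # Every request_rev() must be balanced by a release_rev() of the same
    # revision; for GitBackend this is what lets the temporary checkout
    # directory be deleted once its reference count drops to zero.
    backend = FilesystemBackend("repository")   # example path
    rev = backend.get_head_rev()                # "N/A" for this backend
    wd = backend.request_rev(rev)               # directory to scan/run from
    try:
        print("experiments live in", wd)
    finally:
        backend.release_rev(rev)
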
diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py
index 93afb0508..56751a926 100644
--- a/artiq/master/scheduler.py
+++ b/artiq/master/scheduler.py
@@ -47,11 +47,12 @@ def _mk_worker_method(name):
 
 class Run:
     def __init__(self, rid, pipeline_name,
-                 expid, priority, due_date, flush,
+                 wd, expid, priority, due_date, flush,
                  worker_handlers, notifier):
         # called through pool
         self.rid = rid
         self.pipeline_name = pipeline_name
+        self.wd = wd
         self.expid = expid
         self.priority = priority
         self.due_date = due_date
@@ -103,7 +104,8 @@ class Run:
 
     @asyncio.coroutine
     def build(self):
-        yield from self._build(self.rid, self.pipeline_name, self.expid,
+        yield from self._build(self.rid, self.pipeline_name,
+                               self.wd, self.expid,
                                self.priority)
 
     prepare = _mk_worker_method("prepare")
@@ -124,18 +126,26 @@ class RIDCounter:
 
 
 class RunPool:
-    def __init__(self, ridc, worker_handlers, notifier):
+    def __init__(self, ridc, worker_handlers, notifier, repo_backend):
         self.runs = dict()
         self.submitted_cb = None
         self._ridc = ridc
         self._worker_handlers = worker_handlers
         self._notifier = notifier
+        self._repo_backend = repo_backend
 
     def submit(self, expid, priority, due_date, flush, pipeline_name):
+        # mutates expid to insert head repository revision if None
        # called through scheduler
         rid = self._ridc.get()
-        run = Run(rid, pipeline_name, expid, priority, due_date, flush,
+        if "repo_rev" in expid:
+            if expid["repo_rev"] is None:
+                expid["repo_rev"] = self._repo_backend.get_head_rev()
+            wd = self._repo_backend.request_rev(expid["repo_rev"])
+        else:
+            wd = None
+        run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
                   self._worker_handlers, self._notifier)
         self.runs[rid] = run
         if self.submitted_cb is not None:
             self.submitted_cb()
@@ -147,7 +157,10 @@ class RunPool:
         # called through deleter
         if rid not in self.runs:
             return
-        yield from self.runs[rid].close()
+        run = self.runs[rid]
+        yield from run.close()
+        if "repo_rev" in run.expid:
+            self._repo_backend.release_rev(run.expid["repo_rev"])
         del self.runs[rid]
 
 
@@ -280,12 +293,12 @@ class AnalyzeStage(TaskObject):
 
 
 class Pipeline:
-    def __init__(self, ridc, deleter, worker_handlers, notifier):
+    def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend):
         flush_tracker = WaitSet()
         def delete_cb(rid):
             deleter.delete(rid)
             flush_tracker.discard(rid)
-        self.pool = RunPool(ridc, worker_handlers, notifier)
+        self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend)
         self._prepare = PrepareStage(flush_tracker, delete_cb,
                                      self.pool, asyncio.Queue(maxsize=1))
         self._run = RunStage(delete_cb,
@@ -348,11 +361,12 @@ class Deleter(TaskObject):
 
 
 class Scheduler:
-    def __init__(self, next_rid, worker_handlers):
+    def __init__(self, next_rid, worker_handlers, repo_backend):
         self.notifier = Notifier(dict())
 
         self._pipelines = dict()
         self._worker_handlers = worker_handlers
+        self._repo_backend = repo_backend
         self._terminated = False
 
         self._ridc = RIDCounter(next_rid)
@@ -374,6 +388,7 @@ class Scheduler:
         logger.warning("some pipelines were not garbage-collected")
 
     def submit(self, pipeline_name, expid, priority, due_date, flush):
+        # mutates expid to insert head repository revision if None
         if self._terminated:
             return
         try:
@@ -381,7 +396,8 @@ class Scheduler:
         except KeyError:
             logger.debug("creating pipeline '%s'", pipeline_name)
             pipeline = Pipeline(self._ridc, self._deleter,
-                                self._worker_handlers, self.notifier)
+                                self._worker_handlers, self.notifier,
+                                self._repo_backend)
             self._pipelines[pipeline_name] = pipeline
             pipeline.start()
         return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
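
The pool requests the revision at submission time and only releases it when
the run is deleted, so a Git checkout outlives the whole
prepare/run/analyze cycle even if new commits move the head in the
meantime. A hypothetical submission, assuming `scheduler` is the Scheduler
instance wired up in artiq_master.py above:

    # With "repo_rev": None, RunPool.submit replaces None with the backend's
    # current head revision before calling request_rev(). The experiment
    # file and class below are made-up examples.
    expid = {"repo_rev": None,          # pin whatever is head right now
             "file": "flopping_f.py",
             "class_name": "FloppingF",
             "arguments": {}}
    rid = scheduler.submit("main", expid, priority=0, due_date=None,
                           flush=False)
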
diff --git a/artiq/master/worker.py b/artiq/master/worker.py
index 919906ca2..100b4e4ee 100644
--- a/artiq/master/worker.py
+++ b/artiq/master/worker.py
@@ -209,13 +209,14 @@ class Worker:
         return completed
 
     @asyncio.coroutine
-    def build(self, rid, pipeline_name, expid, priority, timeout=15.0):
+    def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
         self.rid = rid
         yield from self._create_process()
         yield from self._worker_action(
             {"action": "build",
              "rid": rid,
              "pipeline_name": pipeline_name,
+             "wd": wd,
              "expid": expid,
              "priority": priority},
             timeout)
diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py
index c0e10fe45..6fea52513 100644
--- a/artiq/master/worker_impl.py
+++ b/artiq/master/worker_impl.py
@@ -1,5 +1,6 @@
 import sys
 import time
+import os
 
 from artiq.protocols import pyon
 from artiq.tools import file_import
@@ -44,8 +45,6 @@ def make_parent_action(action, argnames, exception=ParentActionError):
     return parent_action
 
 
-
-
 class LogForwarder:
     def __init__(self):
         self.buffer = ""
@@ -175,7 +174,12 @@ def main():
                 start_time = time.localtime()
                 rid = obj["rid"]
                 expid = obj["expid"]
-                exp = get_exp(expid["file"], expid["class_name"])
+                if obj["wd"] is not None:
+                    # Using repository
+                    expf = os.path.join(obj["wd"], expid["file"])
+                else:
+                    expf = expid["file"]
+                exp = get_exp(expf, expid["class_name"])
                 dmgr.virtual_devices["scheduler"].set_run_info(
                     obj["pipeline_name"], expid, obj["priority"])
                 exp_inst = exp(dmgr, ParentPDB, rdb,
@@ -194,6 +198,11 @@ def main():
                 f = get_hdf5_output(start_time, rid, exp.__name__)
                 try:
                     rdb.write_hdf5(f)
+                    if "repo_rev" in expid:
+                        rr = expid["repo_rev"]
+                        dtype = "S{}".format(len(rr))
+                        dataset = f.create_dataset("repo_rev", (), dtype)
+                        dataset[()] = rr.encode()
                 finally:
                     f.close()
                 put_object({"action": "completed"})
diff --git a/artiq/test/scheduler.py b/artiq/test/scheduler.py
index 9c1b2717e..60bbd83c3 100644
--- a/artiq/test/scheduler.py
+++ b/artiq/test/scheduler.py
@@ -67,7 +67,7 @@ class SchedulerCase(unittest.TestCase):
     def test_steps(self):
         loop = self.loop
-        scheduler = Scheduler(0, _handlers)
+        scheduler = Scheduler(0, _handlers, None)
         expid = _get_expid("EmptyExperiment")
 
         expect = _get_basic_steps(1, expid)
@@ -102,7 +102,7 @@ class SchedulerCase(unittest.TestCase):
     def test_pause(self):
         loop = self.loop
-        scheduler = Scheduler(0, _handlers)
+        scheduler = Scheduler(0, _handlers, None)
         expid_bg = _get_expid("BackgroundExperiment")
         expid = _get_expid("EmptyExperiment")
@@ -133,7 +133,7 @@ class SchedulerCase(unittest.TestCase):
     def test_flush(self):
         loop = self.loop
-        scheduler = Scheduler(0, _handlers)
+        scheduler = Scheduler(0, _handlers, None)
         expid = _get_expid("EmptyExperiment")
 
         expect = _get_basic_steps(1, expid, 1, True)
diff --git a/artiq/test/worker.py b/artiq/test/worker.py
index b40e7b6c8..abb321e27 100644
--- a/artiq/test/worker.py
+++ b/artiq/test/worker.py
@@ -38,7 +38,7 @@ class WatchdogTimeoutInBuild(EnvExperiment):
 @asyncio.coroutine
 def _call_worker(worker, expid):
     try:
-        yield from worker.build(0, "main", expid, 0)
+        yield from worker.build(0, "main", None, expid, 0)
         yield from worker.prepare()
         yield from worker.run()
         yield from worker.analyze()
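
The worker now records the pinned revision in the results file as a
zero-dimensional fixed-length byte-string dataset. A sketch of reading it
back with h5py (the results file name is a made-up example):

    import h5py

    # Prints the revision recorded by worker_impl, e.g. a Git commit hash;
    # runs submitted outside the repository have no "repo_rev" dataset.
    with h5py.File("results/000000042.h5", "r") as f:
        if "repo_rev" in f:
            print(f["repo_rev"][()].decode())
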
diff --git a/setup.py b/setup.py
index 1936af587..763320836 100755
--- a/setup.py
+++ b/setup.py
@@ -4,6 +4,7 @@
 from setuptools import setup, find_packages, Command
 import sys
 import os
+
 
 if sys.version_info[:3] < (3, 4, 3):
     raise Exception("You need at least Python 3.4.3 to run ARTIQ")
@@ -20,7 +21,7 @@ class PushDocCommand(Command):
 requirements = [
     "sphinx", "sphinx-argparse", "pyserial", "numpy", "scipy",
     "python-dateutil", "prettytable", "h5py", "pydaqmx", "pyelftools",
-    "quamash", "pyqtgraph", "llvmlite_artiq"
+    "quamash", "pyqtgraph", "llvmlite_artiq", "pygit2"
 ]
 
 scripts = [
@@ -63,5 +64,5 @@ setup(
     entry_points={
         "console_scripts": scripts,
     },
-    cmdclass={"push_doc":PushDocCommand}
+    cmdclass={"push_doc": PushDocCommand}
 )
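
Taken together, a session with the new options might look like the command
lines below. The artiq_master/artiq_client names come from the console
scripts registered in setup.py; the submit subcommand name, the repository
path and the experiment file are assumptions for illustration:

    artiq_master -g -r ~/experiments.git            # serve a Git repository
    artiq_client submit -R flopping_f.py            # run at the current head
    artiq_client submit -R -r <rev> flopping_f.py   # pin a specific revision
    artiq_client scan-repository                    # refresh the experiment list
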