Git support

This commit is contained in:
Sebastien Bourdeauducq 2015-08-07 15:51:56 +08:00
parent 968760d48f
commit 7ed8fe57fa
11 changed files with 167 additions and 43 deletions

View File

@ -42,6 +42,12 @@ def get_argparser():
parser_add.add_argument("-f", "--flush", default=False, action="store_true",
help="flush the pipeline before preparing "
"the experiment")
parser_add.add_argument("-R", "--repository", default=False,
action="store_true",
help="use the experiment repository")
parser_add.add_argument("-r", "--revision", default=None,
help="use a specific repository revision "
"(defaults to head, ignored without -R)")
parser_add.add_argument("-c", "--class-name", default=None,
help="name of the class to run")
parser_add.add_argument("file",
@ -81,8 +87,8 @@ def get_argparser():
"what",
help="select object to show: schedule/devices/parameters")
parser_scan_repository = subparsers.add_parser(
"scan-repository", help="rescan repository")
subparsers.add_parser("scan-repository",
help="trigger a repository rescan")
return parser
@ -107,6 +113,8 @@ def _action_submit(remote, args):
"class_name": args.class_name,
"arguments": arguments,
}
if args.repository:
expid["repo_rev"] = args.revision
if args.timed is None:
due_date = None
else:
@ -148,7 +156,7 @@ def _show_schedule(schedule):
x[1]["due_date"] or 0,
x[0]))
table = PrettyTable(["RID", "Pipeline", " Status ", "Prio",
"Due date", "File", "Class name"])
"Due date", "Revision", "File", "Class name"])
for rid, v in l:
row = [rid, v["pipeline"], v["status"], v["priority"]]
if v["due_date"] is None:
@ -156,11 +164,16 @@ def _show_schedule(schedule):
else:
row.append(time.strftime("%m/%d %H:%M:%S",
time.localtime(v["due_date"])))
row.append(v["expid"]["file"])
if v["expid"]["class_name"] is None:
expid = v["expid"]
if "repo_rev" in expid:
row.append(expid["repo_rev"])
else:
row.append("Outside repo.")
row.append(expid["file"])
if expid["class_name"] is None:
row.append("")
else:
row.append(v["expid"]["class_name"])
row.append(expid["class_name"])
table.add_row(row)
print(table)
else:

View File

@ -10,7 +10,7 @@ from artiq.protocols.sync_struct import Notifier, Publisher, process_mod
from artiq.protocols.file_db import FlatFileDB
from artiq.master.scheduler import Scheduler
from artiq.master.worker_db import get_last_rid
from artiq.master.repository import Repository
from artiq.master.repository import FilesystemBackend, GitBackend, Repository
from artiq.tools import verbosity_args, init_logger
@ -26,6 +26,13 @@ def get_argparser():
group.add_argument(
"--port-control", default=3251, type=int,
help="TCP port to listen to for control (default: %(default)d)")
group = parser.add_argument_group("repository")
group.add_argument(
"-g", "--git", default=False, action="store_true",
help="use the Git repository backend")
group.add_argument(
"-r", "--repository", default="repository",
help="path to the repository (default: '%(default)s')")
verbosity_args(parser)
return parser
@ -57,6 +64,13 @@ def main():
rtr = Notifier(dict())
log = Log(1000)
if args.git:
repo_backend = GitBackend(args.repository)
else:
repo_backend = FilesystemBackend(args.repository)
repository = Repository(repo_backend, log.log)
repository.scan_async()
worker_handlers = {
"get_device": ddb.get,
"get_parameter": pdb.get,
@ -64,14 +78,11 @@ def main():
"update_rt_results": lambda mod: process_mod(rtr, mod),
"log": log.log
}
scheduler = Scheduler(get_last_rid() + 1, worker_handlers)
scheduler = Scheduler(get_last_rid() + 1, worker_handlers, repo_backend)
worker_handlers["scheduler_submit"] = scheduler.submit
scheduler.start()
atexit.register(lambda: loop.run_until_complete(scheduler.stop()))
repository = Repository(log.log)
repository.scan_async()
server_control = Server({
"master_ddb": ddb,
"master_pdb": pdb,

View File

@ -256,6 +256,7 @@ class ExplorerDock(dockarea.Dock):
def submit(self, pipeline_name, file, class_name, arguments,
priority, due_date, flush):
expid = {
"repo_rev": None,
"file": file,
"class_name": class_name,
"arguments": arguments,

View File

@ -12,7 +12,7 @@ class _ScheduleModel(DictSyncModel):
def __init__(self, parent, init):
DictSyncModel.__init__(self,
["RID", "Pipeline", "Status", "Prio", "Due date",
"File", "Class name"],
"Revision", "File", "Class name"],
parent, init)
def sort_key(self, k, v):
@ -35,8 +35,14 @@ class _ScheduleModel(DictSyncModel):
return time.strftime("%m/%d %H:%M:%S",
time.localtime(v["due_date"]))
elif column == 5:
return v["expid"]["file"]
expid = v["expid"]
if "repo_rev" in expid:
return expid["repo_rev"]
else:
return "Outside repo."
elif column == 6:
return v["expid"]["file"]
elif column == 7:
if v["expid"]["class_name"] is None:
return ""
else:

View File

@ -1,6 +1,8 @@
import os
import logging
import asyncio
import os
import tempfile
import shutil
import logging
from artiq.protocols.sync_struct import Notifier
from artiq.master.worker import Worker
@ -10,15 +12,14 @@ logger = logging.getLogger(__name__)
@asyncio.coroutine
def _scan_experiments(log):
def _scan_experiments(wd, log):
r = dict()
for f in os.listdir("repository"):
for f in os.listdir(wd):
if f.endswith(".py"):
try:
full_name = os.path.join("repository", f)
worker = Worker({"log": lambda message: log("scan", message)})
try:
description = yield from worker.examine(full_name)
description = yield from worker.examine(os.path.join(wd, f))
finally:
yield from worker.close()
for class_name, class_desc in description.items():
@ -32,7 +33,7 @@ def _scan_experiments(log):
name = basename + str(i)
i += 1
entry = {
"file": full_name,
"file": f,
"class_name": class_name,
"arguments": arguments
}
@ -52,19 +53,84 @@ def _sync_explist(target, source):
class Repository:
def __init__(self, log_fn):
self.explist = Notifier(dict())
self._scanning = False
def __init__(self, backend, log_fn):
self.backend = backend
self.log_fn = log_fn
self.head_rev = self.backend.get_head_rev()
self.backend.request_rev(self.head_rev)
self.explist = Notifier(dict())
self._scanning = False
@asyncio.coroutine
def scan(self):
if self._scanning:
return
self._scanning = True
new_explist = yield from _scan_experiments(self.log_fn)
new_head_rev = self.backend.get_head_rev()
wd = self.backend.request_rev(new_head_rev)
self.backend.release_rev(self.head_rev)
self.head_rev = new_head_rev
new_explist = yield from _scan_experiments(wd, self.log_fn)
_sync_explist(self.explist, new_explist)
self._scanning = False
def scan_async(self):
asyncio.async(self.scan())
class FilesystemBackend:
def __init__(self, root):
self.root = os.path.abspath(root)
def get_head_rev(self):
return "N/A"
def request_rev(self, rev):
return self.root
def release_rev(self, rev):
pass
class _GitCheckout:
def __init__(self, git, rev):
self.path = tempfile.mkdtemp()
git.checkout_tree(git.get(rev), directory=self.path)
self.ref_count = 1
logger.info("checked out revision %s into %s", rev, self.path)
def dispose(self):
logger.info("disposing of checkout in folder %s", self.path)
shutil.rmtree(self.path)
class GitBackend:
def __init__(self, root):
# lazy import - make dependency optional
import pygit2
self.git = pygit2.Repository(root)
self.checkouts = dict()
def get_head_rev(self):
return str(self.git.head.target)
def request_rev(self, rev):
if rev in self.checkouts:
co = self.checkouts[rev]
co.ref_count += 1
else:
co = _GitCheckout(self.git, rev)
self.checkouts[rev] = co
return co.path
def release_rev(self, rev):
co = self.checkouts[rev]
co.ref_count -= 1
if not co.ref_count:
co.dispose()
del self.checkouts[rev]

View File

@ -47,11 +47,12 @@ def _mk_worker_method(name):
class Run:
def __init__(self, rid, pipeline_name,
expid, priority, due_date, flush,
wd, expid, priority, due_date, flush,
worker_handlers, notifier):
# called through pool
self.rid = rid
self.pipeline_name = pipeline_name
self.wd = wd
self.expid = expid
self.priority = priority
self.due_date = due_date
@ -103,7 +104,8 @@ class Run:
@asyncio.coroutine
def build(self):
yield from self._build(self.rid, self.pipeline_name, self.expid,
yield from self._build(self.rid, self.pipeline_name,
self.wd, self.expid,
self.priority)
prepare = _mk_worker_method("prepare")
@ -124,18 +126,26 @@ class RIDCounter:
class RunPool:
def __init__(self, ridc, worker_handlers, notifier):
def __init__(self, ridc, worker_handlers, notifier, repo_backend):
self.runs = dict()
self.submitted_cb = None
self._ridc = ridc
self._worker_handlers = worker_handlers
self._notifier = notifier
self._repo_backend = repo_backend
def submit(self, expid, priority, due_date, flush, pipeline_name):
# mutates expid to insert head repository revision if None
# called through scheduler
rid = self._ridc.get()
run = Run(rid, pipeline_name, expid, priority, due_date, flush,
if "repo_rev" in expid:
if expid["repo_rev"] is None:
expid["repo_rev"] = self._repo_backend.get_head_rev()
wd = self._repo_backend.request_rev(expid["repo_rev"])
else:
wd = None
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
self._worker_handlers, self._notifier)
self.runs[rid] = run
if self.submitted_cb is not None:
@ -147,7 +157,10 @@ class RunPool:
# called through deleter
if rid not in self.runs:
return
yield from self.runs[rid].close()
run = self.runs[rid]
yield from run.close()
if "repo_rev" in run.expid:
self._repo_backend.release_rev(run.expid["repo_rev"])
del self.runs[rid]
@ -280,12 +293,12 @@ class AnalyzeStage(TaskObject):
class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier):
def __init__(self, ridc, deleter, worker_handlers, notifier, repo_backend):
flush_tracker = WaitSet()
def delete_cb(rid):
deleter.delete(rid)
flush_tracker.discard(rid)
self.pool = RunPool(ridc, worker_handlers, notifier)
self.pool = RunPool(ridc, worker_handlers, notifier, repo_backend)
self._prepare = PrepareStage(flush_tracker, delete_cb,
self.pool, asyncio.Queue(maxsize=1))
self._run = RunStage(delete_cb,
@ -348,11 +361,12 @@ class Deleter(TaskObject):
class Scheduler:
def __init__(self, next_rid, worker_handlers):
def __init__(self, next_rid, worker_handlers, repo_backend):
self.notifier = Notifier(dict())
self._pipelines = dict()
self._worker_handlers = worker_handlers
self._repo_backend = repo_backend
self._terminated = False
self._ridc = RIDCounter(next_rid)
@ -374,6 +388,7 @@ class Scheduler:
logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, priority, due_date, flush):
# mutates expid to insert head repository revision if None
if self._terminated:
return
try:
@ -381,7 +396,8 @@ class Scheduler:
except KeyError:
logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier)
self._worker_handlers, self.notifier,
self._repo_backend)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

View File

@ -209,13 +209,14 @@ class Worker:
return completed
@asyncio.coroutine
def build(self, rid, pipeline_name, expid, priority, timeout=15.0):
def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
self.rid = rid
yield from self._create_process()
yield from self._worker_action(
{"action": "build",
"rid": rid,
"pipeline_name": pipeline_name,
"wd": wd,
"expid": expid,
"priority": priority},
timeout)

View File

@ -1,5 +1,6 @@
import sys
import time
import os
from artiq.protocols import pyon
from artiq.tools import file_import
@ -44,8 +45,6 @@ def make_parent_action(action, argnames, exception=ParentActionError):
return parent_action
class LogForwarder:
def __init__(self):
self.buffer = ""
@ -175,7 +174,12 @@ def main():
start_time = time.localtime()
rid = obj["rid"]
expid = obj["expid"]
exp = get_exp(expid["file"], expid["class_name"])
if obj["wd"] is not None:
# Using repository
expf = os.path.join(obj["wd"], expid["file"])
else:
expf = expid["file"]
exp = get_exp(expf, expid["class_name"])
dmgr.virtual_devices["scheduler"].set_run_info(
obj["pipeline_name"], expid, obj["priority"])
exp_inst = exp(dmgr, ParentPDB, rdb,
@ -194,6 +198,11 @@ def main():
f = get_hdf5_output(start_time, rid, exp.__name__)
try:
rdb.write_hdf5(f)
if "repo_rev" in expid:
rr = expid["repo_rev"]
dtype = "S{}".format(len(rr))
dataset = f.create_dataset("repo_rev", (), dtype)
dataset[()] = rr.encode()
finally:
f.close()
put_object({"action": "completed"})

View File

@ -67,7 +67,7 @@ class SchedulerCase(unittest.TestCase):
def test_steps(self):
loop = self.loop
scheduler = Scheduler(0, _handlers)
scheduler = Scheduler(0, _handlers, None)
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid)
@ -102,7 +102,7 @@ class SchedulerCase(unittest.TestCase):
def test_pause(self):
loop = self.loop
scheduler = Scheduler(0, _handlers)
scheduler = Scheduler(0, _handlers, None)
expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment")
@ -133,7 +133,7 @@ class SchedulerCase(unittest.TestCase):
def test_flush(self):
loop = self.loop
scheduler = Scheduler(0, _handlers)
scheduler = Scheduler(0, _handlers, None)
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid, 1, True)

View File

@ -38,7 +38,7 @@ class WatchdogTimeoutInBuild(EnvExperiment):
@asyncio.coroutine
def _call_worker(worker, expid):
try:
yield from worker.build(0, "main", expid, 0)
yield from worker.build(0, "main", None, expid, 0)
yield from worker.prepare()
yield from worker.run()
yield from worker.analyze()

View File

@ -4,6 +4,7 @@ from setuptools import setup, find_packages, Command
import sys
import os
if sys.version_info[:3] < (3, 4, 3):
raise Exception("You need at least Python 3.4.3 to run ARTIQ")
@ -20,7 +21,7 @@ class PushDocCommand(Command):
requirements = [
"sphinx", "sphinx-argparse", "pyserial", "numpy", "scipy",
"python-dateutil", "prettytable", "h5py", "pydaqmx", "pyelftools",
"quamash", "pyqtgraph", "llvmlite_artiq"
"quamash", "pyqtgraph", "llvmlite_artiq", "pygit2"
]
scripts = [
@ -63,5 +64,5 @@ setup(
entry_points={
"console_scripts": scripts,
},
cmdclass={"push_doc":PushDocCommand}
cmdclass={"push_doc": PushDocCommand}
)