From 4ddd2739eed1775db0bbfce77f5eb06b73bfdc32 Mon Sep 17 00:00:00 2001 From: kk1050 <103404672+kk1050@users.noreply.github.com> Date: Mon, 6 Jun 2022 18:41:46 +0800 Subject: [PATCH] add log_tuples function (#1896) Co-authored-by: kk105 --- artiq/frontend/artiq_master.py | 5 +++-- artiq/master/scheduler.py | 21 ++++++++++++++++----- artiq/test/test_scheduler.py | 10 +++++----- 3 files changed, 24 insertions(+), 12 deletions(-) diff --git a/artiq/frontend/artiq_master.py b/artiq/frontend/artiq_master.py index 42a7a0a98..99c2fc9f1 100755 --- a/artiq/frontend/artiq_master.py +++ b/artiq/frontend/artiq_master.py @@ -53,12 +53,13 @@ def get_argparser(): "--experiment-subdir", default="", help=("path to the experiment folder from the repository root " "(default: '%(default)s')")) - log_args(parser) parser.add_argument("--name", help="friendly name, displayed in dashboards " "to identify master instead of server address") + parser.add_argument("--log-submissions", default=None, + help="set the filename to create the experiment subimission") return parser @@ -111,7 +112,7 @@ def main(): repo_backend, worker_handlers, args.experiment_subdir) atexit.register(experiment_db.close) - scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db) + scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions) scheduler.start() atexit_register_coroutine(scheduler.stop) diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index d978fa2f9..d6d44acef 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -1,5 +1,6 @@ import asyncio import logging +import csv from enum import Enum from time import time @@ -113,7 +114,7 @@ class Run: class RunPool: - def __init__(self, ridc, worker_handlers, notifier, experiment_db): + def __init__(self, ridc, worker_handlers, notifier, experiment_db, log_submissions): self.runs = dict() self.state_changed = Condition() @@ -121,6 +122,13 @@ class RunPool: self.worker_handlers = worker_handlers self.notifier = notifier self.experiment_db = experiment_db + self.log_submissions = log_submissions + + def log_submission(self, rid, expid): + start_time = time() + with open(self.log_submissions, 'a', newline='') as f: + writer = csv.writer(f) + writer.writerow([rid, start_time, expid["file"]]) def submit(self, expid, priority, due_date, flush, pipeline_name): # mutates expid to insert head repository revision if None. @@ -135,6 +143,8 @@ class RunPool: wd, repo_msg = None, None run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, self, repo_msg=repo_msg) + if self.log_submissions is not None: + self.log_submission(rid, expid) self.runs[rid] = run self.state_changed.notify() return rid @@ -311,8 +321,8 @@ class AnalyzeStage(TaskObject): class Pipeline: - def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db): - self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db) + def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db, log_submissions): + self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db, log_submissions) self._prepare = PrepareStage(self.pool, deleter.delete) self._run = RunStage(self.pool, deleter.delete) self._analyze = AnalyzeStage(self.pool, deleter.delete) @@ -383,7 +393,7 @@ class Deleter(TaskObject): class Scheduler: - def __init__(self, ridc, worker_handlers, experiment_db): + def __init__(self, ridc, worker_handlers, experiment_db, log_submissions): self.notifier = Notifier(dict()) self._pipelines = dict() @@ -393,6 +403,7 @@ class Scheduler: self._ridc = ridc self._deleter = Deleter(self._pipelines) + self._log_submissions = log_submissions def start(self): self._deleter.start() @@ -423,7 +434,7 @@ class Scheduler: logger.debug("creating pipeline '%s'", pipeline_name) pipeline = Pipeline(self._ridc, self._deleter, self._worker_handlers, self.notifier, - self._experiment_db) + self._experiment_db, self._log_submissions) self._pipelines[pipeline_name] = pipeline pipeline.start() return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) diff --git a/artiq/test/test_scheduler.py b/artiq/test/test_scheduler.py index b4327e72e..854f17a93 100644 --- a/artiq/test/test_scheduler.py +++ b/artiq/test/test_scheduler.py @@ -90,7 +90,7 @@ class SchedulerCase(unittest.TestCase): def test_steps(self): loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None) + scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") expect = _get_basic_steps(1, expid) @@ -129,7 +129,7 @@ class SchedulerCase(unittest.TestCase): prepare.""" loop = self.loop handlers = {} - scheduler = Scheduler(_RIDCounter(0), handlers, None) + scheduler = Scheduler(_RIDCounter(0), handlers, None, None) handlers["scheduler_check_pause"] = scheduler.check_pause expid_empty = _get_expid("EmptyExperiment") @@ -293,7 +293,7 @@ class SchedulerCase(unittest.TestCase): handlers = { "update_dataset": check_termination } - scheduler = Scheduler(_RIDCounter(0), handlers, None) + scheduler = Scheduler(_RIDCounter(0), handlers, None, None) expid_bg = _get_expid("BackgroundExperiment") expid = _get_expid("EmptyExperiment") @@ -351,7 +351,7 @@ class SchedulerCase(unittest.TestCase): """Check scheduler exits with experiments still running""" loop = self.loop - scheduler = Scheduler(_RIDCounter(0), {}, None) + scheduler = Scheduler(_RIDCounter(0), {}, None, None) expid_bg = _get_expid("BackgroundExperiment") # Suppress the SystemExit backtrace when worker process is killed. @@ -392,7 +392,7 @@ class SchedulerCase(unittest.TestCase): def test_flush(self): loop = self.loop - scheduler = Scheduler(_RIDCounter(0), dict(), None) + scheduler = Scheduler(_RIDCounter(0), dict(), None, None) expid = _get_expid("EmptyExperiment") expect = _get_basic_steps(1, expid, 1, True)