forked from M-Labs/artiq
1
0
Fork 0

add log_tuples function (#1896)

Co-authored-by: kk105 <kkl@m-kabs.hk>
This commit is contained in:
kk1050 2022-06-06 18:41:46 +08:00 committed by GitHub
parent e702624720
commit 4ddd2739ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 24 additions and 12 deletions

View File

@ -53,12 +53,13 @@ def get_argparser():
"--experiment-subdir", default="", "--experiment-subdir", default="",
help=("path to the experiment folder from the repository root " help=("path to the experiment folder from the repository root "
"(default: '%(default)s')")) "(default: '%(default)s')"))
log_args(parser) log_args(parser)
parser.add_argument("--name", parser.add_argument("--name",
help="friendly name, displayed in dashboards " help="friendly name, displayed in dashboards "
"to identify master instead of server address") "to identify master instead of server address")
parser.add_argument("--log-submissions", default=None,
help="set the filename to create the experiment subimission")
return parser return parser
@ -111,7 +112,7 @@ def main():
repo_backend, worker_handlers, args.experiment_subdir) repo_backend, worker_handlers, args.experiment_subdir)
atexit.register(experiment_db.close) atexit.register(experiment_db.close)
scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db) scheduler = Scheduler(RIDCounter(), worker_handlers, experiment_db, args.log_submissions)
scheduler.start() scheduler.start()
atexit_register_coroutine(scheduler.stop) atexit_register_coroutine(scheduler.stop)

View File

@ -1,5 +1,6 @@
import asyncio import asyncio
import logging import logging
import csv
from enum import Enum from enum import Enum
from time import time from time import time
@ -113,7 +114,7 @@ class Run:
class RunPool: class RunPool:
def __init__(self, ridc, worker_handlers, notifier, experiment_db): def __init__(self, ridc, worker_handlers, notifier, experiment_db, log_submissions):
self.runs = dict() self.runs = dict()
self.state_changed = Condition() self.state_changed = Condition()
@ -121,6 +122,13 @@ class RunPool:
self.worker_handlers = worker_handlers self.worker_handlers = worker_handlers
self.notifier = notifier self.notifier = notifier
self.experiment_db = experiment_db self.experiment_db = experiment_db
self.log_submissions = log_submissions
def log_submission(self, rid, expid):
start_time = time()
with open(self.log_submissions, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([rid, start_time, expid["file"]])
def submit(self, expid, priority, due_date, flush, pipeline_name): def submit(self, expid, priority, due_date, flush, pipeline_name):
# mutates expid to insert head repository revision if None. # mutates expid to insert head repository revision if None.
@ -135,6 +143,8 @@ class RunPool:
wd, repo_msg = None, None wd, repo_msg = None, None
run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush, run = Run(rid, pipeline_name, wd, expid, priority, due_date, flush,
self, repo_msg=repo_msg) self, repo_msg=repo_msg)
if self.log_submissions is not None:
self.log_submission(rid, expid)
self.runs[rid] = run self.runs[rid] = run
self.state_changed.notify() self.state_changed.notify()
return rid return rid
@ -311,8 +321,8 @@ class AnalyzeStage(TaskObject):
class Pipeline: class Pipeline:
def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db): def __init__(self, ridc, deleter, worker_handlers, notifier, experiment_db, log_submissions):
self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db) self.pool = RunPool(ridc, worker_handlers, notifier, experiment_db, log_submissions)
self._prepare = PrepareStage(self.pool, deleter.delete) self._prepare = PrepareStage(self.pool, deleter.delete)
self._run = RunStage(self.pool, deleter.delete) self._run = RunStage(self.pool, deleter.delete)
self._analyze = AnalyzeStage(self.pool, deleter.delete) self._analyze = AnalyzeStage(self.pool, deleter.delete)
@ -383,7 +393,7 @@ class Deleter(TaskObject):
class Scheduler: class Scheduler:
def __init__(self, ridc, worker_handlers, experiment_db): def __init__(self, ridc, worker_handlers, experiment_db, log_submissions):
self.notifier = Notifier(dict()) self.notifier = Notifier(dict())
self._pipelines = dict() self._pipelines = dict()
@ -393,6 +403,7 @@ class Scheduler:
self._ridc = ridc self._ridc = ridc
self._deleter = Deleter(self._pipelines) self._deleter = Deleter(self._pipelines)
self._log_submissions = log_submissions
def start(self): def start(self):
self._deleter.start() self._deleter.start()
@ -423,7 +434,7 @@ class Scheduler:
logger.debug("creating pipeline '%s'", pipeline_name) logger.debug("creating pipeline '%s'", pipeline_name)
pipeline = Pipeline(self._ridc, self._deleter, pipeline = Pipeline(self._ridc, self._deleter,
self._worker_handlers, self.notifier, self._worker_handlers, self.notifier,
self._experiment_db) self._experiment_db, self._log_submissions)
self._pipelines[pipeline_name] = pipeline self._pipelines[pipeline_name] = pipeline
pipeline.start() pipeline.start()
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name) return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)

View File

@ -90,7 +90,7 @@ class SchedulerCase(unittest.TestCase):
def test_steps(self): def test_steps(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None) scheduler = Scheduler(_RIDCounter(0), dict(), None, None)
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid) expect = _get_basic_steps(1, expid)
@ -129,7 +129,7 @@ class SchedulerCase(unittest.TestCase):
prepare.""" prepare."""
loop = self.loop loop = self.loop
handlers = {} handlers = {}
scheduler = Scheduler(_RIDCounter(0), handlers, None) scheduler = Scheduler(_RIDCounter(0), handlers, None, None)
handlers["scheduler_check_pause"] = scheduler.check_pause handlers["scheduler_check_pause"] = scheduler.check_pause
expid_empty = _get_expid("EmptyExperiment") expid_empty = _get_expid("EmptyExperiment")
@ -293,7 +293,7 @@ class SchedulerCase(unittest.TestCase):
handlers = { handlers = {
"update_dataset": check_termination "update_dataset": check_termination
} }
scheduler = Scheduler(_RIDCounter(0), handlers, None) scheduler = Scheduler(_RIDCounter(0), handlers, None, None)
expid_bg = _get_expid("BackgroundExperiment") expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
@ -351,7 +351,7 @@ class SchedulerCase(unittest.TestCase):
"""Check scheduler exits with experiments still running""" """Check scheduler exits with experiments still running"""
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), {}, None) scheduler = Scheduler(_RIDCounter(0), {}, None, None)
expid_bg = _get_expid("BackgroundExperiment") expid_bg = _get_expid("BackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed. # Suppress the SystemExit backtrace when worker process is killed.
@ -392,7 +392,7 @@ class SchedulerCase(unittest.TestCase):
def test_flush(self): def test_flush(self):
loop = self.loop loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None) scheduler = Scheduler(_RIDCounter(0), dict(), None, None)
expid = _get_expid("EmptyExperiment") expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid, 1, True) expect = _get_basic_steps(1, expid, 1, True)