scheduler: default submission arguments, closes #577

This commit is contained in:
Sebastien Bourdeauducq 2016-10-18 13:49:43 +08:00
parent 0e41725e2d
commit 1908339d4e
4 changed files with 29 additions and 19 deletions

View File

@ -52,8 +52,7 @@ class FloppingF(EnvExperiment):
self.mutate_dataset("flopping_f_frequency", i, f) self.mutate_dataset("flopping_f_frequency", i, f)
self.mutate_dataset("flopping_f_brightness", i, m_brightness) self.mutate_dataset("flopping_f_brightness", i, m_brightness)
time.sleep(0.1) time.sleep(0.1)
self.scheduler.submit(self.scheduler.pipeline_name, self.scheduler.expid, self.scheduler.submit(due_date=time.time() + 20)
self.scheduler.priority, time.time() + 20, False)
def analyze(self): def analyze(self):
# Use get_dataset so that analyze can be run stand-alone. # Use get_dataset so that analyze can be run stand-alone.

View File

@ -95,7 +95,7 @@ class DummyScheduler:
self._next_rid = 1 self._next_rid = 1
def submit(self, pipeline_name, expid, priority, due_date, flush): def submit(self, pipeline_name=None, expid=None, priority=None, due_date=None, flush=False):
rid = self._next_rid rid = self._next_rid
self._next_rid += 1 self._next_rid += 1
logger.info("Submitting: %s, RID=%s", expid, rid) logger.info("Submitting: %s, RID=%s", expid, rid)

View File

@ -402,8 +402,12 @@ class Scheduler:
if self._pipelines: if self._pipelines:
logger.warning("some pipelines were not garbage-collected") logger.warning("some pipelines were not garbage-collected")
def submit(self, pipeline_name, expid, priority, due_date, flush): def submit(self, pipeline_name, expid, priority=0, due_date=None, flush=False):
"""Submits a new run.""" """Submits a new run.
When called through an experiment, the default values of
``pipeline_name``, ``expid`` and ``priority`` correspond to those of
the current run."""
# mutates expid to insert head repository revision if None # mutates expid to insert head repository revision if None
if self._terminated: if self._terminated:
return return

View File

@ -79,32 +79,39 @@ set_watchdog_factory(Watchdog)
class Scheduler: class Scheduler:
pause_noexc = staticmethod(make_parent_action("pause"))
@host_only
def pause(self):
if self.pause_noexc():
raise TerminationRequested
submit = staticmethod(make_parent_action("scheduler_submit"))
delete = staticmethod(make_parent_action("scheduler_delete"))
request_termination = staticmethod(
make_parent_action("scheduler_request_termination"))
get_status = staticmethod(make_parent_action("scheduler_get_status"))
def set_run_info(self, rid, pipeline_name, expid, priority): def set_run_info(self, rid, pipeline_name, expid, priority):
self.rid = rid self.rid = rid
self.pipeline_name = pipeline_name self.pipeline_name = pipeline_name
self.expid = expid self.expid = expid
self.priority = priority self.priority = priority
_check_pause = staticmethod(make_parent_action("scheduler_check_pause")) pause_noexc = staticmethod(make_parent_action("pause"))
@host_only
def pause(self):
if self.pause_noexc():
raise TerminationRequested
_check_pause = staticmethod(make_parent_action("scheduler_check_pause"))
def check_pause(self, rid=None) -> TBool: def check_pause(self, rid=None) -> TBool:
if rid is None: if rid is None:
rid = self.rid rid = self.rid
return self._check_pause(rid) return self._check_pause(rid)
_submit = staticmethod(make_parent_action("scheduler_submit"))
def submit(self, pipeline_name=None, expid=None, priority=None, due_date=None, flush=False):
if pipeline_name is None:
pipeline_name = self.pipeline_name
if expid is None:
expid = self.expid
if priority is None:
priority = self.priority
return self._submit(pipeline_name, expid, priority, due_date, flush)
delete = staticmethod(make_parent_action("scheduler_delete"))
request_termination = staticmethod(
make_parent_action("scheduler_request_termination"))
get_status = staticmethod(make_parent_action("scheduler_get_status"))
class CCB: class CCB:
issue = staticmethod(make_parent_action("ccb_issue")) issue = staticmethod(make_parent_action("ccb_issue"))