scheduler: support passing event loop

pull/2046/head
Sebastien Bourdeauducq 2023-01-10 12:26:24 +08:00
parent c401559ed5
commit 6cfd1480a7
1 changed files with 8 additions and 7 deletions

View File

@ -332,10 +332,10 @@ class Pipeline:
self._run = RunStage(self.pool, deleter.delete)
self._analyze = AnalyzeStage(self.pool, deleter.delete)
def start(self):
self._prepare.start()
self._run.start()
self._analyze.start()
def start(self, loop=None):
self._prepare.start(loop)
self._run.start(loop)
self._analyze.start(loop)
async def stop(self):
# NB: restart of a stopped pipeline is not supported
@ -410,8 +410,9 @@ class Scheduler:
self._deleter = Deleter(self._pipelines)
self._log_submissions = log_submissions
def start(self):
self._deleter.start()
def start(self, loop=None):
self._loop = loop
self._deleter.start(self._loop)
async def stop(self):
# NB: restart of a stopped scheduler is not supported
@ -442,7 +443,7 @@ class Scheduler:
self._worker_handlers, self.notifier,
self._experiment_db, self._log_submissions)
self._pipelines[pipeline_name] = pipeline
pipeline.start()
pipeline.start(self._loop)
return pipeline.pool.submit(expid, priority, due_date, flush, pipeline_name)
def delete(self, rid):