test/scheduler: test flush

This commit is contained in:
Sebastien Bourdeauducq 2015-05-29 20:16:47 +08:00
parent 6ff2e1a083
commit 048782e26c
1 changed files with 37 additions and 3 deletions

View File

@ -27,11 +27,11 @@ def _get_expid(name):
} }
def _get_basic_steps(rid, expid): def _get_basic_steps(rid, expid, priority=0, flush=False):
return [ return [
{"action": "setitem", "key": rid, "value": {"action": "setitem", "key": rid, "value":
{"pipeline": "main", "status": "pending", "priority": 0, {"pipeline": "main", "status": "pending", "priority": priority,
"expid": expid, "due_date": None, "flush": False}, "expid": expid, "due_date": None, "flush": flush},
"path": []}, "path": []},
{"action": "setitem", "key": "status", "value": "preparing", {"action": "setitem", "key": "status", "value": "preparing",
"path": [rid]}, "path": [rid]},
@ -120,3 +120,37 @@ class SchedulerCase(unittest.TestCase):
scheduler.submit("main", expid, 0, None, False) scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait()) loop.run_until_complete(done.wait())
loop.run_until_complete(scheduler.stop()) loop.run_until_complete(scheduler.stop())
def test_flush(self):
scheduler = Scheduler(0, _handlers)
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid, 1, True)
expect.insert(1, {"key": "status",
"path": [1],
"value": "flushing",
"action": "setitem"})
first_preparing = asyncio.Event()
done = asyncio.Event()
expect_idx = 0
def notify(notifier, mod):
nonlocal expect_idx
if mod == {"path": [0],
"value": "preparing",
"key": "status",
"action": "setitem"}:
first_preparing.set()
if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1):
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
scheduler.notifier.publish = notify
loop = asyncio.get_event_loop()
scheduler.start()
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(first_preparing.wait())
scheduler.submit("main", expid, 1, None, True)
loop.run_until_complete(done.wait())
loop.run_until_complete(scheduler.stop())