artiq/artiq/test/test_scheduler.py
hartytp 715bff3ebf
Revert "Merge pull request #1544 from airwoodix/dataset-compression" (#1838)
* Revert "Merge pull request #1544 from airwoodix/dataset-compression"

This reverts commit 311a818a49, reversing
changes made to 7ffe4dc2e3.

* fix accidental revert of f42bea06a8
2022-01-25 10:02:15 +08:00

429 lines
14 KiB
Python

import unittest
import logging
import asyncio
import sys
from time import time, sleep
from artiq.experiment import *
from artiq.master.scheduler import Scheduler
class EmptyExperiment(EnvExperiment):
def build(self):
pass
def run(self):
pass
class BackgroundExperiment(EnvExperiment):
def build(self):
self.setattr_device("scheduler")
def run(self):
try:
while True:
self.scheduler.pause()
sleep(0.2)
except TerminationRequested:
self.set_dataset("termination_ok", True,
broadcast=True, archive=False)
class CheckPauseBackgroundExperiment(EnvExperiment):
def build(self):
self.setattr_device("scheduler")
def run(self):
while True:
while not self.scheduler.check_pause():
sleep(0.2)
self.scheduler.pause()
def _get_expid(name):
return {
"log_level": logging.WARNING,
"file": sys.modules[__name__].__file__,
"class_name": name,
"arguments": dict()
}
def _get_basic_steps(rid, expid, priority=0, flush=False):
return [
{"action": "setitem", "key": rid, "value":
{"pipeline": "main", "status": "pending", "priority": priority,
"expid": expid, "due_date": None, "flush": flush,
"repo_msg": None},
"path": []},
{"action": "setitem", "key": "status", "value": "preparing",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "prepare_done",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "running",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "run_done",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "analyzing",
"path": [rid]},
{"action": "setitem", "key": "status", "value": "deleting",
"path": [rid]},
{"action": "delitem", "key": rid, "path": []}
]
class _RIDCounter:
def __init__(self, next_rid):
self._next_rid = next_rid
def get(self):
rid = self._next_rid
self._next_rid += 1
return rid
class SchedulerCase(unittest.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
def test_steps(self):
loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None)
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid)
done = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
scheduler.notifier.publish = notify
scheduler.start()
# Verify that a timed experiment far in the future does not
# get run, even if it has high priority.
late = time() + 100000
expect.insert(0,
{"action": "setitem", "key": 0, "value":
{"pipeline": "main", "status": "pending", "priority": 99,
"expid": expid, "due_date": late, "flush": False,
"repo_msg": None},
"path": []})
scheduler.submit("main", expid, 99, late, False)
# This one (RID 1) gets run instead.
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(done.wait())
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())
def test_pending_priority(self):
"""Check due dates take precedence over priorities when waiting to
prepare."""
loop = self.loop
handlers = {}
scheduler = Scheduler(_RIDCounter(0), handlers, None)
handlers["scheduler_check_pause"] = scheduler.check_pause
expid_empty = _get_expid("EmptyExperiment")
expid_bg = _get_expid("CheckPauseBackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed.
expid_bg["log_level"] = logging.CRITICAL
high_priority = 3
middle_priority = 2
low_priority = 1
late = time() + 100000
early = time() + 1
expect = [
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": low_priority,
"pipeline": "main",
"due_date": None,
"status": "pending",
"expid": expid_bg,
"flush": False
},
"key": 0
},
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": high_priority,
"pipeline": "main",
"due_date": late,
"status": "pending",
"expid": expid_empty,
"flush": False
},
"key": 1
},
{
"path": [],
"action": "setitem",
"value": {
"repo_msg": None,
"priority": middle_priority,
"pipeline": "main",
"due_date": early,
"status": "pending",
"expid": expid_empty,
"flush": False
},
"key": 2
},
{
"path": [0],
"action": "setitem",
"value": "preparing",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "prepare_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "preparing",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "prepare_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "paused",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "run_done",
"key": "status"
},
{
"path": [0],
"action": "setitem",
"value": "running",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "analyzing",
"key": "status"
},
{
"path": [2],
"action": "setitem",
"value": "deleting",
"key": "status"
},
{
"path": [],
"action": "delitem",
"key": 2
},
]
done = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid_bg, low_priority)
scheduler.submit("main", expid_empty, high_priority, late)
scheduler.submit("main", expid_empty, middle_priority, early)
loop.run_until_complete(done.wait())
scheduler.notifier.publish = None
loop.run_until_complete(scheduler.stop())
def test_pause(self):
loop = self.loop
termination_ok = False
def check_termination(mod):
nonlocal termination_ok
self.assertEqual(
mod,
{"action": "setitem", "key": "termination_ok",
"value": (False, True), "path": []})
termination_ok = True
handlers = {
"update_dataset": check_termination
}
scheduler = Scheduler(_RIDCounter(0), handlers, None)
expid_bg = _get_expid("BackgroundExperiment")
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid)
background_running = asyncio.Event()
empty_ready = asyncio.Event()
empty_completed = asyncio.Event()
background_completed = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
if mod == {"path": [0],
"value": "running",
"key": "status",
"action": "setitem"}:
background_running.set()
if mod == {"path": [0],
"value": "deleting",
"key": "status",
"action": "setitem"}:
background_completed.set()
if mod == {"path": [1],
"value": "prepare_done",
"key": "status",
"action": "setitem"}:
empty_ready.set()
if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1):
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
empty_completed.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait())
self.assertFalse(scheduler.check_pause(0))
scheduler.submit("main", expid, 0, None, False)
self.assertFalse(scheduler.check_pause(0))
loop.run_until_complete(empty_ready.wait())
self.assertTrue(scheduler.check_pause(0))
loop.run_until_complete(empty_completed.wait())
self.assertFalse(scheduler.check_pause(0))
self.assertFalse(termination_ok)
scheduler.request_termination(0)
self.assertTrue(scheduler.check_pause(0))
loop.run_until_complete(background_completed.wait())
self.assertTrue(termination_ok)
loop.run_until_complete(scheduler.stop())
def test_close_with_active_runs(self):
"""Check scheduler exits with experiments still running"""
loop = self.loop
scheduler = Scheduler(_RIDCounter(0), {}, None)
expid_bg = _get_expid("BackgroundExperiment")
# Suppress the SystemExit backtrace when worker process is killed.
expid_bg["log_level"] = logging.CRITICAL
expid = _get_expid("EmptyExperiment")
background_running = asyncio.Event()
empty_ready = asyncio.Event()
background_completed = asyncio.Event()
def notify(mod):
if mod == {"path": [0],
"value": "running",
"key": "status",
"action": "setitem"}:
background_running.set()
if mod == {"path": [0],
"value": "deleting",
"key": "status",
"action": "setitem"}:
background_completed.set()
if mod == {"path": [1],
"value": "prepare_done",
"key": "status",
"action": "setitem"}:
empty_ready.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid_bg, -99, None, False)
loop.run_until_complete(background_running.wait())
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(empty_ready.wait())
# At this point, (at least) BackgroundExperiment is still running; make
# sure we can stop the scheduler without hanging.
loop.run_until_complete(scheduler.stop())
def test_flush(self):
loop = self.loop
scheduler = Scheduler(_RIDCounter(0), dict(), None)
expid = _get_expid("EmptyExperiment")
expect = _get_basic_steps(1, expid, 1, True)
expect.insert(1, {"key": "status",
"path": [1],
"value": "flushing",
"action": "setitem"})
first_preparing = asyncio.Event()
done = asyncio.Event()
expect_idx = 0
def notify(mod):
nonlocal expect_idx
if mod == {"path": [0],
"value": "preparing",
"key": "status",
"action": "setitem"}:
first_preparing.set()
if mod["path"] == [1] or (mod["path"] == [] and mod["key"] == 1):
self.assertEqual(mod, expect[expect_idx])
expect_idx += 1
if expect_idx >= len(expect):
done.set()
scheduler.notifier.publish = notify
scheduler.start()
scheduler.submit("main", expid, 0, None, False)
loop.run_until_complete(first_preparing.wait())
scheduler.submit("main", expid, 1, None, True)
loop.run_until_complete(done.wait())
loop.run_until_complete(scheduler.stop())
def tearDown(self):
self.loop.close()