master,client,gui: factor timeout into run_params

This commit is contained in:
Sebastien Bourdeauducq 2015-02-19 19:50:52 -07:00
parent cca6a17cfb
commit 4d21b78314
8 changed files with 33 additions and 31 deletions

View File

@ -104,19 +104,20 @@ def _action_submit(remote, args):
run_params = {
"file": args.file,
"unit": args.unit,
"timeout": args.timeout,
"arguments": arguments,
"rtr_group": args.rtr_group if args.rtr_group is not None \
else args.file
}
if args.timed is None:
rid = remote.run_queued(run_params, args.timeout)
rid = remote.run_queued(run_params)
print("RID: {}".format(rid))
else:
if args.timed == "now":
next_time = None
else:
next_time = time.mktime(parse_date(args.timed).timetuple())
trid = remote.run_timed(run_params, args.timeout, next_time)
trid = remote.run_timed(run_params, next_time)
print("TRID: {}".format(trid))
@ -147,9 +148,9 @@ def _show_queue(queue):
clear_screen()
if queue:
table = PrettyTable(["RID", "File", "Unit", "Timeout", "Arguments"])
for rid, run_params, timeout in queue:
for rid, run_params in queue:
row = [rid, run_params["file"]]
for x in run_params["unit"], timeout:
for x in run_params["unit"], run_params["timeout"]:
row.append("-" if x is None else x)
row.append(format_run_arguments(run_params["arguments"]))
table.add_row(row)
@ -164,10 +165,10 @@ def _show_timed(timed):
table = PrettyTable(["Next run", "TRID", "File", "Unit",
"Timeout", "Arguments"])
sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0]))
for trid, (next_run, run_params, timeout) in sp:
for trid, (next_run, run_params) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
trid, run_params["file"]]
for x in run_params["unit"], timeout:
for x in run_params["unit"], run_params["timeout"]:
row.append("-" if x is None else x)
row.append(format_run_arguments(run_params["arguments"]))
table.add_row(row)

View File

@ -39,7 +39,7 @@ class DummyScheduler:
self.next_rid = 0
self.next_trid = 0
def run_queued(self, run_params, timeout):
def run_queued(self, run_params):
rid = self.next_rid
self.next_rid += 1
print("Queuing: {}, RID={}".format(run_params, rid))
@ -48,7 +48,7 @@ class DummyScheduler:
def cancel_queued(self, rid):
print("Cancelling RID {}".format(rid))
def run_timed(self, run_params, timeout, next_run):
def run_timed(self, run_params, next_run):
trid = self.next_trid
self.next_trid += 1
next_run_s = time.strftime("%m/%d %H:%M:%S", time.localtime(next_run))

View File

@ -147,7 +147,8 @@ class ExplorerWindow(Window):
run_params = {
"file": data["file"],
"unit": data["unit"],
"timeout": None,
"arguments": arguments,
"rtr_group": data["file"]
}
asyncio.Task(self.schedule_ctl.run_queued(run_params, None))
asyncio.Task(self.schedule_ctl.run_queued(run_params))

View File

@ -10,9 +10,9 @@ from artiq.tools import format_run_arguments
class _QueueStoreSyncer(ListSyncer):
def convert(self, x):
rid, run_params, timeout = x
rid, run_params = x
row = [rid, run_params["file"]]
for e in run_params["unit"], timeout:
for e in run_params["unit"], run_params["timeout"]:
row.append("-" if e is None else str(e))
row.append(format_run_arguments(run_params["arguments"]))
return row
@ -24,10 +24,10 @@ class _TimedStoreSyncer(DictSyncer):
return (kv_pair[1][0], kv_pair[0])
def convert(self, trid, x):
next_run, run_params, timeout = x
next_run, run_params = x
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
trid, run_params["file"]]
for e in run_params["unit"], timeout:
for e in run_params["unit"], run_params["timeout"]:
row.append("-" if e is None else str(e))
row.append(format_run_arguments(run_params["arguments"]))
return row

View File

@ -37,14 +37,14 @@ class Scheduler:
del self.task
yield from self.worker.end_process()
def run_queued(self, run_params, timeout):
def run_queued(self, run_params):
rid = self.new_rid()
self.queue.append((rid, run_params, timeout))
self.queue.append((rid, run_params))
self.queue_modified.set()
return rid
def cancel_queued(self, rid):
idx = next(idx for idx, (qrid, _, _)
idx = next(idx for idx, (qrid, _)
in enumerate(self.queue.read)
if qrid == rid)
if idx == 0:
@ -52,11 +52,11 @@ class Scheduler:
raise NotImplementedError
del self.queue[idx]
def run_timed(self, run_params, timeout, next_run):
def run_timed(self, run_params, next_run):
if next_run is None:
next_run = time()
trid = self.new_trid()
self.timed[trid] = next_run, run_params, timeout
self.timed[trid] = next_run, run_params
self.timed_modified.set()
return trid
@ -64,10 +64,10 @@ class Scheduler:
del self.timed[trid]
@asyncio.coroutine
def _run(self, rid, run_params, timeout):
def _run(self, rid, run_params):
self.run_cb(rid, run_params)
try:
yield from self.worker.run(run_params, timeout)
yield from self.worker.run(run_params)
except Exception as e:
print("RID {} failed:".format(rid))
print(e)
@ -92,12 +92,12 @@ class Scheduler:
if min_next_run > 0:
return min_next_run
next_run, run_params, timeout = self.timed.read[min_trid]
next_run, run_params = self.timed.read[min_trid]
del self.timed[min_trid]
rid = self.new_rid()
self.queue.insert(0, (rid, run_params, timeout))
yield from self._run(rid, run_params, timeout)
self.queue.insert(0, (rid, run_params))
yield from self._run(rid, run_params)
del self.queue[0]
@asyncio.coroutine
@ -105,8 +105,8 @@ class Scheduler:
while True:
next_timed = yield from self._run_timed()
if self.queue.read:
rid, run_params, timeout = self.queue.read[0]
yield from self._run(rid, run_params, timeout)
rid, run_params = self.queue.read[0]
yield from self._run(rid, run_params)
del self.queue[0]
else:
self.queue_modified.clear()

View File

@ -60,13 +60,13 @@ class Worker:
return obj
@asyncio.coroutine
def run(self, run_params, result_timeout):
def run(self, run_params):
yield from self._send(run_params, self.send_timeout)
obj = yield from self._recv(self.start_reply_timeout)
if obj != "ack":
raise WorkerFailed("Incorrect acknowledgement")
while True:
obj = yield from self._recv(result_timeout)
obj = yield from self._recv(run_params["timeout"])
action = obj["action"]
if action == "report_completed":
if obj["status"] != "ok":

View File

@ -59,11 +59,10 @@ def publish_rt_results(notifier, data):
class Scheduler:
run_queued = make_parent_action("scheduler_run_queued",
"run_params timeout")
run_queued = make_parent_action("scheduler_run_queued", "run_params")
cancel_queued = make_parent_action("scheduler_cancel_queued", "rid")
run_timed = make_parent_action("scheduler_run_timed",
"run_params timeout next_run")
"run_params next_run")
cancel_timed = make_parent_action("scheduler_cancel_timed", "trid")

View File

@ -56,10 +56,11 @@ class FloppingF(AutoDB):
run_params = {
"file": "flopping_f_simulation.py",
"unit": None,
"timeout": None,
"arguments": dict(),
"rtr_group": "flopping_f_simulation.py"
}
self.scheduler.run_timed(run_params, None, time.time() + 20)
self.scheduler.run_timed(run_params, time.time() + 20)
def analyze(self):
popt, pcov = curve_fit(model_numpy,