From 4d21b7831404431e3eeec1b9924f9858de54fe29 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Thu, 19 Feb 2015 19:50:52 -0700 Subject: [PATCH] master,client,gui: factor timeout into run_params --- artiq/frontend/artiq_client.py | 13 +++++++------ artiq/frontend/artiq_run.py | 4 ++-- artiq/gui/explorer.py | 3 ++- artiq/gui/scheduler.py | 8 ++++---- artiq/master/scheduler.py | 24 ++++++++++++------------ artiq/master/worker.py | 4 ++-- artiq/master/worker_impl.py | 5 ++--- examples/flopping_f_simulation.py | 3 ++- 8 files changed, 33 insertions(+), 31 deletions(-) diff --git a/artiq/frontend/artiq_client.py b/artiq/frontend/artiq_client.py index f60223eea..89378e96f 100755 --- a/artiq/frontend/artiq_client.py +++ b/artiq/frontend/artiq_client.py @@ -104,19 +104,20 @@ def _action_submit(remote, args): run_params = { "file": args.file, "unit": args.unit, + "timeout": args.timeout, "arguments": arguments, "rtr_group": args.rtr_group if args.rtr_group is not None \ else args.file } if args.timed is None: - rid = remote.run_queued(run_params, args.timeout) + rid = remote.run_queued(run_params) print("RID: {}".format(rid)) else: if args.timed == "now": next_time = None else: next_time = time.mktime(parse_date(args.timed).timetuple()) - trid = remote.run_timed(run_params, args.timeout, next_time) + trid = remote.run_timed(run_params, next_time) print("TRID: {}".format(trid)) @@ -147,9 +148,9 @@ def _show_queue(queue): clear_screen() if queue: table = PrettyTable(["RID", "File", "Unit", "Timeout", "Arguments"]) - for rid, run_params, timeout in queue: + for rid, run_params in queue: row = [rid, run_params["file"]] - for x in run_params["unit"], timeout: + for x in run_params["unit"], run_params["timeout"]: row.append("-" if x is None else x) row.append(format_run_arguments(run_params["arguments"])) table.add_row(row) @@ -164,10 +165,10 @@ def _show_timed(timed): table = PrettyTable(["Next run", "TRID", "File", "Unit", "Timeout", "Arguments"]) sp = sorted(timed.items(), key=lambda x: (x[1][0], x[0])) - for trid, (next_run, run_params, timeout) in sp: + for trid, (next_run, run_params) in sp: row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), trid, run_params["file"]] - for x in run_params["unit"], timeout: + for x in run_params["unit"], run_params["timeout"]: row.append("-" if x is None else x) row.append(format_run_arguments(run_params["arguments"])) table.add_row(row) diff --git a/artiq/frontend/artiq_run.py b/artiq/frontend/artiq_run.py index af6f547c5..383f8914f 100755 --- a/artiq/frontend/artiq_run.py +++ b/artiq/frontend/artiq_run.py @@ -39,7 +39,7 @@ class DummyScheduler: self.next_rid = 0 self.next_trid = 0 - def run_queued(self, run_params, timeout): + def run_queued(self, run_params): rid = self.next_rid self.next_rid += 1 print("Queuing: {}, RID={}".format(run_params, rid)) @@ -48,7 +48,7 @@ class DummyScheduler: def cancel_queued(self, rid): print("Cancelling RID {}".format(rid)) - def run_timed(self, run_params, timeout, next_run): + def run_timed(self, run_params, next_run): trid = self.next_trid self.next_trid += 1 next_run_s = time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)) diff --git a/artiq/gui/explorer.py b/artiq/gui/explorer.py index eb2960057..95797d196 100644 --- a/artiq/gui/explorer.py +++ b/artiq/gui/explorer.py @@ -147,7 +147,8 @@ class ExplorerWindow(Window): run_params = { "file": data["file"], "unit": data["unit"], + "timeout": None, "arguments": arguments, "rtr_group": data["file"] } - asyncio.Task(self.schedule_ctl.run_queued(run_params, None)) + asyncio.Task(self.schedule_ctl.run_queued(run_params)) diff --git a/artiq/gui/scheduler.py b/artiq/gui/scheduler.py index bce52110b..8854354a6 100644 --- a/artiq/gui/scheduler.py +++ b/artiq/gui/scheduler.py @@ -10,9 +10,9 @@ from artiq.tools import format_run_arguments class _QueueStoreSyncer(ListSyncer): def convert(self, x): - rid, run_params, timeout = x + rid, run_params = x row = [rid, run_params["file"]] - for e in run_params["unit"], timeout: + for e in run_params["unit"], run_params["timeout"]: row.append("-" if e is None else str(e)) row.append(format_run_arguments(run_params["arguments"])) return row @@ -24,10 +24,10 @@ class _TimedStoreSyncer(DictSyncer): return (kv_pair[1][0], kv_pair[0]) def convert(self, trid, x): - next_run, run_params, timeout = x + next_run, run_params = x row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)), trid, run_params["file"]] - for e in run_params["unit"], timeout: + for e in run_params["unit"], run_params["timeout"]: row.append("-" if e is None else str(e)) row.append(format_run_arguments(run_params["arguments"])) return row diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index 8698bcc6a..cd007d0ed 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -37,14 +37,14 @@ class Scheduler: del self.task yield from self.worker.end_process() - def run_queued(self, run_params, timeout): + def run_queued(self, run_params): rid = self.new_rid() - self.queue.append((rid, run_params, timeout)) + self.queue.append((rid, run_params)) self.queue_modified.set() return rid def cancel_queued(self, rid): - idx = next(idx for idx, (qrid, _, _) + idx = next(idx for idx, (qrid, _) in enumerate(self.queue.read) if qrid == rid) if idx == 0: @@ -52,11 +52,11 @@ class Scheduler: raise NotImplementedError del self.queue[idx] - def run_timed(self, run_params, timeout, next_run): + def run_timed(self, run_params, next_run): if next_run is None: next_run = time() trid = self.new_trid() - self.timed[trid] = next_run, run_params, timeout + self.timed[trid] = next_run, run_params self.timed_modified.set() return trid @@ -64,10 +64,10 @@ class Scheduler: del self.timed[trid] @asyncio.coroutine - def _run(self, rid, run_params, timeout): + def _run(self, rid, run_params): self.run_cb(rid, run_params) try: - yield from self.worker.run(run_params, timeout) + yield from self.worker.run(run_params) except Exception as e: print("RID {} failed:".format(rid)) print(e) @@ -92,12 +92,12 @@ class Scheduler: if min_next_run > 0: return min_next_run - next_run, run_params, timeout = self.timed.read[min_trid] + next_run, run_params = self.timed.read[min_trid] del self.timed[min_trid] rid = self.new_rid() - self.queue.insert(0, (rid, run_params, timeout)) - yield from self._run(rid, run_params, timeout) + self.queue.insert(0, (rid, run_params)) + yield from self._run(rid, run_params) del self.queue[0] @asyncio.coroutine @@ -105,8 +105,8 @@ class Scheduler: while True: next_timed = yield from self._run_timed() if self.queue.read: - rid, run_params, timeout = self.queue.read[0] - yield from self._run(rid, run_params, timeout) + rid, run_params = self.queue.read[0] + yield from self._run(rid, run_params) del self.queue[0] else: self.queue_modified.clear() diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 63d2a4201..fedcd6ebc 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -60,13 +60,13 @@ class Worker: return obj @asyncio.coroutine - def run(self, run_params, result_timeout): + def run(self, run_params): yield from self._send(run_params, self.send_timeout) obj = yield from self._recv(self.start_reply_timeout) if obj != "ack": raise WorkerFailed("Incorrect acknowledgement") while True: - obj = yield from self._recv(result_timeout) + obj = yield from self._recv(run_params["timeout"]) action = obj["action"] if action == "report_completed": if obj["status"] != "ok": diff --git a/artiq/master/worker_impl.py b/artiq/master/worker_impl.py index 592e089ad..0a7627afc 100644 --- a/artiq/master/worker_impl.py +++ b/artiq/master/worker_impl.py @@ -59,11 +59,10 @@ def publish_rt_results(notifier, data): class Scheduler: - run_queued = make_parent_action("scheduler_run_queued", - "run_params timeout") + run_queued = make_parent_action("scheduler_run_queued", "run_params") cancel_queued = make_parent_action("scheduler_cancel_queued", "rid") run_timed = make_parent_action("scheduler_run_timed", - "run_params timeout next_run") + "run_params next_run") cancel_timed = make_parent_action("scheduler_cancel_timed", "trid") diff --git a/examples/flopping_f_simulation.py b/examples/flopping_f_simulation.py index 91630a272..cd7564526 100644 --- a/examples/flopping_f_simulation.py +++ b/examples/flopping_f_simulation.py @@ -56,10 +56,11 @@ class FloppingF(AutoDB): run_params = { "file": "flopping_f_simulation.py", "unit": None, + "timeout": None, "arguments": dict(), "rtr_group": "flopping_f_simulation.py" } - self.scheduler.run_timed(run_params, None, time.time() + 20) + self.scheduler.run_timed(run_params, time.time() + 20) def analyze(self): popt, pcov = curve_fit(model_numpy,