From f552d62b6987fa33068bcfe7c73a914eee013b2c Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sat, 3 Oct 2015 19:28:57 +0800 Subject: [PATCH] use Python 3.5 coroutines --- artiq/frontend/artiq_ctlmgr.py | 85 +++++++++++++-------------- artiq/frontend/artiq_influxdb.py | 20 +++---- artiq/gui/explorer.py | 17 +++--- artiq/gui/log.py | 10 ++-- artiq/gui/moninj.py | 21 +++---- artiq/gui/parameters.py | 10 ++-- artiq/gui/results.py | 10 ++-- artiq/gui/schedule.py | 15 ++--- artiq/gui/state.py | 5 +- artiq/master/repository.py | 12 ++-- artiq/master/scheduler.py | 95 +++++++++++++------------------ artiq/master/worker.py | 93 +++++++++++++----------------- artiq/protocols/asyncio_server.py | 14 ++--- artiq/protocols/pc_rpc.py | 38 ++++++------- artiq/protocols/sync_struct.py | 26 ++++----- artiq/test/pc_rpc.py | 13 ++--- artiq/test/sync_struct.py | 10 ++-- artiq/test/worker.py | 13 ++--- artiq/tools.py | 25 ++++---- 19 files changed, 228 insertions(+), 304 deletions(-) diff --git a/artiq/frontend/artiq_ctlmgr.py b/artiq/frontend/artiq_ctlmgr.py index 273cae9af..fb25519b1 100755 --- a/artiq/frontend/artiq_ctlmgr.py +++ b/artiq/frontend/artiq_ctlmgr.py @@ -56,60 +56,55 @@ class Controller: self.process = None self.launch_task = asyncio.Task(self.launcher()) - @asyncio.coroutine - def end(self): + async def end(self): self.launch_task.cancel() - yield from asyncio.wait_for(self.launch_task, None) + await asyncio.wait_for(self.launch_task, None) - @asyncio.coroutine - def _call_controller(self, method): + async def _call_controller(self, method): remote = AsyncioClient() - yield from remote.connect_rpc(self.host, self.port, None) + await remote.connect_rpc(self.host, self.port, None) try: targets, _ = remote.get_rpc_id() remote.select_rpc_target(targets[0]) - r = yield from getattr(remote, method)() + r = await getattr(remote, method)() finally: remote.close_rpc() return r - @asyncio.coroutine - def _ping(self): + async def _ping(self): try: - ok = yield from asyncio.wait_for(self._call_controller("ping"), - self.ping_timeout) + ok = await asyncio.wait_for(self._call_controller("ping"), + self.ping_timeout) if ok: self.retry_timer_cur = self.retry_timer return ok except: return False - @asyncio.coroutine - def _wait_and_ping(self): + async def _wait_and_ping(self): while True: try: - yield from asyncio.wait_for(self.process.wait(), - self.ping_timer) + await asyncio.wait_for(self.process.wait(), + self.ping_timer) except asyncio.TimeoutError: logger.debug("pinging controller %s", self.name) - ok = yield from self._ping() + ok = await self._ping() if not ok: logger.warning("Controller %s ping failed", self.name) - yield from self._terminate() + await self._terminate() return else: break - @asyncio.coroutine - def launcher(self): + async def launcher(self): try: while True: logger.info("Starting controller %s with command: %s", self.name, self.command) try: - self.process = yield from asyncio.create_subprocess_exec( + self.process = await asyncio.create_subprocess_exec( *shlex.split(self.command)) - yield from self._wait_and_ping() + await self._wait_and_ping() except FileNotFoundError: logger.warning("Controller %s failed to start", self.name) else: @@ -117,33 +112,32 @@ class Controller: logger.warning("Restarting in %.1f seconds", self.retry_timer_cur) try: - yield from asyncio.wait_for(self.retry_now.wait(), - self.retry_timer_cur) + await asyncio.wait_for(self.retry_now.wait(), + self.retry_timer_cur) except asyncio.TimeoutError: pass self.retry_timer_cur *= self.retry_timer_backoff except asyncio.CancelledError: - yield from self._terminate() + await self._terminate() - @asyncio.coroutine - def _terminate(self): + async def _terminate(self): logger.info("Terminating controller %s", self.name) if self.process is not None and self.process.returncode is None: try: - yield from asyncio.wait_for(self._call_controller("terminate"), - self.term_timeout) + await asyncio.wait_for(self._call_controller("terminate"), + self.term_timeout) except: logger.warning("Controller %s did not respond to terminate " "command, killing", self.name) self.process.kill() try: - yield from asyncio.wait_for(self.process.wait(), - self.term_timeout) + await asyncio.wait_for(self.process.wait(), + self.term_timeout) except: logger.warning("Controller %s failed to exit, killing", self.name) self.process.kill() - yield from self.process.wait() + await self.process.wait() logger.debug("Controller %s terminated", self.name) @@ -163,17 +157,16 @@ class Controllers: self.active = dict() self.process_task = asyncio.Task(self._process()) - @asyncio.coroutine - def _process(self): + async def _process(self): while True: - action, param = yield from self.queue.get() + action, param = await self.queue.get() if action == "set": k, ddb_entry = param if k in self.active: - yield from self.active[k].end() + await self.active[k].end() self.active[k] = Controller(k, ddb_entry) elif action == "del": - yield from self.active[param].end() + await self.active[param].end() del self.active[param] else: raise ValueError @@ -196,11 +189,10 @@ class Controllers: for name in set(self.active_or_queued): del self[name] - @asyncio.coroutine - def shutdown(self): + async def shutdown(self): self.process_task.cancel() for c in self.active.values(): - yield from c.end() + await c.end() class ControllerDB: @@ -225,8 +217,7 @@ class ControllerManager(TaskObject): self.retry_master = retry_master self.controller_db = ControllerDB() - @asyncio.coroutine - def _do(self): + async def _do(self): try: subscriber = Subscriber("devices", self.controller_db.sync_struct_init) @@ -236,12 +227,12 @@ class ControllerManager(TaskObject): s = subscriber.writer.get_extra_info("socket") localhost = s.getsockname()[0] self.controller_db.set_host_filter(localhost) - yield from subscriber.connect(self.server, self.port, - set_host_filter) + await subscriber.connect(self.server, self.port, + set_host_filter) try: - yield from asyncio.wait_for(subscriber.receive_task, None) + await asyncio.wait_for(subscriber.receive_task, None) finally: - yield from subscriber.close() + await subscriber.close() except (ConnectionAbortedError, ConnectionError, ConnectionRefusedError, ConnectionResetError) as e: logger.warning("Connection to master failed (%s: %s)", @@ -249,11 +240,11 @@ class ControllerManager(TaskObject): else: logger.warning("Connection to master lost") logger.warning("Retrying in %.1f seconds", self.retry_master) - yield from asyncio.sleep(self.retry_master) + await asyncio.sleep(self.retry_master) except asyncio.CancelledError: pass finally: - yield from self.controller_db.current_controllers.shutdown() + await self.controller_db.current_controllers.shutdown() def retry_now(self, k): """If a controller is disabled and pending retry, perform that retry diff --git a/artiq/frontend/artiq_influxdb.py b/artiq/frontend/artiq_influxdb.py index 299695030..f69da9b0e 100755 --- a/artiq/frontend/artiq_influxdb.py +++ b/artiq/frontend/artiq_influxdb.py @@ -96,24 +96,23 @@ class DBWriter(TaskObject): logger.warning("failed to update parameter '%s': " "too many pending updates", k) - @asyncio.coroutine - def _do(self): + async def _do(self): while True: - k, v = yield from self._queue.get() + k, v = await self._queue.get() url = self.base_url + "/write" params = {"u": self.user, "p": self.password, "db": self.database, "consistency": "any", "precision": "n"} fmt_ty, fmt_v = format_influxdb(v) data = "{},parameter={} {}={}".format(self.table, k, fmt_ty, fmt_v) try: - response = yield from aiohttp.request( + response = await aiohttp.request( "POST", url, params=params, data=data) except: logger.warning("got exception trying to update '%s'", k, exc_info=True) else: if response.status not in (200, 204): - content = (yield from response.content.read()).decode() + content = (await response.content.read()).decode() if content: content = content[:-1] # drop \n logger.warning("got HTTP status %d " @@ -144,18 +143,17 @@ class MasterReader(TaskObject): self.filter_function = filter_function self.writer = writer - @asyncio.coroutine - def _do(self): + async def _do(self): subscriber = Subscriber( "parameters", partial(Parameters, self.filter_function, self.writer)) while True: try: - yield from subscriber.connect(self.server, self.port) + await subscriber.connect(self.server, self.port) try: - yield from asyncio.wait_for(subscriber.receive_task, None) + await asyncio.wait_for(subscriber.receive_task, None) finally: - yield from subscriber.close() + await subscriber.close() except (ConnectionAbortedError, ConnectionError, ConnectionRefusedError, ConnectionResetError) as e: logger.warning("Connection to master failed (%s: %s)", @@ -163,7 +161,7 @@ class MasterReader(TaskObject): else: logger.warning("Connection to master lost") logger.warning("Retrying in %.1f seconds", self.retry) - yield from asyncio.sleep(self.retry) + await asyncio.sleep(self.retry) class Filter: diff --git a/artiq/gui/explorer.py b/artiq/gui/explorer.py index bf2fc3e69..fe6cb58cb 100644 --- a/artiq/gui/explorer.py +++ b/artiq/gui/explorer.py @@ -300,23 +300,20 @@ class ExplorerDock(dockarea.Dock): def enable_duedate(self): self.datetime_en.setChecked(True) - @asyncio.coroutine - def sub_connect(self, host, port): + async def sub_connect(self, host, port): self.explist_subscriber = Subscriber("explist", self.init_explist_model) - yield from self.explist_subscriber.connect(host, port) + await self.explist_subscriber.connect(host, port) - @asyncio.coroutine - def sub_close(self): - yield from self.explist_subscriber.close() + async def sub_close(self): + await self.explist_subscriber.close() def init_explist_model(self, init): self.explist_model = _ExplistModel(self, self.el, init) self.el.setModel(self.explist_model) return self.explist_model - @asyncio.coroutine - def submit(self, pipeline_name, file, class_name, arguments, + async def submit(self, pipeline_name, file, class_name, arguments, priority, due_date, flush): expid = { "repo_rev": None, @@ -324,8 +321,8 @@ class ExplorerDock(dockarea.Dock): "class_name": class_name, "arguments": arguments, } - rid = yield from self.schedule_ctl.submit(pipeline_name, expid, - priority, due_date, flush) + rid = await self.schedule_ctl.submit(pipeline_name, expid, + priority, due_date, flush) self.status_bar.showMessage("Submitted RID {}".format(rid)) def submit_clicked(self): diff --git a/artiq/gui/log.py b/artiq/gui/log.py index 8c7a99c24..4c49673fa 100644 --- a/artiq/gui/log.py +++ b/artiq/gui/log.py @@ -41,14 +41,12 @@ class LogDock(dockarea.Dock): self.addWidget(self.log) self.scroll_at_bottom = False - @asyncio.coroutine - def sub_connect(self, host, port): + async def sub_connect(self, host, port): self.subscriber = Subscriber("log", self.init_log_model) - yield from self.subscriber.connect(host, port) + await self.subscriber.connect(host, port) - @asyncio.coroutine - def sub_close(self): - yield from self.subscriber.close() + async def sub_close(self): + await self.subscriber.close() def rows_inserted_before(self): scrollbar = self.log.verticalScrollBar() diff --git a/artiq/gui/moninj.py b/artiq/gui/moninj.py index bd7e94214..835fb58ab 100644 --- a/artiq/gui/moninj.py +++ b/artiq/gui/moninj.py @@ -232,26 +232,24 @@ class MonInj(TaskObject): self.dm = _DeviceManager(self.send_to_device, dict()) self.transport = None - @asyncio.coroutine - def start(self, server, port): + async def start(self, server, port): loop = asyncio.get_event_loop() - yield from loop.create_datagram_endpoint(lambda: self, + await loop.create_datagram_endpoint(lambda: self, family=socket.AF_INET) try: - yield from self.subscriber.connect(server, port) + await self.subscriber.connect(server, port) try: TaskObject.start(self) except: - yield from self.subscriber.close() + await self.subscriber.close() raise except: self.transport.close() raise - @asyncio.coroutine - def stop(self): - yield from TaskObject.stop(self) - yield from self.subscriber.close() + async def stop(self): + await TaskObject.stop(self) + await self.subscriber.close() if self.transport is not None: self.transport.close() self.transport = None @@ -295,10 +293,9 @@ class MonInj(TaskObject): else: self.transport.sendto(data, (ca, 3250)) - @asyncio.coroutine - def _do(self): + async def _do(self): while True: - yield from asyncio.sleep(0.2) + await asyncio.sleep(0.2) # MONINJ_REQ_MONITOR self.send_to_device(b"\x01") diff --git a/artiq/gui/parameters.py b/artiq/gui/parameters.py index 4bc53b927..21b91a499 100644 --- a/artiq/gui/parameters.py +++ b/artiq/gui/parameters.py @@ -59,14 +59,12 @@ class ParametersDock(dockarea.Dock): else: self.table.hideRow(row) - @asyncio.coroutine - def sub_connect(self, host, port): + async def sub_connect(self, host, port): self.subscriber = Subscriber("parameters", self.init_parameters_model) - yield from self.subscriber.connect(host, port) + await self.subscriber.connect(host, port) - @asyncio.coroutine - def sub_close(self): - yield from self.subscriber.close() + async def sub_close(self): + await self.subscriber.close() def init_parameters_model(self, init): self.table_model = ParametersModel(self.table, init) diff --git a/artiq/gui/results.py b/artiq/gui/results.py index 0ed872a6a..edd40e2ad 100644 --- a/artiq/gui/results.py +++ b/artiq/gui/results.py @@ -68,15 +68,13 @@ class ResultsDock(dockarea.Dock): def get_result(self, key): return self.table_model.backing_store[key] - @asyncio.coroutine - def sub_connect(self, host, port): + async def sub_connect(self, host, port): self.subscriber = Subscriber("rt_results", self.init_results_model, self.on_mod) - yield from self.subscriber.connect(host, port) + await self.subscriber.connect(host, port) - @asyncio.coroutine - def sub_close(self): - yield from self.subscriber.close() + async def sub_close(self): + await self.subscriber.close() def init_results_model(self, init): self.table_model = ResultsModel(self.table, init) diff --git a/artiq/gui/schedule.py b/artiq/gui/schedule.py index 21ab02b88..f1606caf9 100644 --- a/artiq/gui/schedule.py +++ b/artiq/gui/schedule.py @@ -75,23 +75,20 @@ class ScheduleDock(dockarea.Dock): delete_action.triggered.connect(self.delete_clicked) self.table.addAction(delete_action) - @asyncio.coroutine - def sub_connect(self, host, port): + async def sub_connect(self, host, port): self.subscriber = Subscriber("schedule", self.init_schedule_model) - yield from self.subscriber.connect(host, port) + await self.subscriber.connect(host, port) - @asyncio.coroutine - def sub_close(self): - yield from self.subscriber.close() + async def sub_close(self): + await self.subscriber.close() def init_schedule_model(self, init): self.table_model = _ScheduleModel(self.table, init) self.table.setModel(self.table_model) return self.table_model - @asyncio.coroutine - def delete(self, rid): - yield from self.schedule_ctl.delete(rid) + async def delete(self, rid): + await self.schedule_ctl.delete(rid) def delete_clicked(self): idx = self.table.selectedIndexes() diff --git a/artiq/gui/state.py b/artiq/gui/state.py index 9088da4e6..8cead13f5 100644 --- a/artiq/gui/state.py +++ b/artiq/gui/state.py @@ -69,11 +69,10 @@ class StateManager(TaskObject): exc_info=True) pyon.store_file(self.filename, data) - @asyncio.coroutine - def _do(self): + async def _do(self): try: while True: - yield from asyncio.sleep(self.autosave_period) + await asyncio.sleep(self.autosave_period) self.save() finally: self.save() diff --git a/artiq/master/repository.py b/artiq/master/repository.py index 6478570c8..7560cd240 100644 --- a/artiq/master/repository.py +++ b/artiq/master/repository.py @@ -12,17 +12,16 @@ from artiq.tools import exc_to_warning logger = logging.getLogger(__name__) -@asyncio.coroutine -def _scan_experiments(wd, log): +async def _scan_experiments(wd, log): r = dict() for f in os.listdir(wd): if f.endswith(".py"): try: worker = Worker({"log": lambda message: log("scan", message)}) try: - description = yield from worker.examine(os.path.join(wd, f)) + description = await worker.examine(os.path.join(wd, f)) finally: - yield from worker.close() + await worker.close() for class_name, class_desc in description.items(): name = class_desc["name"] arguments = class_desc["arguments"] @@ -68,8 +67,7 @@ class Repository: # The object cannot be used anymore after calling this method. self.backend.release_rev(self.cur_rev) - @asyncio.coroutine - def scan(self, new_cur_rev=None): + async def scan(self, new_cur_rev=None): if self._scanning: return self._scanning = True @@ -79,7 +77,7 @@ class Repository: wd, _ = self.backend.request_rev(new_cur_rev) self.backend.release_rev(self.cur_rev) self.cur_rev = new_cur_rev - new_explist = yield from _scan_experiments(wd, self.log_fn) + new_explist = await _scan_experiments(wd, self.log_fn) _sync_explist(self.explist, new_explist) finally: diff --git a/artiq/master/scheduler.py b/artiq/master/scheduler.py index d7b931f42..fdd925287 100644 --- a/artiq/master/scheduler.py +++ b/artiq/master/scheduler.py @@ -24,13 +24,12 @@ class RunStatus(Enum): def _mk_worker_method(name): - @asyncio.coroutine - def worker_method(self, *args, **kwargs): + async def worker_method(self, *args, **kwargs): if self.worker.closed.is_set(): return True m = getattr(self.worker, name) try: - return (yield from m(*args, **kwargs)) + return await m(*args, **kwargs) except Exception as e: if isinstance(e, asyncio.CancelledError): raise @@ -97,19 +96,17 @@ class Run: runnable = 1 return (runnable, self.priority, due_date_k, -self.rid) - @asyncio.coroutine - def close(self): + async def close(self): # called through pool - yield from self.worker.close() + await self.worker.close() del self._notifier[self.rid] _build = _mk_worker_method("build") - @asyncio.coroutine - def build(self): - yield from self._build(self.rid, self.pipeline_name, - self.wd, self.expid, - self.priority) + async def build(self): + await self._build(self.rid, self.pipeline_name, + self.wd, self.expid, + self.priority) prepare = _mk_worker_method("prepare") run = _mk_worker_method("run") @@ -154,13 +151,12 @@ class RunPool: self.state_changed.notify() return rid - @asyncio.coroutine - def delete(self, rid): + async def delete(self, rid): # called through deleter if rid not in self.runs: return run = self.runs[rid] - yield from run.close() + await run.close() if "repo_rev" in run.expid: self.repo_backend.release_rev(run.expid["repo_rev"]) del self.runs[rid] @@ -203,14 +199,13 @@ class PrepareStage(TaskObject): else: return candidate.due_date - now - @asyncio.coroutine - def _do(self): + async def _do(self): while True: run = self._get_run() if run is None: - yield from self.pool.state_changed.wait() + await self.pool.state_changed.wait() elif isinstance(run, float): - yield from asyncio_wait_or_cancel([self.pool.state_changed.wait()], + await asyncio_wait_or_cancel([self.pool.state_changed.wait()], timeout=run) else: if run.flush: @@ -221,7 +216,7 @@ class PrepareStage(TaskObject): for r in self.pool.runs.values()): ev = [self.pool.state_changed.wait(), run.worker.closed.wait()] - yield from asyncio_wait_or_cancel( + await asyncio_wait_or_cancel( ev, return_when=asyncio.FIRST_COMPLETED) if run.worker.closed.is_set(): break @@ -229,8 +224,8 @@ class PrepareStage(TaskObject): continue run.status = RunStatus.preparing try: - yield from run.build() - yield from run.prepare() + await run.build() + await run.prepare() except: logger.warning("got worker exception in prepare stage, " "deleting RID %d", @@ -255,8 +250,7 @@ class RunStage(TaskObject): r = None return r - @asyncio.coroutine - def _do(self): + async def _do(self): stack = [] while True: @@ -265,7 +259,7 @@ class RunStage(TaskObject): next_irun is not None and next_irun.priority_key() > stack[-1].priority_key()): while next_irun is None: - yield from self.pool.state_changed.wait() + await self.pool.state_changed.wait() next_irun = self._get_run() stack.append(next_irun) @@ -273,10 +267,10 @@ class RunStage(TaskObject): try: if run.status == RunStatus.paused: run.status = RunStatus.running - completed = yield from run.resume() + completed = await run.resume() else: run.status = RunStatus.running - completed = yield from run.run() + completed = await run.run() except: logger.warning("got worker exception in run stage, " "deleting RID %d", @@ -305,17 +299,16 @@ class AnalyzeStage(TaskObject): r = None return r - @asyncio.coroutine - def _do(self): + async def _do(self): while True: run = self._get_run() while run is None: - yield from self.pool.state_changed.wait() + await self.pool.state_changed.wait() run = self._get_run() run.status = RunStatus.analyzing try: - yield from run.analyze() - yield from run.write_results() + await run.analyze() + await run.write_results() except: logger.warning("got worker exception in analyze stage, " "deleting RID %d", @@ -337,12 +330,11 @@ class Pipeline: self._run.start() self._analyze.start() - @asyncio.coroutine - def stop(self): + async def stop(self): # NB: restart of a stopped pipeline is not supported - yield from self._analyze.stop() - yield from self._run.stop() - yield from self._prepare.stop() + await self._analyze.stop() + await self._run.stop() + await self._prepare.stop() class Deleter(TaskObject): @@ -358,36 +350,32 @@ class Deleter(TaskObject): break self._queue.put_nowait(rid) - @asyncio.coroutine - def join(self): - yield from self._queue.join() + async def join(self): + await self._queue.join() - @asyncio.coroutine - def _delete(self, rid): + async def _delete(self, rid): for pipeline in self._pipelines.values(): if rid in pipeline.pool.runs: logger.debug("deleting RID %d...", rid) - yield from pipeline.pool.delete(rid) + await pipeline.pool.delete(rid) logger.debug("deletion of RID %d completed", rid) break - @asyncio.coroutine - def _gc_pipelines(self): + async def _gc_pipelines(self): pipeline_names = list(self._pipelines.keys()) for name in pipeline_names: if not self._pipelines[name].pool.runs: logger.debug("garbage-collecting pipeline '%s'...", name) - yield from self._pipelines[name].stop() + await self._pipelines[name].stop() del self._pipelines[name] logger.debug("garbage-collection of pipeline '%s' completed", name) - @asyncio.coroutine - def _do(self): + async def _do(self): while True: - rid = yield from self._queue.get() - yield from self._delete(rid) - yield from self._gc_pipelines() + rid = await self._queue.get() + await self._delete(rid) + await self._gc_pipelines() self._queue.task_done() @@ -406,15 +394,14 @@ class Scheduler: def start(self): self._deleter.start() - @asyncio.coroutine - def stop(self): + async def stop(self): # NB: restart of a stopped scheduler is not supported self._terminated = True # prevent further runs from being created for pipeline in self._pipelines.values(): for rid in pipeline.pool.runs.keys(): self._deleter.delete(rid) - yield from self._deleter.join() - yield from self._deleter.stop() + await self._deleter.join() + await self._deleter.stop() if self._pipelines: logger.warning("some pipelines were not garbage-collected") diff --git a/artiq/master/worker.py b/artiq/master/worker.py index 1bb36f94d..104cd8a15 100644 --- a/artiq/master/worker.py +++ b/artiq/master/worker.py @@ -56,27 +56,25 @@ class Worker: else: return None - @asyncio.coroutine - def _create_process(self): - yield from self.io_lock.acquire() + async def _create_process(self): + await self.io_lock.acquire() try: if self.closed.is_set(): raise WorkerError("Attempting to create process after close") - self.process = yield from asyncio.create_subprocess_exec( + self.process = await asyncio.create_subprocess_exec( sys.executable, "-m", "artiq.master.worker_impl", stdout=subprocess.PIPE, stdin=subprocess.PIPE) finally: self.io_lock.release() - @asyncio.coroutine - def close(self, term_timeout=1.0): + async def close(self, term_timeout=1.0): """Interrupts any I/O with the worker process and terminates the worker process. This method should always be called by the user to clean up, even if build() or examine() raises an exception.""" self.closed.set() - yield from self.io_lock.acquire() + await self.io_lock.acquire() try: if self.process is None: # Note the %s - self.rid can be None @@ -91,26 +89,25 @@ class Worker: return obj = {"action": "terminate"} try: - yield from self._send(obj, cancellable=False) + await self._send(obj, cancellable=False) except: logger.warning("failed to send terminate command to worker" " (RID %s), killing", self.rid, exc_info=True) self.process.kill() - yield from self.process.wait() + await self.process.wait() return try: - yield from asyncio.wait_for(self.process.wait(), term_timeout) + await asyncio.wait_for(self.process.wait(), term_timeout) except asyncio.TimeoutError: logger.warning("worker did not exit (RID %s), killing", self.rid) self.process.kill() - yield from self.process.wait() + await self.process.wait() else: logger.debug("worker exited gracefully (RID %s)", self.rid) finally: self.io_lock.release() - @asyncio.coroutine - def _send(self, obj, cancellable=True): + async def _send(self, obj, cancellable=True): assert self.io_lock.locked() line = pyon.encode(obj) self.process.stdin.write(line.encode()) @@ -118,7 +115,7 @@ class Worker: ifs = [self.process.stdin.drain()] if cancellable: ifs.append(self.closed.wait()) - fs = yield from asyncio_wait_or_cancel( + fs = await asyncio_wait_or_cancel( ifs, timeout=self.send_timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): @@ -129,10 +126,9 @@ class Worker: if cancellable and self.closed.is_set(): raise WorkerError("Data transmission to worker cancelled") - @asyncio.coroutine - def _recv(self, timeout): + async def _recv(self, timeout): assert self.io_lock.locked() - fs = yield from asyncio_wait_or_cancel( + fs = await asyncio_wait_or_cancel( [self.process.stdout.readline(), self.closed.wait()], timeout=timeout, return_when=asyncio.FIRST_COMPLETED) if all(f.cancelled() for f in fs): @@ -148,13 +144,12 @@ class Worker: raise WorkerError("Worker sent invalid PYON data") return obj - @asyncio.coroutine - def _handle_worker_requests(self): + async def _handle_worker_requests(self): while True: try: - yield from self.io_lock.acquire() + await self.io_lock.acquire() try: - obj = yield from self._recv(self.watchdog_time()) + obj = await self._recv(self.watchdog_time()) finally: self.io_lock.release() except WorkerTimeout: @@ -181,24 +176,23 @@ class Worker: except: reply = {"status": "failed", "message": traceback.format_exc()} - yield from self.io_lock.acquire() + await self.io_lock.acquire() try: - yield from self._send(reply) + await self._send(reply) finally: self.io_lock.release() - @asyncio.coroutine - def _worker_action(self, obj, timeout=None): + async def _worker_action(self, obj, timeout=None): if timeout is not None: self.watchdogs[-1] = time.monotonic() + timeout try: - yield from self.io_lock.acquire() + await self.io_lock.acquire() try: - yield from self._send(obj) + await self._send(obj) finally: self.io_lock.release() try: - completed = yield from self._handle_worker_requests() + completed = await self._handle_worker_requests() except WorkerTimeout: raise WorkerWatchdogTimeout finally: @@ -206,11 +200,10 @@ class Worker: del self.watchdogs[-1] return completed - @asyncio.coroutine - def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0): + async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0): self.rid = rid - yield from self._create_process() - yield from self._worker_action( + await self._create_process() + await self._worker_action( {"action": "build", "rid": rid, "pipeline_name": pipeline_name, @@ -219,45 +212,39 @@ class Worker: "priority": priority}, timeout) - @asyncio.coroutine - def prepare(self): - yield from self._worker_action({"action": "prepare"}) + async def prepare(self): + await self._worker_action({"action": "prepare"}) - @asyncio.coroutine - def run(self): - completed = yield from self._worker_action({"action": "run"}) + async def run(self): + completed = await self._worker_action({"action": "run"}) if not completed: self.yield_time = time.monotonic() return completed - @asyncio.coroutine - def resume(self): + async def resume(self): stop_duration = time.monotonic() - self.yield_time for wid, expiry in self.watchdogs: self.watchdogs[wid] += stop_duration - completed = yield from self._worker_action({"status": "ok", - "data": None}) + completed = await self._worker_action({"status": "ok", + "data": None}) if not completed: self.yield_time = time.monotonic() return completed - @asyncio.coroutine - def analyze(self): - yield from self._worker_action({"action": "analyze"}) + async def analyze(self): + await self._worker_action({"action": "analyze"}) - @asyncio.coroutine - def write_results(self, timeout=15.0): - yield from self._worker_action({"action": "write_results"}, - timeout) + async def write_results(self, timeout=15.0): + await self._worker_action({"action": "write_results"}, + timeout) - @asyncio.coroutine - def examine(self, file, timeout=20.0): - yield from self._create_process() + async def examine(self, file, timeout=20.0): + await self._create_process() r = dict() def register(class_name, name, arguments): r[class_name] = {"name": name, "arguments": arguments} self.register_experiment = register - yield from self._worker_action({"action": "examine", + await self._worker_action({"action": "examine", "file": file}, timeout) del self.register_experiment return r diff --git a/artiq/protocols/asyncio_server.py b/artiq/protocols/asyncio_server.py index 451bb272f..a2425137f 100644 --- a/artiq/protocols/asyncio_server.py +++ b/artiq/protocols/asyncio_server.py @@ -12,8 +12,7 @@ class AsyncioServer: def __init__(self): self._client_tasks = set() - @asyncio.coroutine - def start(self, host, port): + async def start(self, host, port): """Starts the server. The user must call ``stop`` to free resources properly after this @@ -26,11 +25,10 @@ class AsyncioServer: :param port: TCP port to bind to. """ - self.server = yield from asyncio.start_server(self._handle_connection, - host, port) + self.server = await asyncio.start_server(self._handle_connection, + host, port) - @asyncio.coroutine - def stop(self): + async def stop(self): """Stops the server. """ @@ -39,11 +37,11 @@ class AsyncioServer: task.cancel() for task in wait_for: try: - yield from asyncio.wait_for(task, None) + await asyncio.wait_for(task, None) except asyncio.CancelledError: pass self.server.close() - yield from self.server.wait_closed() + await self.server.wait_closed() del self.server def _client_done(self, task): diff --git a/artiq/protocols/pc_rpc.py b/artiq/protocols/pc_rpc.py index f001d3a26..beab88224 100644 --- a/artiq/protocols/pc_rpc.py +++ b/artiq/protocols/pc_rpc.py @@ -159,16 +159,15 @@ class AsyncioClient: self.__target_names = None self.__description = None - @asyncio.coroutine - def connect_rpc(self, host, port, target_name): + async def connect_rpc(self, host, port, target_name): """Connects to the server. This cannot be done in __init__ because this method is a coroutine. See ``Client`` for a description of the parameters.""" self.__reader, self.__writer = \ - yield from asyncio.open_connection(host, port) + await asyncio.open_connection(host, port) try: self.__writer.write(_init_string) - server_identification = yield from self.__recv() + server_identification = await self.__recv() self.__target_names = server_identification["targets"] self.__description = server_identification["description"] if target_name is not None: @@ -205,20 +204,18 @@ class AsyncioClient: line = pyon.encode(obj) + "\n" self.__writer.write(line.encode()) - @asyncio.coroutine - def __recv(self): - line = yield from self.__reader.readline() + async def __recv(self): + line = await self.__reader.readline() return pyon.decode(line.decode()) - @asyncio.coroutine - def __do_rpc(self, name, args, kwargs): - yield from self.__lock.acquire() + async def __do_rpc(self, name, args, kwargs): + await self.__lock.acquire() try: obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs} self.__send(obj) - obj = yield from self.__recv() + obj = await self.__recv() if obj["status"] == "ok": return obj["ret"] elif obj["status"] == "failed": @@ -229,9 +226,8 @@ class AsyncioClient: self.__lock.release() def __getattr__(self, name): - @asyncio.coroutine - def proxy(*args, **kwargs): - res = yield from self.__do_rpc(name, args, kwargs) + async def proxy(*args, **kwargs): + res = await self.__do_rpc(name, args, kwargs) return res return proxy @@ -413,10 +409,9 @@ class Server(_AsyncioServer): if builtin_terminate: self._terminate_request = asyncio.Event() - @asyncio.coroutine - def _handle_connection_cr(self, reader, writer): + async def _handle_connection_cr(self, reader, writer): try: - line = yield from reader.readline() + line = await reader.readline() if line != _init_string: return @@ -426,7 +421,7 @@ class Server(_AsyncioServer): } line = pyon.encode(obj) + "\n" writer.write(line.encode()) - line = yield from reader.readline() + line = await reader.readline() if not line: return target_name = line.decode()[:-1] @@ -436,7 +431,7 @@ class Server(_AsyncioServer): return while True: - line = yield from reader.readline() + line = await reader.readline() if not line: break obj = pyon.decode(line.decode()) @@ -486,9 +481,8 @@ class Server(_AsyncioServer): finally: writer.close() - @asyncio.coroutine - def wait_terminate(self): - yield from self._terminate_request.wait() + async def wait_terminate(self): + await self._terminate_request.wait() def simple_server_loop(targets, host, port, description=None): diff --git a/artiq/protocols/sync_struct.py b/artiq/protocols/sync_struct.py index e8fd2c69d..e2c1021ad 100644 --- a/artiq/protocols/sync_struct.py +++ b/artiq/protocols/sync_struct.py @@ -61,10 +61,9 @@ class Subscriber: self.target_builders = [target_builder] self.notify_cb = notify_cb - @asyncio.coroutine - def connect(self, host, port, before_receive_cb=None): + async def connect(self, host, port, before_receive_cb=None): self.reader, self.writer = \ - yield from asyncio.open_connection(host, port) + await asyncio.open_connection(host, port) try: if before_receive_cb is not None: before_receive_cb() @@ -77,12 +76,11 @@ class Subscriber: del self.writer raise - @asyncio.coroutine - def close(self): + async def close(self): try: self.receive_task.cancel() try: - yield from asyncio.wait_for(self.receive_task, None) + await asyncio.wait_for(self.receive_task, None) except asyncio.CancelledError: pass finally: @@ -90,11 +88,10 @@ class Subscriber: del self.reader del self.writer - @asyncio.coroutine - def _receive_cr(self): + async def _receive_cr(self): targets = [] while True: - line = yield from self.reader.readline() + line = await self.reader.readline() if not line: return mod = pyon.decode(line.decode()) @@ -209,14 +206,13 @@ class Publisher(AsyncioServer): for notifier in notifiers.values(): notifier.publish = partial(self.publish, notifier) - @asyncio.coroutine - def _handle_connection_cr(self, reader, writer): + async def _handle_connection_cr(self, reader, writer): try: - line = yield from reader.readline() + line = await reader.readline() if line != _init_string: return - line = yield from reader.readline() + line = await reader.readline() if not line: return notifier_name = line.decode()[:-1] @@ -234,10 +230,10 @@ class Publisher(AsyncioServer): self._recipients[notifier_name].add(queue) try: while True: - line = yield from queue.get() + line = await queue.get() writer.write(line) # raise exception on connection error - yield from writer.drain() + await writer.drain() finally: self._recipients[notifier_name].remove(queue) except ConnectionResetError: diff --git a/artiq/test/pc_rpc.py b/artiq/test/pc_rpc.py index 5bd0a64cd..19000c659 100644 --- a/artiq/test/pc_rpc.py +++ b/artiq/test/pc_rpc.py @@ -52,23 +52,22 @@ class RPCCase(unittest.TestCase): def test_blocking_echo(self): self._run_server_and_test(self._blocking_echo) - @asyncio.coroutine - def _asyncio_echo(self): + async def _asyncio_echo(self): remote = pc_rpc.AsyncioClient() for attempt in range(100): - yield from asyncio.sleep(.2) + await asyncio.sleep(.2) try: - yield from remote.connect_rpc(test_address, test_port, "test") + await remote.connect_rpc(test_address, test_port, "test") except ConnectionRefusedError: pass else: break try: - test_object_back = yield from remote.echo(test_object) + test_object_back = await remote.echo(test_object) self.assertEqual(test_object, test_object_back) with self.assertRaises(pc_rpc.RemoteError): - yield from remote.non_existing_method() - yield from remote.terminate() + await remote.non_existing_method() + await remote.terminate() finally: remote.close_rpc() diff --git a/artiq/test/sync_struct.py b/artiq/test/sync_struct.py index 3f30062ff..00e4af878 100644 --- a/artiq/test/sync_struct.py +++ b/artiq/test/sync_struct.py @@ -8,7 +8,6 @@ test_address = "::1" test_port = 7777 -@asyncio.coroutine def write_test_data(test_dict): test_values = [5, 2.1, None, True, False, {"a": 5, 2: np.linspace(0, 10, 1)}, @@ -30,12 +29,11 @@ def write_test_data(test_dict): test_dict["finished"] = True -@asyncio.coroutine -def start_server(publisher_future, test_dict_future): +async def start_server(publisher_future, test_dict_future): test_dict = sync_struct.Notifier(dict()) publisher = sync_struct.Publisher( {"test": test_dict}) - yield from publisher.start(test_address, test_port) + await publisher.start(test_address, test_port) publisher_future.set_result(publisher) test_dict_future.set_result(test_dict) @@ -66,9 +64,9 @@ class SyncStructCase(unittest.TestCase): self.publisher = publisher.result() test_dict = test_dict.result() test_vector = dict() - loop.run_until_complete(write_test_data(test_vector)) + write_test_data(test_vector) - asyncio.ensure_future(write_test_data(test_dict)) + write_test_data(test_dict) self.subscriber = sync_struct.Subscriber("test", self.init_test_dict, self.notify) loop.run_until_complete(self.subscriber.connect(test_address, diff --git a/artiq/test/worker.py b/artiq/test/worker.py index b00660188..7be4304a0 100644 --- a/artiq/test/worker.py +++ b/artiq/test/worker.py @@ -36,15 +36,14 @@ class WatchdogTimeoutInBuild(EnvExperiment): pass -@asyncio.coroutine -def _call_worker(worker, expid): +async def _call_worker(worker, expid): try: - yield from worker.build(0, "main", None, expid, 0) - yield from worker.prepare() - yield from worker.run() - yield from worker.analyze() + await worker.build(0, "main", None, expid, 0) + await worker.prepare() + await worker.run() + await worker.analyze() finally: - yield from worker.close() + await worker.close() def _run_experiment(class_name): diff --git a/artiq/tools.py b/artiq/tools.py index e89910e20..56ae0fda1 100644 --- a/artiq/tools.py +++ b/artiq/tools.py @@ -79,27 +79,25 @@ def init_logger(args): logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10) -@asyncio.coroutine -def exc_to_warning(coro): +async def exc_to_warning(coro): try: - yield from coro + await coro except: logger.warning("asyncio coroutine terminated with exception", exc_info=True) -@asyncio.coroutine -def asyncio_wait_or_cancel(fs, **kwargs): +async def asyncio_wait_or_cancel(fs, **kwargs): fs = [asyncio.ensure_future(f) for f in fs] try: - d, p = yield from asyncio.wait(fs, **kwargs) + d, p = await asyncio.wait(fs, **kwargs) except: for f in fs: f.cancel() raise for f in p: f.cancel() - yield from asyncio.wait([f]) + await asyncio.wait([f]) return fs @@ -107,17 +105,15 @@ class TaskObject: def start(self): self.task = asyncio.ensure_future(self._do()) - @asyncio.coroutine - def stop(self): + async def stop(self): self.task.cancel() try: - yield from asyncio.wait_for(self.task, None) + await asyncio.wait_for(self.task, None) except asyncio.CancelledError: pass del self.task - @asyncio.coroutine - def _do(self): + async def _do(self): raise NotImplementedError @@ -129,13 +125,12 @@ class Condition: self._loop = asyncio.get_event_loop() self._waiters = collections.deque() - @asyncio.coroutine - def wait(self): + async def wait(self): """Wait until notified.""" fut = asyncio.Future(loop=self._loop) self._waiters.append(fut) try: - yield from fut + await fut finally: self._waiters.remove(fut)