use Python 3.5 coroutines

pull/235/head
Sebastien Bourdeauducq 2015-10-03 19:28:57 +08:00
parent b117b9320d
commit f552d62b69
19 changed files with 228 additions and 304 deletions

View File

@ -56,60 +56,55 @@ class Controller:
self.process = None
self.launch_task = asyncio.Task(self.launcher())
@asyncio.coroutine
def end(self):
async def end(self):
self.launch_task.cancel()
yield from asyncio.wait_for(self.launch_task, None)
await asyncio.wait_for(self.launch_task, None)
@asyncio.coroutine
def _call_controller(self, method):
async def _call_controller(self, method):
remote = AsyncioClient()
yield from remote.connect_rpc(self.host, self.port, None)
await remote.connect_rpc(self.host, self.port, None)
try:
targets, _ = remote.get_rpc_id()
remote.select_rpc_target(targets[0])
r = yield from getattr(remote, method)()
r = await getattr(remote, method)()
finally:
remote.close_rpc()
return r
@asyncio.coroutine
def _ping(self):
async def _ping(self):
try:
ok = yield from asyncio.wait_for(self._call_controller("ping"),
self.ping_timeout)
ok = await asyncio.wait_for(self._call_controller("ping"),
self.ping_timeout)
if ok:
self.retry_timer_cur = self.retry_timer
return ok
except:
return False
@asyncio.coroutine
def _wait_and_ping(self):
async def _wait_and_ping(self):
while True:
try:
yield from asyncio.wait_for(self.process.wait(),
self.ping_timer)
await asyncio.wait_for(self.process.wait(),
self.ping_timer)
except asyncio.TimeoutError:
logger.debug("pinging controller %s", self.name)
ok = yield from self._ping()
ok = await self._ping()
if not ok:
logger.warning("Controller %s ping failed", self.name)
yield from self._terminate()
await self._terminate()
return
else:
break
@asyncio.coroutine
def launcher(self):
async def launcher(self):
try:
while True:
logger.info("Starting controller %s with command: %s",
self.name, self.command)
try:
self.process = yield from asyncio.create_subprocess_exec(
self.process = await asyncio.create_subprocess_exec(
*shlex.split(self.command))
yield from self._wait_and_ping()
await self._wait_and_ping()
except FileNotFoundError:
logger.warning("Controller %s failed to start", self.name)
else:
@ -117,33 +112,32 @@ class Controller:
logger.warning("Restarting in %.1f seconds",
self.retry_timer_cur)
try:
yield from asyncio.wait_for(self.retry_now.wait(),
self.retry_timer_cur)
await asyncio.wait_for(self.retry_now.wait(),
self.retry_timer_cur)
except asyncio.TimeoutError:
pass
self.retry_timer_cur *= self.retry_timer_backoff
except asyncio.CancelledError:
yield from self._terminate()
await self._terminate()
@asyncio.coroutine
def _terminate(self):
async def _terminate(self):
logger.info("Terminating controller %s", self.name)
if self.process is not None and self.process.returncode is None:
try:
yield from asyncio.wait_for(self._call_controller("terminate"),
self.term_timeout)
await asyncio.wait_for(self._call_controller("terminate"),
self.term_timeout)
except:
logger.warning("Controller %s did not respond to terminate "
"command, killing", self.name)
self.process.kill()
try:
yield from asyncio.wait_for(self.process.wait(),
self.term_timeout)
await asyncio.wait_for(self.process.wait(),
self.term_timeout)
except:
logger.warning("Controller %s failed to exit, killing",
self.name)
self.process.kill()
yield from self.process.wait()
await self.process.wait()
logger.debug("Controller %s terminated", self.name)
@ -163,17 +157,16 @@ class Controllers:
self.active = dict()
self.process_task = asyncio.Task(self._process())
@asyncio.coroutine
def _process(self):
async def _process(self):
while True:
action, param = yield from self.queue.get()
action, param = await self.queue.get()
if action == "set":
k, ddb_entry = param
if k in self.active:
yield from self.active[k].end()
await self.active[k].end()
self.active[k] = Controller(k, ddb_entry)
elif action == "del":
yield from self.active[param].end()
await self.active[param].end()
del self.active[param]
else:
raise ValueError
@ -196,11 +189,10 @@ class Controllers:
for name in set(self.active_or_queued):
del self[name]
@asyncio.coroutine
def shutdown(self):
async def shutdown(self):
self.process_task.cancel()
for c in self.active.values():
yield from c.end()
await c.end()
class ControllerDB:
@ -225,8 +217,7 @@ class ControllerManager(TaskObject):
self.retry_master = retry_master
self.controller_db = ControllerDB()
@asyncio.coroutine
def _do(self):
async def _do(self):
try:
subscriber = Subscriber("devices",
self.controller_db.sync_struct_init)
@ -236,12 +227,12 @@ class ControllerManager(TaskObject):
s = subscriber.writer.get_extra_info("socket")
localhost = s.getsockname()[0]
self.controller_db.set_host_filter(localhost)
yield from subscriber.connect(self.server, self.port,
set_host_filter)
await subscriber.connect(self.server, self.port,
set_host_filter)
try:
yield from asyncio.wait_for(subscriber.receive_task, None)
await asyncio.wait_for(subscriber.receive_task, None)
finally:
yield from subscriber.close()
await subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
@ -249,11 +240,11 @@ class ControllerManager(TaskObject):
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry_master)
yield from asyncio.sleep(self.retry_master)
await asyncio.sleep(self.retry_master)
except asyncio.CancelledError:
pass
finally:
yield from self.controller_db.current_controllers.shutdown()
await self.controller_db.current_controllers.shutdown()
def retry_now(self, k):
"""If a controller is disabled and pending retry, perform that retry

View File

@ -96,24 +96,23 @@ class DBWriter(TaskObject):
logger.warning("failed to update parameter '%s': "
"too many pending updates", k)
@asyncio.coroutine
def _do(self):
async def _do(self):
while True:
k, v = yield from self._queue.get()
k, v = await self._queue.get()
url = self.base_url + "/write"
params = {"u": self.user, "p": self.password, "db": self.database,
"consistency": "any", "precision": "n"}
fmt_ty, fmt_v = format_influxdb(v)
data = "{},parameter={} {}={}".format(self.table, k, fmt_ty, fmt_v)
try:
response = yield from aiohttp.request(
response = await aiohttp.request(
"POST", url, params=params, data=data)
except:
logger.warning("got exception trying to update '%s'",
k, exc_info=True)
else:
if response.status not in (200, 204):
content = (yield from response.content.read()).decode()
content = (await response.content.read()).decode()
if content:
content = content[:-1] # drop \n
logger.warning("got HTTP status %d "
@ -144,18 +143,17 @@ class MasterReader(TaskObject):
self.filter_function = filter_function
self.writer = writer
@asyncio.coroutine
def _do(self):
async def _do(self):
subscriber = Subscriber(
"parameters",
partial(Parameters, self.filter_function, self.writer))
while True:
try:
yield from subscriber.connect(self.server, self.port)
await subscriber.connect(self.server, self.port)
try:
yield from asyncio.wait_for(subscriber.receive_task, None)
await asyncio.wait_for(subscriber.receive_task, None)
finally:
yield from subscriber.close()
await subscriber.close()
except (ConnectionAbortedError, ConnectionError,
ConnectionRefusedError, ConnectionResetError) as e:
logger.warning("Connection to master failed (%s: %s)",
@ -163,7 +161,7 @@ class MasterReader(TaskObject):
else:
logger.warning("Connection to master lost")
logger.warning("Retrying in %.1f seconds", self.retry)
yield from asyncio.sleep(self.retry)
await asyncio.sleep(self.retry)
class Filter:

View File

@ -300,23 +300,20 @@ class ExplorerDock(dockarea.Dock):
def enable_duedate(self):
self.datetime_en.setChecked(True)
@asyncio.coroutine
def sub_connect(self, host, port):
async def sub_connect(self, host, port):
self.explist_subscriber = Subscriber("explist",
self.init_explist_model)
yield from self.explist_subscriber.connect(host, port)
await self.explist_subscriber.connect(host, port)
@asyncio.coroutine
def sub_close(self):
yield from self.explist_subscriber.close()
async def sub_close(self):
await self.explist_subscriber.close()
def init_explist_model(self, init):
self.explist_model = _ExplistModel(self, self.el, init)
self.el.setModel(self.explist_model)
return self.explist_model
@asyncio.coroutine
def submit(self, pipeline_name, file, class_name, arguments,
async def submit(self, pipeline_name, file, class_name, arguments,
priority, due_date, flush):
expid = {
"repo_rev": None,
@ -324,8 +321,8 @@ class ExplorerDock(dockarea.Dock):
"class_name": class_name,
"arguments": arguments,
}
rid = yield from self.schedule_ctl.submit(pipeline_name, expid,
priority, due_date, flush)
rid = await self.schedule_ctl.submit(pipeline_name, expid,
priority, due_date, flush)
self.status_bar.showMessage("Submitted RID {}".format(rid))
def submit_clicked(self):

View File

@ -41,14 +41,12 @@ class LogDock(dockarea.Dock):
self.addWidget(self.log)
self.scroll_at_bottom = False
@asyncio.coroutine
def sub_connect(self, host, port):
async def sub_connect(self, host, port):
self.subscriber = Subscriber("log", self.init_log_model)
yield from self.subscriber.connect(host, port)
await self.subscriber.connect(host, port)
@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
async def sub_close(self):
await self.subscriber.close()
def rows_inserted_before(self):
scrollbar = self.log.verticalScrollBar()

View File

@ -232,26 +232,24 @@ class MonInj(TaskObject):
self.dm = _DeviceManager(self.send_to_device, dict())
self.transport = None
@asyncio.coroutine
def start(self, server, port):
async def start(self, server, port):
loop = asyncio.get_event_loop()
yield from loop.create_datagram_endpoint(lambda: self,
await loop.create_datagram_endpoint(lambda: self,
family=socket.AF_INET)
try:
yield from self.subscriber.connect(server, port)
await self.subscriber.connect(server, port)
try:
TaskObject.start(self)
except:
yield from self.subscriber.close()
await self.subscriber.close()
raise
except:
self.transport.close()
raise
@asyncio.coroutine
def stop(self):
yield from TaskObject.stop(self)
yield from self.subscriber.close()
async def stop(self):
await TaskObject.stop(self)
await self.subscriber.close()
if self.transport is not None:
self.transport.close()
self.transport = None
@ -295,10 +293,9 @@ class MonInj(TaskObject):
else:
self.transport.sendto(data, (ca, 3250))
@asyncio.coroutine
def _do(self):
async def _do(self):
while True:
yield from asyncio.sleep(0.2)
await asyncio.sleep(0.2)
# MONINJ_REQ_MONITOR
self.send_to_device(b"\x01")

View File

@ -59,14 +59,12 @@ class ParametersDock(dockarea.Dock):
else:
self.table.hideRow(row)
@asyncio.coroutine
def sub_connect(self, host, port):
async def sub_connect(self, host, port):
self.subscriber = Subscriber("parameters", self.init_parameters_model)
yield from self.subscriber.connect(host, port)
await self.subscriber.connect(host, port)
@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
async def sub_close(self):
await self.subscriber.close()
def init_parameters_model(self, init):
self.table_model = ParametersModel(self.table, init)

View File

@ -68,15 +68,13 @@ class ResultsDock(dockarea.Dock):
def get_result(self, key):
return self.table_model.backing_store[key]
@asyncio.coroutine
def sub_connect(self, host, port):
async def sub_connect(self, host, port):
self.subscriber = Subscriber("rt_results", self.init_results_model,
self.on_mod)
yield from self.subscriber.connect(host, port)
await self.subscriber.connect(host, port)
@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
async def sub_close(self):
await self.subscriber.close()
def init_results_model(self, init):
self.table_model = ResultsModel(self.table, init)

View File

@ -75,23 +75,20 @@ class ScheduleDock(dockarea.Dock):
delete_action.triggered.connect(self.delete_clicked)
self.table.addAction(delete_action)
@asyncio.coroutine
def sub_connect(self, host, port):
async def sub_connect(self, host, port):
self.subscriber = Subscriber("schedule", self.init_schedule_model)
yield from self.subscriber.connect(host, port)
await self.subscriber.connect(host, port)
@asyncio.coroutine
def sub_close(self):
yield from self.subscriber.close()
async def sub_close(self):
await self.subscriber.close()
def init_schedule_model(self, init):
self.table_model = _ScheduleModel(self.table, init)
self.table.setModel(self.table_model)
return self.table_model
@asyncio.coroutine
def delete(self, rid):
yield from self.schedule_ctl.delete(rid)
async def delete(self, rid):
await self.schedule_ctl.delete(rid)
def delete_clicked(self):
idx = self.table.selectedIndexes()

View File

@ -69,11 +69,10 @@ class StateManager(TaskObject):
exc_info=True)
pyon.store_file(self.filename, data)
@asyncio.coroutine
def _do(self):
async def _do(self):
try:
while True:
yield from asyncio.sleep(self.autosave_period)
await asyncio.sleep(self.autosave_period)
self.save()
finally:
self.save()

View File

@ -12,17 +12,16 @@ from artiq.tools import exc_to_warning
logger = logging.getLogger(__name__)
@asyncio.coroutine
def _scan_experiments(wd, log):
async def _scan_experiments(wd, log):
r = dict()
for f in os.listdir(wd):
if f.endswith(".py"):
try:
worker = Worker({"log": lambda message: log("scan", message)})
try:
description = yield from worker.examine(os.path.join(wd, f))
description = await worker.examine(os.path.join(wd, f))
finally:
yield from worker.close()
await worker.close()
for class_name, class_desc in description.items():
name = class_desc["name"]
arguments = class_desc["arguments"]
@ -68,8 +67,7 @@ class Repository:
# The object cannot be used anymore after calling this method.
self.backend.release_rev(self.cur_rev)
@asyncio.coroutine
def scan(self, new_cur_rev=None):
async def scan(self, new_cur_rev=None):
if self._scanning:
return
self._scanning = True
@ -79,7 +77,7 @@ class Repository:
wd, _ = self.backend.request_rev(new_cur_rev)
self.backend.release_rev(self.cur_rev)
self.cur_rev = new_cur_rev
new_explist = yield from _scan_experiments(wd, self.log_fn)
new_explist = await _scan_experiments(wd, self.log_fn)
_sync_explist(self.explist, new_explist)
finally:

View File

@ -24,13 +24,12 @@ class RunStatus(Enum):
def _mk_worker_method(name):
@asyncio.coroutine
def worker_method(self, *args, **kwargs):
async def worker_method(self, *args, **kwargs):
if self.worker.closed.is_set():
return True
m = getattr(self.worker, name)
try:
return (yield from m(*args, **kwargs))
return await m(*args, **kwargs)
except Exception as e:
if isinstance(e, asyncio.CancelledError):
raise
@ -97,19 +96,17 @@ class Run:
runnable = 1
return (runnable, self.priority, due_date_k, -self.rid)
@asyncio.coroutine
def close(self):
async def close(self):
# called through pool
yield from self.worker.close()
await self.worker.close()
del self._notifier[self.rid]
_build = _mk_worker_method("build")
@asyncio.coroutine
def build(self):
yield from self._build(self.rid, self.pipeline_name,
self.wd, self.expid,
self.priority)
async def build(self):
await self._build(self.rid, self.pipeline_name,
self.wd, self.expid,
self.priority)
prepare = _mk_worker_method("prepare")
run = _mk_worker_method("run")
@ -154,13 +151,12 @@ class RunPool:
self.state_changed.notify()
return rid
@asyncio.coroutine
def delete(self, rid):
async def delete(self, rid):
# called through deleter
if rid not in self.runs:
return
run = self.runs[rid]
yield from run.close()
await run.close()
if "repo_rev" in run.expid:
self.repo_backend.release_rev(run.expid["repo_rev"])
del self.runs[rid]
@ -203,14 +199,13 @@ class PrepareStage(TaskObject):
else:
return candidate.due_date - now
@asyncio.coroutine
def _do(self):
async def _do(self):
while True:
run = self._get_run()
if run is None:
yield from self.pool.state_changed.wait()
await self.pool.state_changed.wait()
elif isinstance(run, float):
yield from asyncio_wait_or_cancel([self.pool.state_changed.wait()],
await asyncio_wait_or_cancel([self.pool.state_changed.wait()],
timeout=run)
else:
if run.flush:
@ -221,7 +216,7 @@ class PrepareStage(TaskObject):
for r in self.pool.runs.values()):
ev = [self.pool.state_changed.wait(),
run.worker.closed.wait()]
yield from asyncio_wait_or_cancel(
await asyncio_wait_or_cancel(
ev, return_when=asyncio.FIRST_COMPLETED)
if run.worker.closed.is_set():
break
@ -229,8 +224,8 @@ class PrepareStage(TaskObject):
continue
run.status = RunStatus.preparing
try:
yield from run.build()
yield from run.prepare()
await run.build()
await run.prepare()
except:
logger.warning("got worker exception in prepare stage, "
"deleting RID %d",
@ -255,8 +250,7 @@ class RunStage(TaskObject):
r = None
return r
@asyncio.coroutine
def _do(self):
async def _do(self):
stack = []
while True:
@ -265,7 +259,7 @@ class RunStage(TaskObject):
next_irun is not None and
next_irun.priority_key() > stack[-1].priority_key()):
while next_irun is None:
yield from self.pool.state_changed.wait()
await self.pool.state_changed.wait()
next_irun = self._get_run()
stack.append(next_irun)
@ -273,10 +267,10 @@ class RunStage(TaskObject):
try:
if run.status == RunStatus.paused:
run.status = RunStatus.running
completed = yield from run.resume()
completed = await run.resume()
else:
run.status = RunStatus.running
completed = yield from run.run()
completed = await run.run()
except:
logger.warning("got worker exception in run stage, "
"deleting RID %d",
@ -305,17 +299,16 @@ class AnalyzeStage(TaskObject):
r = None
return r
@asyncio.coroutine
def _do(self):
async def _do(self):
while True:
run = self._get_run()
while run is None:
yield from self.pool.state_changed.wait()
await self.pool.state_changed.wait()
run = self._get_run()
run.status = RunStatus.analyzing
try:
yield from run.analyze()
yield from run.write_results()
await run.analyze()
await run.write_results()
except:
logger.warning("got worker exception in analyze stage, "
"deleting RID %d",
@ -337,12 +330,11 @@ class Pipeline:
self._run.start()
self._analyze.start()
@asyncio.coroutine
def stop(self):
async def stop(self):
# NB: restart of a stopped pipeline is not supported
yield from self._analyze.stop()
yield from self._run.stop()
yield from self._prepare.stop()
await self._analyze.stop()
await self._run.stop()
await self._prepare.stop()
class Deleter(TaskObject):
@ -358,36 +350,32 @@ class Deleter(TaskObject):
break
self._queue.put_nowait(rid)
@asyncio.coroutine
def join(self):
yield from self._queue.join()
async def join(self):
await self._queue.join()
@asyncio.coroutine
def _delete(self, rid):
async def _delete(self, rid):
for pipeline in self._pipelines.values():
if rid in pipeline.pool.runs:
logger.debug("deleting RID %d...", rid)
yield from pipeline.pool.delete(rid)
await pipeline.pool.delete(rid)
logger.debug("deletion of RID %d completed", rid)
break
@asyncio.coroutine
def _gc_pipelines(self):
async def _gc_pipelines(self):
pipeline_names = list(self._pipelines.keys())
for name in pipeline_names:
if not self._pipelines[name].pool.runs:
logger.debug("garbage-collecting pipeline '%s'...", name)
yield from self._pipelines[name].stop()
await self._pipelines[name].stop()
del self._pipelines[name]
logger.debug("garbage-collection of pipeline '%s' completed",
name)
@asyncio.coroutine
def _do(self):
async def _do(self):
while True:
rid = yield from self._queue.get()
yield from self._delete(rid)
yield from self._gc_pipelines()
rid = await self._queue.get()
await self._delete(rid)
await self._gc_pipelines()
self._queue.task_done()
@ -406,15 +394,14 @@ class Scheduler:
def start(self):
self._deleter.start()
@asyncio.coroutine
def stop(self):
async def stop(self):
# NB: restart of a stopped scheduler is not supported
self._terminated = True # prevent further runs from being created
for pipeline in self._pipelines.values():
for rid in pipeline.pool.runs.keys():
self._deleter.delete(rid)
yield from self._deleter.join()
yield from self._deleter.stop()
await self._deleter.join()
await self._deleter.stop()
if self._pipelines:
logger.warning("some pipelines were not garbage-collected")

View File

@ -56,27 +56,25 @@ class Worker:
else:
return None
@asyncio.coroutine
def _create_process(self):
yield from self.io_lock.acquire()
async def _create_process(self):
await self.io_lock.acquire()
try:
if self.closed.is_set():
raise WorkerError("Attempting to create process after close")
self.process = yield from asyncio.create_subprocess_exec(
self.process = await asyncio.create_subprocess_exec(
sys.executable, "-m", "artiq.master.worker_impl",
stdout=subprocess.PIPE, stdin=subprocess.PIPE)
finally:
self.io_lock.release()
@asyncio.coroutine
def close(self, term_timeout=1.0):
async def close(self, term_timeout=1.0):
"""Interrupts any I/O with the worker process and terminates the
worker process.
This method should always be called by the user to clean up, even if
build() or examine() raises an exception."""
self.closed.set()
yield from self.io_lock.acquire()
await self.io_lock.acquire()
try:
if self.process is None:
# Note the %s - self.rid can be None
@ -91,26 +89,25 @@ class Worker:
return
obj = {"action": "terminate"}
try:
yield from self._send(obj, cancellable=False)
await self._send(obj, cancellable=False)
except:
logger.warning("failed to send terminate command to worker"
" (RID %s), killing", self.rid, exc_info=True)
self.process.kill()
yield from self.process.wait()
await self.process.wait()
return
try:
yield from asyncio.wait_for(self.process.wait(), term_timeout)
await asyncio.wait_for(self.process.wait(), term_timeout)
except asyncio.TimeoutError:
logger.warning("worker did not exit (RID %s), killing", self.rid)
self.process.kill()
yield from self.process.wait()
await self.process.wait()
else:
logger.debug("worker exited gracefully (RID %s)", self.rid)
finally:
self.io_lock.release()
@asyncio.coroutine
def _send(self, obj, cancellable=True):
async def _send(self, obj, cancellable=True):
assert self.io_lock.locked()
line = pyon.encode(obj)
self.process.stdin.write(line.encode())
@ -118,7 +115,7 @@ class Worker:
ifs = [self.process.stdin.drain()]
if cancellable:
ifs.append(self.closed.wait())
fs = yield from asyncio_wait_or_cancel(
fs = await asyncio_wait_or_cancel(
ifs, timeout=self.send_timeout,
return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs):
@ -129,10 +126,9 @@ class Worker:
if cancellable and self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled")
@asyncio.coroutine
def _recv(self, timeout):
async def _recv(self, timeout):
assert self.io_lock.locked()
fs = yield from asyncio_wait_or_cancel(
fs = await asyncio_wait_or_cancel(
[self.process.stdout.readline(), self.closed.wait()],
timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs):
@ -148,13 +144,12 @@ class Worker:
raise WorkerError("Worker sent invalid PYON data")
return obj
@asyncio.coroutine
def _handle_worker_requests(self):
async def _handle_worker_requests(self):
while True:
try:
yield from self.io_lock.acquire()
await self.io_lock.acquire()
try:
obj = yield from self._recv(self.watchdog_time())
obj = await self._recv(self.watchdog_time())
finally:
self.io_lock.release()
except WorkerTimeout:
@ -181,24 +176,23 @@ class Worker:
except:
reply = {"status": "failed",
"message": traceback.format_exc()}
yield from self.io_lock.acquire()
await self.io_lock.acquire()
try:
yield from self._send(reply)
await self._send(reply)
finally:
self.io_lock.release()
@asyncio.coroutine
def _worker_action(self, obj, timeout=None):
async def _worker_action(self, obj, timeout=None):
if timeout is not None:
self.watchdogs[-1] = time.monotonic() + timeout
try:
yield from self.io_lock.acquire()
await self.io_lock.acquire()
try:
yield from self._send(obj)
await self._send(obj)
finally:
self.io_lock.release()
try:
completed = yield from self._handle_worker_requests()
completed = await self._handle_worker_requests()
except WorkerTimeout:
raise WorkerWatchdogTimeout
finally:
@ -206,11 +200,10 @@ class Worker:
del self.watchdogs[-1]
return completed
@asyncio.coroutine
def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
async def build(self, rid, pipeline_name, wd, expid, priority, timeout=15.0):
self.rid = rid
yield from self._create_process()
yield from self._worker_action(
await self._create_process()
await self._worker_action(
{"action": "build",
"rid": rid,
"pipeline_name": pipeline_name,
@ -219,45 +212,39 @@ class Worker:
"priority": priority},
timeout)
@asyncio.coroutine
def prepare(self):
yield from self._worker_action({"action": "prepare"})
async def prepare(self):
await self._worker_action({"action": "prepare"})
@asyncio.coroutine
def run(self):
completed = yield from self._worker_action({"action": "run"})
async def run(self):
completed = await self._worker_action({"action": "run"})
if not completed:
self.yield_time = time.monotonic()
return completed
@asyncio.coroutine
def resume(self):
async def resume(self):
stop_duration = time.monotonic() - self.yield_time
for wid, expiry in self.watchdogs:
self.watchdogs[wid] += stop_duration
completed = yield from self._worker_action({"status": "ok",
"data": None})
completed = await self._worker_action({"status": "ok",
"data": None})
if not completed:
self.yield_time = time.monotonic()
return completed
@asyncio.coroutine
def analyze(self):
yield from self._worker_action({"action": "analyze"})
async def analyze(self):
await self._worker_action({"action": "analyze"})
@asyncio.coroutine
def write_results(self, timeout=15.0):
yield from self._worker_action({"action": "write_results"},
timeout)
async def write_results(self, timeout=15.0):
await self._worker_action({"action": "write_results"},
timeout)
@asyncio.coroutine
def examine(self, file, timeout=20.0):
yield from self._create_process()
async def examine(self, file, timeout=20.0):
await self._create_process()
r = dict()
def register(class_name, name, arguments):
r[class_name] = {"name": name, "arguments": arguments}
self.register_experiment = register
yield from self._worker_action({"action": "examine",
await self._worker_action({"action": "examine",
"file": file}, timeout)
del self.register_experiment
return r

View File

@ -12,8 +12,7 @@ class AsyncioServer:
def __init__(self):
self._client_tasks = set()
@asyncio.coroutine
def start(self, host, port):
async def start(self, host, port):
"""Starts the server.
The user must call ``stop`` to free resources properly after this
@ -26,11 +25,10 @@ class AsyncioServer:
:param port: TCP port to bind to.
"""
self.server = yield from asyncio.start_server(self._handle_connection,
host, port)
self.server = await asyncio.start_server(self._handle_connection,
host, port)
@asyncio.coroutine
def stop(self):
async def stop(self):
"""Stops the server.
"""
@ -39,11 +37,11 @@ class AsyncioServer:
task.cancel()
for task in wait_for:
try:
yield from asyncio.wait_for(task, None)
await asyncio.wait_for(task, None)
except asyncio.CancelledError:
pass
self.server.close()
yield from self.server.wait_closed()
await self.server.wait_closed()
del self.server
def _client_done(self, task):

View File

@ -159,16 +159,15 @@ class AsyncioClient:
self.__target_names = None
self.__description = None
@asyncio.coroutine
def connect_rpc(self, host, port, target_name):
async def connect_rpc(self, host, port, target_name):
"""Connects to the server. This cannot be done in __init__ because
this method is a coroutine. See ``Client`` for a description of the
parameters."""
self.__reader, self.__writer = \
yield from asyncio.open_connection(host, port)
await asyncio.open_connection(host, port)
try:
self.__writer.write(_init_string)
server_identification = yield from self.__recv()
server_identification = await self.__recv()
self.__target_names = server_identification["targets"]
self.__description = server_identification["description"]
if target_name is not None:
@ -205,20 +204,18 @@ class AsyncioClient:
line = pyon.encode(obj) + "\n"
self.__writer.write(line.encode())
@asyncio.coroutine
def __recv(self):
line = yield from self.__reader.readline()
async def __recv(self):
line = await self.__reader.readline()
return pyon.decode(line.decode())
@asyncio.coroutine
def __do_rpc(self, name, args, kwargs):
yield from self.__lock.acquire()
async def __do_rpc(self, name, args, kwargs):
await self.__lock.acquire()
try:
obj = {"action": "call", "name": name,
"args": args, "kwargs": kwargs}
self.__send(obj)
obj = yield from self.__recv()
obj = await self.__recv()
if obj["status"] == "ok":
return obj["ret"]
elif obj["status"] == "failed":
@ -229,9 +226,8 @@ class AsyncioClient:
self.__lock.release()
def __getattr__(self, name):
@asyncio.coroutine
def proxy(*args, **kwargs):
res = yield from self.__do_rpc(name, args, kwargs)
async def proxy(*args, **kwargs):
res = await self.__do_rpc(name, args, kwargs)
return res
return proxy
@ -413,10 +409,9 @@ class Server(_AsyncioServer):
if builtin_terminate:
self._terminate_request = asyncio.Event()
@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
async def _handle_connection_cr(self, reader, writer):
try:
line = yield from reader.readline()
line = await reader.readline()
if line != _init_string:
return
@ -426,7 +421,7 @@ class Server(_AsyncioServer):
}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())
line = yield from reader.readline()
line = await reader.readline()
if not line:
return
target_name = line.decode()[:-1]
@ -436,7 +431,7 @@ class Server(_AsyncioServer):
return
while True:
line = yield from reader.readline()
line = await reader.readline()
if not line:
break
obj = pyon.decode(line.decode())
@ -486,9 +481,8 @@ class Server(_AsyncioServer):
finally:
writer.close()
@asyncio.coroutine
def wait_terminate(self):
yield from self._terminate_request.wait()
async def wait_terminate(self):
await self._terminate_request.wait()
def simple_server_loop(targets, host, port, description=None):

View File

@ -61,10 +61,9 @@ class Subscriber:
self.target_builders = [target_builder]
self.notify_cb = notify_cb
@asyncio.coroutine
def connect(self, host, port, before_receive_cb=None):
async def connect(self, host, port, before_receive_cb=None):
self.reader, self.writer = \
yield from asyncio.open_connection(host, port)
await asyncio.open_connection(host, port)
try:
if before_receive_cb is not None:
before_receive_cb()
@ -77,12 +76,11 @@ class Subscriber:
del self.writer
raise
@asyncio.coroutine
def close(self):
async def close(self):
try:
self.receive_task.cancel()
try:
yield from asyncio.wait_for(self.receive_task, None)
await asyncio.wait_for(self.receive_task, None)
except asyncio.CancelledError:
pass
finally:
@ -90,11 +88,10 @@ class Subscriber:
del self.reader
del self.writer
@asyncio.coroutine
def _receive_cr(self):
async def _receive_cr(self):
targets = []
while True:
line = yield from self.reader.readline()
line = await self.reader.readline()
if not line:
return
mod = pyon.decode(line.decode())
@ -209,14 +206,13 @@ class Publisher(AsyncioServer):
for notifier in notifiers.values():
notifier.publish = partial(self.publish, notifier)
@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
async def _handle_connection_cr(self, reader, writer):
try:
line = yield from reader.readline()
line = await reader.readline()
if line != _init_string:
return
line = yield from reader.readline()
line = await reader.readline()
if not line:
return
notifier_name = line.decode()[:-1]
@ -234,10 +230,10 @@ class Publisher(AsyncioServer):
self._recipients[notifier_name].add(queue)
try:
while True:
line = yield from queue.get()
line = await queue.get()
writer.write(line)
# raise exception on connection error
yield from writer.drain()
await writer.drain()
finally:
self._recipients[notifier_name].remove(queue)
except ConnectionResetError:

View File

@ -52,23 +52,22 @@ class RPCCase(unittest.TestCase):
def test_blocking_echo(self):
self._run_server_and_test(self._blocking_echo)
@asyncio.coroutine
def _asyncio_echo(self):
async def _asyncio_echo(self):
remote = pc_rpc.AsyncioClient()
for attempt in range(100):
yield from asyncio.sleep(.2)
await asyncio.sleep(.2)
try:
yield from remote.connect_rpc(test_address, test_port, "test")
await remote.connect_rpc(test_address, test_port, "test")
except ConnectionRefusedError:
pass
else:
break
try:
test_object_back = yield from remote.echo(test_object)
test_object_back = await remote.echo(test_object)
self.assertEqual(test_object, test_object_back)
with self.assertRaises(pc_rpc.RemoteError):
yield from remote.non_existing_method()
yield from remote.terminate()
await remote.non_existing_method()
await remote.terminate()
finally:
remote.close_rpc()

View File

@ -8,7 +8,6 @@ test_address = "::1"
test_port = 7777
@asyncio.coroutine
def write_test_data(test_dict):
test_values = [5, 2.1, None, True, False,
{"a": 5, 2: np.linspace(0, 10, 1)},
@ -30,12 +29,11 @@ def write_test_data(test_dict):
test_dict["finished"] = True
@asyncio.coroutine
def start_server(publisher_future, test_dict_future):
async def start_server(publisher_future, test_dict_future):
test_dict = sync_struct.Notifier(dict())
publisher = sync_struct.Publisher(
{"test": test_dict})
yield from publisher.start(test_address, test_port)
await publisher.start(test_address, test_port)
publisher_future.set_result(publisher)
test_dict_future.set_result(test_dict)
@ -66,9 +64,9 @@ class SyncStructCase(unittest.TestCase):
self.publisher = publisher.result()
test_dict = test_dict.result()
test_vector = dict()
loop.run_until_complete(write_test_data(test_vector))
write_test_data(test_vector)
asyncio.ensure_future(write_test_data(test_dict))
write_test_data(test_dict)
self.subscriber = sync_struct.Subscriber("test", self.init_test_dict,
self.notify)
loop.run_until_complete(self.subscriber.connect(test_address,

View File

@ -36,15 +36,14 @@ class WatchdogTimeoutInBuild(EnvExperiment):
pass
@asyncio.coroutine
def _call_worker(worker, expid):
async def _call_worker(worker, expid):
try:
yield from worker.build(0, "main", None, expid, 0)
yield from worker.prepare()
yield from worker.run()
yield from worker.analyze()
await worker.build(0, "main", None, expid, 0)
await worker.prepare()
await worker.run()
await worker.analyze()
finally:
yield from worker.close()
await worker.close()
def _run_experiment(class_name):

View File

@ -79,27 +79,25 @@ def init_logger(args):
logging.basicConfig(level=logging.WARNING + args.quiet*10 - args.verbose*10)
@asyncio.coroutine
def exc_to_warning(coro):
async def exc_to_warning(coro):
try:
yield from coro
await coro
except:
logger.warning("asyncio coroutine terminated with exception",
exc_info=True)
@asyncio.coroutine
def asyncio_wait_or_cancel(fs, **kwargs):
async def asyncio_wait_or_cancel(fs, **kwargs):
fs = [asyncio.ensure_future(f) for f in fs]
try:
d, p = yield from asyncio.wait(fs, **kwargs)
d, p = await asyncio.wait(fs, **kwargs)
except:
for f in fs:
f.cancel()
raise
for f in p:
f.cancel()
yield from asyncio.wait([f])
await asyncio.wait([f])
return fs
@ -107,17 +105,15 @@ class TaskObject:
def start(self):
self.task = asyncio.ensure_future(self._do())
@asyncio.coroutine
def stop(self):
async def stop(self):
self.task.cancel()
try:
yield from asyncio.wait_for(self.task, None)
await asyncio.wait_for(self.task, None)
except asyncio.CancelledError:
pass
del self.task
@asyncio.coroutine
def _do(self):
async def _do(self):
raise NotImplementedError
@ -129,13 +125,12 @@ class Condition:
self._loop = asyncio.get_event_loop()
self._waiters = collections.deque()
@asyncio.coroutine
def wait(self):
async def wait(self):
"""Wait until notified."""
fut = asyncio.Future(loop=self._loop)
self._waiters.append(fut)
try:
yield from fut
await fut
finally:
self._waiters.remove(fut)