master: Include RID in worker exception messages

This helps when debugging unexpected shutdown problems
after the fact.
This commit is contained in:
David Nadlinger 2019-01-19 21:01:48 +00:00
parent e24e893303
commit 0dab7ecd73
1 changed files with 15 additions and 6 deletions

View File

@ -165,12 +165,15 @@ class Worker:
ifs, timeout=self.send_timeout, ifs, timeout=self.send_timeout,
return_when=asyncio.FIRST_COMPLETED) return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs): if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout sending data to worker") raise WorkerTimeout(
"Timeout sending data to worker (RID {})".format(self.rid))
for f in fs: for f in fs:
if not f.cancelled() and f.done(): if not f.cancelled() and f.done():
f.result() # raise any exceptions f.result() # raise any exceptions
if cancellable and self.closed.is_set(): if cancellable and self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled") raise WorkerError(
"Data transmission to worker cancelled (RID {})".format(
self.rid))
async def _recv(self, timeout): async def _recv(self, timeout):
assert self.io_lock.locked() assert self.io_lock.locked()
@ -178,16 +181,22 @@ class Worker:
[self.ipc.readline(), self.closed.wait()], [self.ipc.readline(), self.closed.wait()],
timeout=timeout, return_when=asyncio.FIRST_COMPLETED) timeout=timeout, return_when=asyncio.FIRST_COMPLETED)
if all(f.cancelled() for f in fs): if all(f.cancelled() for f in fs):
raise WorkerTimeout("Timeout receiving data from worker") raise WorkerTimeout(
"Timeout receiving data from worker (RID {})".format(self.rid))
if self.closed.is_set(): if self.closed.is_set():
raise WorkerError("Data transmission to worker cancelled") raise WorkerError(
"Receiving data from worker cancelled (RID {})".format(
self.rid))
line = fs[0].result() line = fs[0].result()
if not line: if not line:
raise WorkerError("Worker ended while attempting to receive data") raise WorkerError(
"Worker ended while attempting to receive data (RID {})".
format(self.rid))
try: try:
obj = pyon.decode(line.decode()) obj = pyon.decode(line.decode())
except: except:
raise WorkerError("Worker sent invalid PYON data") raise WorkerError("Worker sent invalid PYON data (RID {})".format(
self.rid))
return obj return obj
async def _handle_worker_requests(self): async def _handle_worker_requests(self):