transfer Python builtin exceptions over pc_rpc and master/worker

This commit is contained in:
Sebastien Bourdeauducq 2016-04-04 22:02:42 +08:00
parent f860548396
commit aa61c29efb
7 changed files with 70 additions and 44 deletions

View File

@ -8,7 +8,7 @@ import numpy as np # Needed to use numpy in RPC call arguments on cmd line
import readline # This makes input() nicer import readline # This makes input() nicer
import pprint import pprint
from artiq.protocols.pc_rpc import AutoTarget, Client, RemoteError from artiq.protocols.pc_rpc import AutoTarget, Client
def get_argparser(): def get_argparser():
@ -101,9 +101,11 @@ def interactive(remote):
try: try:
ret = eval(cmd, {}, RemoteDict()) ret = eval(cmd, {}, RemoteDict())
except Exception as e: except Exception as e:
if isinstance(e, RemoteError): if hasattr(e, "parent_traceback"):
print("Remote exception:") print("Remote exception:")
print(str(e)) print(traceback.format_exception_only(type(e), e)[0].rstrip())
for l in e.parent_traceback:
print(l.rstrip())
else: else:
traceback.print_exc() traceback.print_exc()
else: else:

View File

@ -3,11 +3,11 @@ import os
import asyncio import asyncio
import logging import logging
import subprocess import subprocess
import traceback
import time import time
from artiq.protocols import pipe_ipc, pyon from artiq.protocols import pipe_ipc, pyon
from artiq.protocols.logging import LogParser from artiq.protocols.logging import LogParser
from artiq.protocols.packed_exceptions import current_exc_packed
from artiq.tools import asyncio_wait_or_cancel from artiq.tools import asyncio_wait_or_cancel
@ -216,12 +216,11 @@ class Worker:
try: try:
data = func(*obj["args"], **obj["kwargs"]) data = func(*obj["args"], **obj["kwargs"])
reply = {"status": "ok", "data": data} reply = {"status": "ok", "data": data}
except Exception as e: except:
reply = {"status": "failed", reply = {
"exception": traceback.format_exception_only( "status": "failed",
type(e), e)[0][:-1], "exception": current_exc_packed()
"message": str(e), }
"traceback": traceback.format_tb(e.__traceback__)}
await self.io_lock.acquire() await self.io_lock.acquire()
try: try:
await self._send(reply) await self._send(reply)

View File

@ -7,6 +7,7 @@ from collections import OrderedDict
import artiq import artiq
from artiq.protocols import pipe_ipc, pyon from artiq.protocols import pipe_ipc, pyon
from artiq.protocols.packed_exceptions import raise_packed_exc
from artiq.tools import multiline_log_config, file_import from artiq.tools import multiline_log_config, file_import
from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output from artiq.master.worker_db import DeviceManager, DatasetManager, get_hdf5_output
from artiq.language.environment import is_experiment from artiq.language.environment import is_experiment
@ -27,11 +28,7 @@ def put_object(obj):
ipc.write((ds + "\n").encode()) ipc.write((ds + "\n").encode())
class ParentActionError(Exception): def make_parent_action(action):
pass
def make_parent_action(action, exception=None):
def parent_action(*args, **kwargs): def parent_action(*args, **kwargs):
request = {"action": action, "args": args, "kwargs": kwargs} request = {"action": action, "args": args, "kwargs": kwargs}
put_object(request) put_object(request)
@ -44,22 +41,17 @@ def make_parent_action(action, exception=None):
if reply["status"] == "ok": if reply["status"] == "ok":
return reply["data"] return reply["data"]
else: else:
if exception is None: raise_packed_exc(reply["exception"])
exn = ParentActionError(reply["exception"])
else:
exn = exception(reply["message"])
exn.parent_traceback = reply["traceback"]
raise exn
return parent_action return parent_action
class ParentDeviceDB: class ParentDeviceDB:
get_device_db = make_parent_action("get_device_db") get_device_db = make_parent_action("get_device_db")
get = make_parent_action("get_device", KeyError) get = make_parent_action("get_device")
class ParentDatasetDB: class ParentDatasetDB:
get = make_parent_action("get_dataset", KeyError) get = make_parent_action("get_dataset")
update = make_parent_action("update_dataset") update = make_parent_action("update_dataset")

View File

@ -0,0 +1,42 @@
import inspect
import builtins
import traceback
import sys
__all__ = ["GenericRemoteException", "current_exc_packed", "raise_packed_exc"]
class GenericRemoteException(Exception):
pass
builtin_exceptions = {v: k for k, v in builtins.__dict__.items()
if inspect.isclass(v) and issubclass(v, BaseException)}
def current_exc_packed():
exc_class, exc, exc_tb = sys.exc_info()
if exc_class in builtin_exceptions:
return {
"class": builtin_exceptions[exc_class],
"message": str(exc),
"traceback": traceback.format_tb(exc_tb)
}
else:
message = traceback.format_exception_only(exc_class, exc)[0].rstrip()
return {
"class": "GenericRemoteException",
"message": message,
"traceback": traceback.format_tb(exc_tb)
}
def raise_packed_exc(pack):
if pack["class"] == "GenericRemoteException":
cls = GenericRemoteException
else:
cls = getattr(builtins, pack["class"])
exc = cls(pack["message"])
exc.parent_traceback = pack["traceback"]
raise exc

View File

@ -13,7 +13,6 @@ client's list.
import socket import socket
import asyncio import asyncio
import traceback
import threading import threading
import time import time
import logging import logging
@ -22,6 +21,7 @@ from operator import itemgetter
from artiq.protocols import pyon from artiq.protocols import pyon
from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer
from artiq.protocols.packed_exceptions import *
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -33,12 +33,6 @@ class AutoTarget:
pass pass
class RemoteError(Exception):
"""Raised when a RPC failed or raised an exception on the remote (server)
side."""
pass
class IncompatibleServer(Exception): class IncompatibleServer(Exception):
"""Raised by the client when attempting to connect to a server that does """Raised by the client when attempting to connect to a server that does
not have the expected target.""" not have the expected target."""
@ -163,7 +157,7 @@ class Client:
if obj["status"] == "ok": if obj["status"] == "ok":
return obj["ret"] return obj["ret"]
elif obj["status"] == "failed": elif obj["status"] == "failed":
raise RemoteError(obj["message"]) raise_packed_exc(obj["exception"])
else: else:
raise ValueError raise ValueError
@ -267,7 +261,7 @@ class AsyncioClient:
if obj["status"] == "ok": if obj["status"] == "ok":
return obj["ret"] return obj["ret"]
elif obj["status"] == "failed": elif obj["status"] == "failed":
raise RemoteError(obj["message"]) raise_packed_exc(obj["exception"])
else: else:
raise ValueError raise ValueError
finally: finally:
@ -395,7 +389,7 @@ class BestEffortClient:
if obj["status"] == "ok": if obj["status"] == "ok":
return obj["ret"] return obj["ret"]
elif obj["status"] == "failed": elif obj["status"] == "failed":
raise RemoteError(obj["message"]) raise_packed_exc(obj["exception"])
else: else:
raise ValueError raise ValueError
@ -524,13 +518,11 @@ class Server(_AsyncioServer):
.format(obj["action"])) .format(obj["action"]))
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except Exception as exc: except:
short_exc_info = type(exc).__name__ return {
exc_str = str(exc) "status": "failed",
if exc_str: "exception": current_exc_packed()
short_exc_info += ": " + exc_str.splitlines()[0] }
return {"status": "failed",
"message": short_exc_info + "\n" + traceback.format_exc()}
finally: finally:
if self._noparallel is not None: if self._noparallel is not None:
self._noparallel.release() self._noparallel.release()

View File

@ -45,7 +45,7 @@ class RPCCase(unittest.TestCase):
self.assertEqual(test_object, test_object_back) self.assertEqual(test_object, test_object_back)
test_object_back = remote.async_echo(test_object) test_object_back = remote.async_echo(test_object)
self.assertEqual(test_object, test_object_back) self.assertEqual(test_object, test_object_back)
with self.assertRaises(pc_rpc.RemoteError): with self.assertRaises(AttributeError):
remote.non_existing_method() remote.non_existing_method()
remote.terminate() remote.terminate()
finally: finally:
@ -72,7 +72,7 @@ class RPCCase(unittest.TestCase):
self.assertEqual(test_object, test_object_back) self.assertEqual(test_object, test_object_back)
test_object_back = await remote.async_echo(test_object) test_object_back = await remote.async_echo(test_object)
self.assertEqual(test_object, test_object_back) self.assertEqual(test_object, test_object_back)
with self.assertRaises(pc_rpc.RemoteError): with self.assertRaises(AttributeError):
await remote.non_existing_method() await remote.non_existing_method()
await remote.terminate() await remote.terminate()
finally: finally:

View File

@ -39,10 +39,9 @@ def get_and_fit():
if "dataset_db" in globals(): if "dataset_db" in globals():
logger.info("using dataset DB for Gaussian fit guess") logger.info("using dataset DB for Gaussian fit guess")
def get_dataset(name, default): def get_dataset(name, default):
from artiq.protocols import pc_rpc
try: try:
return dataset_db.get(name) return dataset_db.get(name)
except (KeyError, pc_rpc.RemoteError): # TODO: serializable exceptions except KeyError:
return default return default
else: else:
logger.info("using defaults for Gaussian fit guess") logger.info("using defaults for Gaussian fit guess")