mirror of
https://github.com/m-labs/artiq.git
synced 2025-01-08 18:13:34 +08:00
41 lines
975 B
Python
41 lines
975 B
Python
from artiq.coredevice.runtime_exceptions import exception_map, _RPCException
|
|
|
|
|
|
def _lookup_exception(d, e):
|
|
for eid, exception in d.items():
|
|
if isinstance(e, exception):
|
|
return eid
|
|
return 0
|
|
|
|
|
|
class RPCWrapper:
|
|
def __init__(self):
|
|
self.last_exception = None
|
|
|
|
def run_rpc(self, user_exception_map, fn, args):
|
|
eid = 0
|
|
r = None
|
|
|
|
try:
|
|
r = fn(*args)
|
|
except Exception as e:
|
|
eid = _lookup_exception(user_exception_map, e)
|
|
if not eid:
|
|
eid = _lookup_exception(exception_map, e)
|
|
if eid:
|
|
self.last_exception = None
|
|
else:
|
|
self.last_exception = e
|
|
eid = _RPCException.eid
|
|
|
|
if r is None:
|
|
r = 0
|
|
else:
|
|
r = int(r)
|
|
|
|
return eid, r
|
|
|
|
def filter_rpc_exception(self, eid):
|
|
if eid == _RPCException.eid:
|
|
raise self.last_exception
|