forked from M-Labs/artiq
master: finer control of worker exception reporting. Closes #233
This commit is contained in:
parent
dae63bd10c
commit
cc6b808bf8
@ -6,7 +6,8 @@ import logging
|
|||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
||||||
from artiq.protocols.sync_struct import Notifier
|
from artiq.protocols.sync_struct import Notifier
|
||||||
from artiq.master.worker import Worker
|
from artiq.master.worker import (Worker, WorkerInternalException,
|
||||||
|
log_worker_exception)
|
||||||
from artiq.tools import get_windows_drives, exc_to_warning
|
from artiq.tools import get_windows_drives, exc_to_warning
|
||||||
|
|
||||||
|
|
||||||
@ -21,6 +22,9 @@ async def _get_repository_entries(entry_dict,
|
|||||||
})
|
})
|
||||||
try:
|
try:
|
||||||
description = await worker.examine(os.path.join(root, filename))
|
description = await worker.examine(os.path.join(root, filename))
|
||||||
|
except:
|
||||||
|
log_worker_exception()
|
||||||
|
raise
|
||||||
finally:
|
finally:
|
||||||
await worker.close()
|
await worker.close()
|
||||||
for class_name, class_desc in description.items():
|
for class_name, class_desc in description.items():
|
||||||
@ -55,8 +59,9 @@ async def _scan_experiments(root, get_device_db, log, subdir=""):
|
|||||||
try:
|
try:
|
||||||
await _get_repository_entries(
|
await _get_repository_entries(
|
||||||
entry_dict, root, filename, get_device_db, log)
|
entry_dict, root, filename, get_device_db, log)
|
||||||
except:
|
except Exception as exc:
|
||||||
logger.warning("Skipping file '%s'", filename, exc_info=True)
|
logger.warning("Skipping file '%s'", filename,
|
||||||
|
exc_info=not isinstance(exc, WorkerInternalException))
|
||||||
if de.is_dir():
|
if de.is_dir():
|
||||||
subentries = await _scan_experiments(
|
subentries = await _scan_experiments(
|
||||||
root, get_device_db, log,
|
root, get_device_db, log,
|
||||||
|
@ -3,7 +3,7 @@ import logging
|
|||||||
from enum import Enum
|
from enum import Enum
|
||||||
from time import time
|
from time import time
|
||||||
|
|
||||||
from artiq.master.worker import Worker
|
from artiq.master.worker import Worker, log_worker_exception
|
||||||
from artiq.tools import asyncio_wait_or_cancel, TaskObject, Condition
|
from artiq.tools import asyncio_wait_or_cancel, TaskObject, Condition
|
||||||
from artiq.protocols.sync_struct import Notifier
|
from artiq.protocols.sync_struct import Notifier
|
||||||
|
|
||||||
@ -231,7 +231,7 @@ class PrepareStage(TaskObject):
|
|||||||
except:
|
except:
|
||||||
logger.error("got worker exception in prepare stage, "
|
logger.error("got worker exception in prepare stage, "
|
||||||
"deleting RID %d", run.rid)
|
"deleting RID %d", run.rid)
|
||||||
logger.error("worker exception details", exc_info=True)
|
log_worker_exception()
|
||||||
self.delete_cb(run.rid)
|
self.delete_cb(run.rid)
|
||||||
else:
|
else:
|
||||||
run.status = RunStatus.prepare_done
|
run.status = RunStatus.prepare_done
|
||||||
@ -281,7 +281,7 @@ class RunStage(TaskObject):
|
|||||||
except:
|
except:
|
||||||
logger.error("got worker exception in run stage, "
|
logger.error("got worker exception in run stage, "
|
||||||
"deleting RID %d", run.rid)
|
"deleting RID %d", run.rid)
|
||||||
logger.error("worker exception details", exc_info=True)
|
log_worker_exception()
|
||||||
self.delete_cb(run.rid)
|
self.delete_cb(run.rid)
|
||||||
else:
|
else:
|
||||||
if completed:
|
if completed:
|
||||||
@ -319,7 +319,7 @@ class AnalyzeStage(TaskObject):
|
|||||||
except:
|
except:
|
||||||
logger.error("got worker exception in analyze stage, "
|
logger.error("got worker exception in analyze stage, "
|
||||||
"deleting RID %d", run.rid)
|
"deleting RID %d", run.rid)
|
||||||
logger.error("worker exception details", exc_info=True)
|
log_worker_exception()
|
||||||
self.delete_cb(run.rid)
|
self.delete_cb(run.rid)
|
||||||
else:
|
else:
|
||||||
self.delete_cb(run.rid)
|
self.delete_cb(run.rid)
|
||||||
|
@ -25,6 +25,20 @@ class WorkerError(Exception):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class WorkerInternalException(Exception):
|
||||||
|
"""Exception raised inside the worker, information has been printed
|
||||||
|
through logging."""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def log_worker_exception():
|
||||||
|
exc, _, _ = sys.exc_info()
|
||||||
|
if exc is WorkerInternalException:
|
||||||
|
logger.debug("worker exception details", exc_info=True)
|
||||||
|
else:
|
||||||
|
logger.error("worker exception details", exc_info=True)
|
||||||
|
|
||||||
|
|
||||||
class Worker:
|
class Worker:
|
||||||
def __init__(self, handlers=dict(), send_timeout=0.5):
|
def __init__(self, handlers=dict(), send_timeout=0.5):
|
||||||
self.handlers = handlers
|
self.handlers = handlers
|
||||||
@ -167,6 +181,8 @@ class Worker:
|
|||||||
return True
|
return True
|
||||||
elif action == "pause":
|
elif action == "pause":
|
||||||
return False
|
return False
|
||||||
|
elif action == "exception":
|
||||||
|
raise WorkerInternalException
|
||||||
elif action == "create_watchdog":
|
elif action == "create_watchdog":
|
||||||
func = self.create_watchdog
|
func = self.create_watchdog
|
||||||
elif action == "delete_watchdog":
|
elif action == "delete_watchdog":
|
||||||
|
@ -264,15 +264,17 @@ def main():
|
|||||||
put_object({"action": "completed"})
|
put_object({"action": "completed"})
|
||||||
elif action == "terminate":
|
elif action == "terminate":
|
||||||
break
|
break
|
||||||
except CompileError:
|
|
||||||
pass
|
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
lines = ["Terminating with exception\n"]
|
# When we get CompileError, a more suitable diagnostic has already
|
||||||
lines += traceback.format_exception_only(type(exc), exc)
|
# been printed.
|
||||||
if hasattr(exc, "parent_traceback"):
|
if not isinstance(exc, CompileError):
|
||||||
lines += exc.parent_traceback
|
lines = ["Terminating with exception\n"]
|
||||||
logging.error("".join(lines).rstrip(),
|
lines += traceback.format_exception_only(type(exc), exc)
|
||||||
exc_info=not hasattr(exc, "parent_traceback"))
|
if hasattr(exc, "parent_traceback"):
|
||||||
|
lines += exc.parent_traceback
|
||||||
|
logging.error("".join(lines).rstrip(),
|
||||||
|
exc_info=not hasattr(exc, "parent_traceback"))
|
||||||
|
put_object({"action": "exception"})
|
||||||
finally:
|
finally:
|
||||||
device_mgr.close_devices()
|
device_mgr.close_devices()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user