forked from M-Labs/artiq
protocols: add fire-and-forget RPC
This commit is contained in:
parent
a5b34beffa
commit
76e034c913
|
@ -0,0 +1,45 @@
|
||||||
|
import threading
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class FFProxy:
|
||||||
|
"""Proxies a target object and runs its methods in the background.
|
||||||
|
|
||||||
|
All method calls to this object are forwarded to the target and executed
|
||||||
|
in a background thread. Method calls return immediately. Exceptions from
|
||||||
|
the target method are turned into warnings. At most one method from the
|
||||||
|
target object may be executed in the background; if a new call is
|
||||||
|
submitted while the previous one is still executing, a warning is printed
|
||||||
|
and the new call is dropped.
|
||||||
|
|
||||||
|
This feature is typically used to wrap slow and non-critical RPCs in
|
||||||
|
experiments.
|
||||||
|
"""
|
||||||
|
def __init__(self, target):
|
||||||
|
self.target = target
|
||||||
|
self._thread = None
|
||||||
|
|
||||||
|
def ff_join(self):
|
||||||
|
"""Waits until any background method finishes its execution."""
|
||||||
|
if self._thread is not None:
|
||||||
|
self._thread.join()
|
||||||
|
|
||||||
|
def __getattr__(self, k):
|
||||||
|
def run_in_thread(*args, **kwargs):
|
||||||
|
if self._thread is not None and self._thread.is_alive():
|
||||||
|
logger.warning("skipping fire-and-forget call to %r.%s as "
|
||||||
|
"previous call did not complete",
|
||||||
|
self.target, k)
|
||||||
|
return
|
||||||
|
def thread_body():
|
||||||
|
try:
|
||||||
|
getattr(self.target, k)(*args, **kwargs)
|
||||||
|
except:
|
||||||
|
logger.warning("fire-and-forget call to %r.%s raised an "
|
||||||
|
"exception:", self.target, k, exc_info=True)
|
||||||
|
self._thread = threading.Thread(target=thread_body)
|
||||||
|
self._thread.start()
|
||||||
|
return run_in_thread
|
|
@ -6,7 +6,7 @@ import time
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
|
|
||||||
from artiq.protocols import pc_rpc
|
from artiq.protocols import pc_rpc, fire_and_forget
|
||||||
|
|
||||||
|
|
||||||
test_address = "::1"
|
test_address = "::1"
|
||||||
|
@ -84,6 +84,18 @@ class RPCCase(unittest.TestCase):
|
||||||
self._run_server_and_test(self._loop_asyncio_echo)
|
self._run_server_and_test(self._loop_asyncio_echo)
|
||||||
|
|
||||||
|
|
||||||
|
class FireAndForgetCase(unittest.TestCase):
|
||||||
|
def _set_ok(self):
|
||||||
|
self.ok = True
|
||||||
|
|
||||||
|
def test_fire_and_forget(self):
|
||||||
|
self.ok = False
|
||||||
|
p = fire_and_forget.FFProxy(self)
|
||||||
|
p._set_ok()
|
||||||
|
p.ff_join()
|
||||||
|
self.assertTrue(self.ok)
|
||||||
|
|
||||||
|
|
||||||
class Echo:
|
class Echo:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.terminate_notify = asyncio.Semaphore(0)
|
self.terminate_notify = asyncio.Semaphore(0)
|
||||||
|
|
Loading…
Reference in New Issue