From 76e034c91336878bc7dc239540ee48a637aec677 Mon Sep 17 00:00:00 2001 From: Sebastien Bourdeauducq Date: Sun, 14 Jun 2015 22:03:34 -0600 Subject: [PATCH] protocols: add fire-and-forget RPC --- artiq/protocols/fire_and_forget.py | 45 ++++++++++++++++++++++++++++++ artiq/test/pc_rpc.py | 14 +++++++++- 2 files changed, 58 insertions(+), 1 deletion(-) create mode 100644 artiq/protocols/fire_and_forget.py diff --git a/artiq/protocols/fire_and_forget.py b/artiq/protocols/fire_and_forget.py new file mode 100644 index 000000000..556774531 --- /dev/null +++ b/artiq/protocols/fire_and_forget.py @@ -0,0 +1,45 @@ +import threading +import logging + + +logger = logging.getLogger(__name__) + + +class FFProxy: + """Proxies a target object and runs its methods in the background. + + All method calls to this object are forwarded to the target and executed + in a background thread. Method calls return immediately. Exceptions from + the target method are turned into warnings. At most one method from the + target object may be executed in the background; if a new call is + submitted while the previous one is still executing, a warning is printed + and the new call is dropped. + + This feature is typically used to wrap slow and non-critical RPCs in + experiments. + """ + def __init__(self, target): + self.target = target + self._thread = None + + def ff_join(self): + """Waits until any background method finishes its execution.""" + if self._thread is not None: + self._thread.join() + + def __getattr__(self, k): + def run_in_thread(*args, **kwargs): + if self._thread is not None and self._thread.is_alive(): + logger.warning("skipping fire-and-forget call to %r.%s as " + "previous call did not complete", + self.target, k) + return + def thread_body(): + try: + getattr(self.target, k)(*args, **kwargs) + except: + logger.warning("fire-and-forget call to %r.%s raised an " + "exception:", self.target, k, exc_info=True) + self._thread = threading.Thread(target=thread_body) + self._thread.start() + return run_in_thread diff --git a/artiq/test/pc_rpc.py b/artiq/test/pc_rpc.py index b9e76e19a..1b60d245f 100644 --- a/artiq/test/pc_rpc.py +++ b/artiq/test/pc_rpc.py @@ -6,7 +6,7 @@ import time import numpy as np -from artiq.protocols import pc_rpc +from artiq.protocols import pc_rpc, fire_and_forget test_address = "::1" @@ -84,6 +84,18 @@ class RPCCase(unittest.TestCase): self._run_server_and_test(self._loop_asyncio_echo) +class FireAndForgetCase(unittest.TestCase): + def _set_ok(self): + self.ok = True + + def test_fire_and_forget(self): + self.ok = False + p = fire_and_forget.FFProxy(self) + p._set_ok() + p.ff_join() + self.assertTrue(self.ok) + + class Echo: def __init__(self): self.terminate_notify = asyncio.Semaphore(0)