forked from M-Labs/artiq
74 lines
2.5 KiB
Python
74 lines
2.5 KiB
Python
import time
|
|
import inspect
|
|
|
|
from artiq.experiment import *
|
|
from artiq.protocols.remote_exec import connect_global_rpc
|
|
|
|
import remote_exec_processing
|
|
|
|
|
|
class RemoteExecDemo(EnvExperiment):
|
|
def build(self):
|
|
self.setattr_device("camera_sim")
|
|
self.setattr_device("scheduler")
|
|
self.setattr_argument("remote_exec", BooleanValue(False))
|
|
self.setattr_argument("show_picture", BooleanValue(True), "Local options")
|
|
self.setattr_argument("enable_fit", BooleanValue(True), "Local options")
|
|
if self.remote_exec:
|
|
self.setattr_device("camera_sim_rexec")
|
|
|
|
def prepare(self):
|
|
if self.remote_exec:
|
|
connect_global_rpc(self.camera_sim_rexec)
|
|
self.camera_sim_rexec.add_code(
|
|
inspect.getsource(remote_exec_processing))
|
|
|
|
def transfer_parameters(self, parameters):
|
|
w, h, cx, cy = parameters
|
|
self.set_dataset("rexec_demo.gaussian_w", w, save=False, broadcast=True)
|
|
self.set_dataset("rexec_demo.gaussian_h", h, save=False, broadcast=True)
|
|
self.set_dataset("rexec_demo.gaussian_cx", cx, save=False, broadcast=True)
|
|
self.set_dataset("rexec_demo.gaussian_cy", cy, save=False, broadcast=True)
|
|
|
|
def fps_meter(self):
|
|
t = time.monotonic()
|
|
if hasattr(self, "last_pt_update"):
|
|
self.iter_count += 1
|
|
dt = t - self.last_pt_update
|
|
if dt >= 5:
|
|
pt = dt/self.iter_count
|
|
self.set_dataset("rexec_demo.picture_pt", pt, save=False, broadcast=True)
|
|
self.last_pt_update = t
|
|
self.iter_count = 0
|
|
else:
|
|
self.last_pt_update = t
|
|
self.iter_count = 0
|
|
|
|
def run_local(self):
|
|
while True:
|
|
self.fps_meter()
|
|
data = self.camera_sim.get_picture()
|
|
if self.show_picture:
|
|
self.set_dataset("rexec_demo.picture", data,
|
|
save=False, broadcast=True)
|
|
if self.enable_fit:
|
|
p = remote_exec_processing.fit(data, self.get_dataset)
|
|
self.transfer_parameters(p)
|
|
self.scheduler.pause()
|
|
|
|
def run_remote(self):
|
|
while True:
|
|
self.fps_meter()
|
|
p = self.camera_sim_rexec.call("get_and_fit")
|
|
self.transfer_parameters(p)
|
|
self.scheduler.pause()
|
|
|
|
def run(self):
|
|
try:
|
|
if self.remote_exec:
|
|
self.run_remote()
|
|
else:
|
|
self.run_local()
|
|
except TerminationRequested:
|
|
pass
|