forked from harry/creotech-sayma-testsuite
* rp_get_sayma_data.py: to be installed on Creotech remote. * plot_sayma_data.py: to be installed on local.master
parent
bf492e2f50
commit
7909267b88
@ -0,0 +1,37 @@ |
||||
import numpy as np |
||||
import matplotlib.pyplot as plot |
||||
from scipy import signal, constants |
||||
|
||||
def rp_raw_to_numpy(rp_raw): |
||||
# Convert raw buffer strings to numpy arrays |
||||
buff_string = rp_raw.split(',') |
||||
return np.array(list(map(float, buff_string))) |
||||
|
||||
def main(): |
||||
y1_raw = None |
||||
y2_raw = None |
||||
|
||||
with open('rp_y1_raw.bin', 'rb') as f: |
||||
y1_raw = f.read().decode('utf-8') |
||||
with open('rp_y2_raw.bin', 'rb') as f: |
||||
y2_raw = f.read().decode('utf-8') |
||||
if None in [y1_raw, y2_raw]: |
||||
raise IOError("Raw RP string files cannot be opened.") |
||||
|
||||
y1 = rp_raw_to_numpy(y1_raw) |
||||
y2 = rp_raw_to_numpy(y2_raw) |
||||
|
||||
t = np.arange(y1.shape[0])/125e6 |
||||
y = np.c_[y1, y2].T |
||||
z = signal.decimate(y*np.exp(1j*2*np.pi*9e6*t), q=10, ftype="fir", zero_phase=True)[:, 10:] |
||||
z = signal.decimate(z, q=10, ftype="fir", zero_phase=True)[:, 10:] |
||||
angle = np.angle(np.mean(z[0]*z[1].conj())) |
||||
|
||||
print(angle) |
||||
|
||||
plot.plot(y1) |
||||
plot.plot(y2) |
||||
plot.show() |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
@ -0,0 +1,63 @@ |
||||
import socket |
||||
|
||||
|
||||
class RPSCPI: |
||||
def connect(self, host): |
||||
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
||||
self.sock.connect((host, 5000)) |
||||
self.sock_f = self.sock.makefile() |
||||
|
||||
def close(self): |
||||
self.sock_f.close() |
||||
self.sock.close() |
||||
|
||||
def sendmsg(self, msg): |
||||
self.sock.send(msg.encode() + b"\r\n") |
||||
|
||||
def recvmsg(self): |
||||
return self.sock_f.readline().strip() |
||||
|
||||
def trigger(self): |
||||
self.sendmsg("ACQ:START") |
||||
self.sendmsg("ACQ:TRIG NOW") |
||||
while True: |
||||
self.sendmsg("ACQ:TRIG:STAT?") |
||||
if self.recvmsg() == "TD": |
||||
break |
||||
|
||||
def get_data(self, channel): |
||||
self.sendmsg("ACQ:SOUR{}:DATA?".format(channel)) |
||||
return self.recvmsg()[1:-1] |
||||
|
||||
def main(): |
||||
rp = RPSCPI() |
||||
rp.connect("192.168.1.104") |
||||
try: |
||||
rp.trigger() |
||||
y1_raw = rp.get_data(1) |
||||
y2_raw = rp.get_data(2) |
||||
|
||||
with open('rp_y1_raw.bin', 'wb') as f: |
||||
f.write(y1_raw.encode('utf-8')) |
||||
print("Succesfully written y1 raw string from RP.") |
||||
with open('rp_y2_raw.bin', 'wb') as f: |
||||
f.write(y2_raw.encode('utf-8')) |
||||
print("Succesfully written y2 raw string from RP.") |
||||
|
||||
# DEBUGGING |
||||
y1 = [float(i) for i in y1_raw.split(',')] |
||||
y2 = [float(i) for i in y2_raw.split(',')] |
||||
with open('rp_y1_raw.txt', 'w') as f: |
||||
for i in y1: |
||||
f.write(str(i) + '\n') |
||||
print("[DEBUG] Succesfully written y1 human-readable data.") |
||||
with open('rp_y2_raw.txt', 'w') as f: |
||||
for i in y2: |
||||
f.write(str(i) + '\n') |
||||
print("[DEBUG] Succesfully written y2 human-readable data.") |
||||
|
||||
finally: |
||||
rp.close() |
||||
|
||||
if __name__ == "__main__": |
||||
main() |
Loading…
Reference in new issue