Adding python test script
This commit is contained in:
parent
06e82ddd4a
commit
2d20063848
111
test.py
Normal file
111
test.py
Normal file
@ -0,0 +1,111 @@
|
||||
#!/usr/bin/python3
|
||||
"""
|
||||
Description: Test Stabilizer communication and DDS configuration.
|
||||
|
||||
Author: Ryan Summers
|
||||
"""
|
||||
|
||||
import socket
|
||||
import json
|
||||
|
||||
HOST = '10.0.16.99'
|
||||
PORT = 1235
|
||||
|
||||
def do_request(s, request):
|
||||
""" Perform a request with the Stabilizer.
|
||||
|
||||
Args:
|
||||
s: The socket to the stabilizer.
|
||||
request: The request to transmit.
|
||||
|
||||
Returns:
|
||||
The received response object.
|
||||
"""
|
||||
# Transform the value field.
|
||||
request['value'] = json.dumps(request['value'], separators=[',', ':']).replace('"', "'")
|
||||
data = (json.dumps(request, separators=[',', ':']) + '\n').encode('ascii')
|
||||
s.send(data)
|
||||
|
||||
response = b''
|
||||
while not response.endswith(b'\n'):
|
||||
response += s.recv(1024)
|
||||
|
||||
# Decode the value array
|
||||
response = json.loads(response.decode('ascii'))
|
||||
response['value'] = response['value'].replace("'", '"')
|
||||
response['value'] = json.loads(response['value'])
|
||||
|
||||
return response
|
||||
|
||||
|
||||
def read_attribute(s, attribute_name):
|
||||
""" Read an attribute on the Stabilizer device.
|
||||
|
||||
Args:
|
||||
s: The socket to the stabilizer.
|
||||
attribute_name: The name of the endpoint to write to (the attribute name).
|
||||
|
||||
Returns:
|
||||
The value of the attribute. May be a string or a dictionary.
|
||||
"""
|
||||
request = {
|
||||
"req": "Read",
|
||||
"attribute": attribute_name,
|
||||
"value": "",
|
||||
}
|
||||
|
||||
response = do_request(s, request)
|
||||
|
||||
if 'code' not in response or response['code'] != 200:
|
||||
raise Exception(f'Failed to read {attribute_name}: {response}')
|
||||
|
||||
return response['value']
|
||||
|
||||
|
||||
def write_attribute(s, attribute_name, attribute_value):
|
||||
""" Write an attribute on the Stabilizer device.
|
||||
|
||||
Args:
|
||||
s: The socket to the stabilizer.
|
||||
attribute_name: The name of the endpoint to write to (the attribute name).
|
||||
attribute_value: The value to write to the attribute. May be a string or a dictionary.
|
||||
"""
|
||||
request = {
|
||||
"req": "Write",
|
||||
"attribute": attribute_name,
|
||||
"value": attribute_value,
|
||||
}
|
||||
|
||||
response = do_request(s, request)
|
||||
|
||||
if 'code' not in response or response['code'] != 200:
|
||||
raise Exception(f'Failed to write {attribute_name}: {response}')
|
||||
|
||||
|
||||
def main():
|
||||
""" Main program entry point. """
|
||||
with socket.socket() as s:
|
||||
|
||||
# Connect to the stabilizer.
|
||||
s.connect((HOST, PORT))
|
||||
|
||||
# A sample configuration for an output channel.
|
||||
channel_config = {
|
||||
'attenuation': 31.5,
|
||||
'parameters': {
|
||||
'phase_offset': 0.5,
|
||||
'frequency': 100.0e6,
|
||||
'amplitude': 0.2,
|
||||
'enabled': True,
|
||||
}
|
||||
}
|
||||
|
||||
# Configure OUT0 and read it back.
|
||||
write_attribute(s, "pounder/out0", channel_config)
|
||||
print('Pounder OUT0: ', read_attribute(s, "pounder/out0"))
|
||||
|
||||
print('Pounder IN1: ', read_attribute(s, "pounder/in1"))
|
||||
print('Pounder OUT1: ', read_attribute(s, "pounder/out1"))
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in New Issue
Block a user