devices/thorlabs_tcube: cleanup, convert to asyncserial

This commit is contained in:
Sebastien Bourdeauducq 2016-03-22 21:55:37 +08:00
parent 2cbe47e26f
commit 7657b67ea6
1 changed files with 225 additions and 227 deletions

View File

@ -1,8 +1,9 @@
from enum import Enum
import logging
import struct as st
import asyncio
import serial
import asyncserial
logger = logging.getLogger(__name__)
@ -166,22 +167,6 @@ class Message:
return st.pack("<HBBBB", self.id.value,
self.param1, self.param2, self.dest, self.src)
def send(self, port):
logger.debug("sending: %s", self)
port.write(self.pack())
@classmethod
def recv(cls, port):
header = port.read(6)
logger.debug("received header: %s", header)
data = b""
if header[4] & 0x80:
(length, ) = st.unpack("<H", header[2:4])
data = port.read(length)
r = cls.unpack(header + data)
logger.debug("receiving: %s", r)
return r
@property
def has_data(self):
return self.dest & 0x80
@ -196,32 +181,41 @@ class Message:
class _Tcube:
def __init__(self, serial_dev):
self.port = serial.serial_for_url(serial_dev, baudrate=115200,
self.port = asyncserial.AsyncSerial(serial_dev, baudrate=115200,
rtscts=True)
self.status_report_counter = 0
self.port.flush()
logger.debug("baud rate set to 115200")
def close(self):
"""Close the device."""
self.port.close()
def send(self, msg):
msg.send(self.port)
async def send(self, message):
logger.debug("sending: %s", message)
await self.port.write(message.pack())
def handle_message(self, msg):
async def recv(self):
header = await self.port.read(6)
logger.debug("received header: %s", header)
data = b""
if header[4] & 0x80:
(length, ) = st.unpack("<H", header[2:4])
data = await self.port.read(length)
r = Message.unpack(header + data)
logger.debug("receiving: %s", r)
return r
async def handle_message(self, msg):
# derived classes must implement this
raise NotImplementedError
def send_request(self, msgreq_id, wait_for_msgs, param1=0, param2=0, data=None):
Message(msgreq_id, param1, param2, data=data).send(self.port)
async def send_request(self, msgreq_id, wait_for_msgs, param1=0, param2=0, data=None):
await self.send(Message(msgreq_id, param1, param2, data=data))
msg = None
while msg is None or msg.id not in wait_for_msgs:
msg = Message.recv(self.port)
self.handle_message(msg)
msg = await self.recv()
await self.handle_message(msg)
return msg
def set_channel_enable_state(self, activated):
async def set_channel_enable_state(self, activated):
"""Enable or Disable channel 1.
:param activated: 1 to enable channel, 2 to disable it.
@ -232,11 +226,11 @@ class _Tcube:
else:
activated = 2
self.send(Message(MGMSG.MOD_SET_CHANENABLESTATE, param1=1,
await self.send(Message(MGMSG.MOD_SET_CHANENABLESTATE, param1=1,
param2=activated))
def get_channel_enable_state(self):
get_msg = self.send_request(MGMSG.MOD_REQ_CHANENABLESTATE,
async def get_channel_enable_state(self):
get_msg = await self.send_request(MGMSG.MOD_REQ_CHANENABLESTATE,
[MGMSG.MOD_GET_CHANENABLESTATE], 1)
self.chan_enabled = get_msg.param2
if self.chan_enabled == 1:
@ -248,15 +242,15 @@ class _Tcube:
"1 nor 2: {}".format(self.chan_enabled))
return self.chan_enabled
def module_identify(self):
async def module_identify(self):
"""Ask device to flash its front panel led.
Instruct hardware unit to identify itself by flashing its front panel
led.
"""
self.send(Message(MGMSG.MOD_IDENTIFY))
await self.send(Message(MGMSG.MOD_IDENTIFY))
def hardware_start_update_messages(self, update_rate):
async def hardware_start_update_messages(self, update_rate):
"""Start status updates from the embedded controller.
Status update messages contain information about the position and
@ -264,23 +258,26 @@ class _Tcube:
:param update_rate: Rate at which you will receive status updates
"""
self.send(Message(MGMSG.HW_START_UPDATEMSGS, param1=update_rate))
await self.send(Message(MGMSG.HW_START_UPDATEMSGS, param1=update_rate))
def hardware_stop_update_messages(self):
async def hardware_stop_update_messages(self):
"""Stop status updates from the controller."""
self.send(Message(MGMSG.HW_STOP_UPDATEMSGS))
await self.send(Message(MGMSG.HW_STOP_UPDATEMSGS))
def hardware_request_information(self):
return self.send_request(MGMSG.HW_REQ_INFO,
async def hardware_request_information(self):
return await self.send_request(MGMSG.HW_REQ_INFO,
[MGMSG.HW_GET_INFO])
def is_channel_enabled(self):
return self.chan_enabled
def ping(self):
async def ping(self):
try:
self.hardware_request_information()
await self.hardware_request_information()
except asyncio.CancelledError:
raise
except:
logger.warning("ping failed", exc_info=True)
return False
return True
@ -290,7 +287,7 @@ class Tpz(_Tcube):
_Tcube.__init__(self, serial_dev)
self.voltage_limit = self.get_tpz_io_settings()[0]
def handle_message(self, msg):
async def handle_message(self, msg):
msg_id = msg.id
data = msg.data
@ -305,7 +302,7 @@ class Tpz(_Tcube):
.format(code,
data[4:].decode(encoding="ascii")))
def set_position_control_mode(self, control_mode):
async def set_position_control_mode(self, control_mode):
"""Set the control loop mode.
When in closed-loop mode, position is maintained by a feedback signal
@ -320,10 +317,10 @@ class Tpz(_Tcube):
0x04 for Closed Loop Smooth.
"""
self.send(Message(MGMSG.PZ_SET_POSCONTROLMODE, param1=1,
await self.send(Message(MGMSG.PZ_SET_POSCONTROLMODE, param1=1,
param2=control_mode))
def get_position_control_mode(self):
async def get_position_control_mode(self):
"""Get the control loop mode.
:return: Returns the control mode.
@ -338,11 +335,11 @@ class Tpz(_Tcube):
:rtype: int
"""
get_msg = self.send_request(MGMSG.PZ_REQ_POSCONTROLMODE,
get_msg = await self.send_request(MGMSG.PZ_REQ_POSCONTROLMODE,
[MGMSG.PZ_GET_POSCONTROLMODE], 1)
return get_msg.param2
def set_output_volts(self, voltage):
async def set_output_volts(self, voltage):
"""Set output voltage applied to the piezo actuator.
This command is only applicable in Open Loop mode. If called when in
@ -352,7 +349,7 @@ class Tpz(_Tcube):
in open loop mode. The voltage value must be in range
[0; voltage_limit]. Voltage_limit being set by the
:py:meth:`set_tpz_io_settings()
<artiq.devices.thorlabs.driver.Tpz.set_tpz_io_settings>` method
<artiq.devices.thorlabs_tcube.driver.Tpz.set_tpz_io_settings>` method
between the three values 75 V, 100 V and 150 V.
"""
if voltage < 0 or voltage > self.voltage_limit:
@ -360,19 +357,19 @@ class Tpz(_Tcube):
.format(self.voltage_limit))
volt = int(voltage*32767/self.voltage_limit)
payload = st.pack("<HH", 1, volt)
self.send(Message(MGMSG.PZ_SET_OUTPUTVOLTS, data=payload))
await self.send(Message(MGMSG.PZ_SET_OUTPUTVOLTS, data=payload))
def get_output_volts(self):
async def get_output_volts(self):
"""Get the output voltage applied to the piezo actuator.
:return: The output voltage.
:rtype: float
"""
get_msg = self.send_request(MGMSG.PZ_REQ_OUTPUTVOLTS,
get_msg = await self.send_request(MGMSG.PZ_REQ_OUTPUTVOLTS,
[MGMSG.PZ_GET_OUTPUTVOLTS], 1)
return st.unpack("<H", get_msg.data[2:])[0]*self.voltage_limit/32767
def set_output_position(self, position_sw):
async def set_output_position(self, position_sw):
"""Set output position of the piezo actuator.
This command is only applicable in Closed Loop mode. If called when in
@ -385,20 +382,20 @@ class Tpz(_Tcube):
the maximum piezo extension.
"""
payload = st.pack("<HH", 1, position_sw)
self.send(Message(MGMSG.PZ_SET_OUTPUTPOS, data=payload))
await self.send(Message(MGMSG.PZ_SET_OUTPUTPOS, data=payload))
def get_output_position(self):
async def get_output_position(self):
"""Get output position of piezo actuator.
:return: The output position of the piezo relative to the zero
position.
:rtype: int
"""
get_msg = self.send_request(MGMSG.PZ_REQ_OUTPUTPOS,
get_msg = await self.send_request(MGMSG.PZ_REQ_OUTPUTPOS,
[MGMSG.PZ_GET_OUTPUTPOS], 1)
return st.unpack("<H", get_msg.data[2:])[0]
def set_input_volts_source(self, volt_src):
async def set_input_volts_source(self, volt_src):
"""Set the input source(s) which controls the output from the HV
amplifier circuit (i.e. the drive to the piezo actuators).
@ -407,7 +404,7 @@ class Tpz(_Tcube):
0x00 Software Only: Unit responds only to software inputs and the
HV amp output is that set using the :py:meth:`set_output_volts()
<artiq.devices.thorlabs.driver.Tpz.set_output_volts>` method.
<artiq.devices.thorlabs_tcube.driver.Tpz.set_output_volts>` method.
0x01 External Signal: Unit sums the differential signal on the rear
panel EXT IN(+) and EXT IN(-) connectors with the voltage set
@ -422,23 +419,23 @@ class Tpz(_Tcube):
either or both of the other source options.
"""
payload = st.pack("<HH", 1, volt_src)
self.send(Message(MGMSG.PZ_SET_INPUTVOLTSSRC, data=payload))
await self.send(Message(MGMSG.PZ_SET_INPUTVOLTSSRC, data=payload))
def get_input_volts_source(self):
async def get_input_volts_source(self):
"""Get the input source(s) which controls the output from the HV
amplifier circuit.
:return: Value which selects the various analog sources, cf.
:py:meth:`set_input_volts_source()
<artiq.devices.thorlabs.driver.Tpz.set_input_volts_source>` method
<artiq.devices.thorlabs_tcube.driver.Tpz.set_input_volts_source>` method
docstring for meaning of bits.
:rtype: int
"""
get_msg = self.send_request(MGMSG.PZ_REQ_INPUTVOLTSSRC,
get_msg = await self.send_request(MGMSG.PZ_REQ_INPUTVOLTSSRC,
[MGMSG.PZ_GET_INPUTVOLTSSRC], 1)
return st.unpack("<H", get_msg.data[2:])[0]
def set_pi_constants(self, prop_const, int_const):
async def set_pi_constants(self, prop_const, int_const):
"""Set the proportional and integration feedback loop constants.
These parameters determine the response characteristics when operating
@ -452,20 +449,20 @@ class Tpz(_Tcube):
:param int_const: Value of the integral term in range [0; 255].
"""
payload = st.pack("<HHH", 1, prop_const, int_const)
self.send(Message(MGMSG.PZ_SET_PICONSTS, data=payload))
await self.send(Message(MGMSG.PZ_SET_PICONSTS, data=payload))
def get_pi_constants(self):
async def get_pi_constants(self):
"""Get the proportional and integration feedback loop constants.
:return: Returns a tuple whose first element is the proportional
term and the second element is the integral term.
:rtype: a 2 int elements tuple : (int, int)
"""
get_msg = self.send_request(MGMSG.PZ_REQ_PICONSTS,
get_msg = await self.send_request(MGMSG.PZ_REQ_PICONSTS,
[MGMSG.PZ_GET_PICONSTS], 1)
return st.unpack("<HH", get_msg.data[2:])
def set_output_lut(self, lut_index, output):
async def set_output_lut(self, lut_index, output):
"""Set the ouput LUT values for WGM (Waveform Generator Mode).
It is possible to use the controller in an arbitrary Waveform
@ -491,7 +488,7 @@ class Tpz(_Tcube):
applicable channel is specified by the Chan Ident parameter If only a
sub set of the array is being used (as specified by the cyclelength
parameter of the :py:meth:`set_output_lut_parameters()
<artiq.devices.thorlabs.driver.Tpz.set_output_lut_parameters>`
<artiq.devices.thorlabs_tcube.driver.Tpz.set_output_lut_parameters>`
function), then only the first cyclelength values need to be set. In
this manner, any arbitrary voltage waveform can be programmed into the
LUT. Note. The LUT values are output by the system at a maximum
@ -503,25 +500,25 @@ class Tpz(_Tcube):
:param output: The voltage value to be set. Values are in the range
[0; voltage_limit]. Voltage_limit being set with the
:py:meth:`set_tpz_io_settings
<artiq.devices.thorlabs.driver.Tpz.set_tpz_io_settings>` method.
<artiq.devices.thorlabs_tcube.driver.Tpz.set_tpz_io_settings>` method.
"""
volt = round(output*32767/self.voltage_limit)
payload = st.pack("<HHH", 1, lut_index, volt)
self.send(Message(MGMSG.PZ_SET_OUTPUTLUT, data=payload))
await self.send(Message(MGMSG.PZ_SET_OUTPUTLUT, data=payload))
def get_output_lut(self):
async def get_output_lut(self):
"""Get the ouput LUT values for WGM (Waveform Generator Mode).
:return: a tuple whose first element is the lut index and the second is
the voltage output value.
:rtype: a 2 elements tuple (int, float)
"""
get_msg = self.send_request(MGMSG.PZ_REQ_OUTPUTLUT,
get_msg = await self.send_request(MGMSG.PZ_REQ_OUTPUTLUT,
[MGMSG.PZ_GET_OUTPUTLUT], 1)
index, output = st.unpack("<Hh", get_msg.data[2:])
return index, output*self.voltage_limit/32767
def set_output_lut_parameters(self, mode, cycle_length, num_cycles,
async def set_output_lut_parameters(self, mode, cycle_length, num_cycles,
delay_time, precycle_rest, postcycle_rest):
"""Set Waveform Generator Mode parameters.
@ -585,56 +582,56 @@ class Tpz(_Tcube):
payload = st.pack("<HHHLLLLHLH", 1, mode, cycle_length, num_cycles,
delay_time, precycle_rest, postcycle_rest,
0, 0, 0)
self.send(Message(MGMSG.PZ_SET_OUTPUTLUTPARAMS, data=payload))
await self.send(Message(MGMSG.PZ_SET_OUTPUTLUTPARAMS, data=payload))
def get_output_lut_parameters(self):
async def get_output_lut_parameters(self):
"""Get Waveform Generator Mode parameters.
:return: a 6 int elements tuple whose members are (mode, cycle_length,
num_cycles, delay_time, precycle_rest, postcycle_rest).
:rtype: 6 int elements tuple
"""
get_msg = self.send_request(MGMSG.PZ_REQ_OUTPUTLUTPARAMS,
get_msg = await self.send_request(MGMSG.PZ_REQ_OUTPUTLUTPARAMS,
[MGMSG.PZ_GET_OUTPUTLUTPARAMS], 1)
return st.unpack("<HHLLLL", get_msg.data[2:22])
def start_lut_output(self):
async def start_lut_output(self):
"""Start the voltage waveform (LUT) outputs."""
self.send(Message(MGMSG.PZ_START_LUTOUTPUT, param1=1))
await self.send(Message(MGMSG.PZ_START_LUTOUTPUT, param1=1))
def stop_lut_output(self):
async def stop_lut_output(self):
"""Stop the voltage waveform (LUT) outputs."""
self.send(Message(MGMSG.PZ_STOP_LUTOUTPUT, param1=1))
await self.send(Message(MGMSG.PZ_STOP_LUTOUTPUT, param1=1))
def set_eeprom_parameters(self, msg_id):
async def set_eeprom_parameters(self, msg_id):
"""Save the parameter settings for the specified message.
:param msg_id: The message ID of the message containing the parameters
to be saved.
"""
payload = st.pack("<HH", 1, msg_id)
self.send(Message(MGMSG.PZ_SET_EEPROMPARAMS, data=payload))
await self.send(Message(MGMSG.PZ_SET_EEPROMPARAMS, data=payload))
def set_tpz_display_settings(self, intensity):
async def set_tpz_display_settings(self, intensity):
"""Set the intensity of the LED display on the front of the TPZ unit.
:param intensity: The intensity is set as a value from 0 (Off) to 255
(brightest).
"""
payload = st.pack("<H", intensity)
self.send(Message(MGMSG.PZ_SET_TPZ_DISPSETTINGS, data=payload))
await self.send(Message(MGMSG.PZ_SET_TPZ_DISPSETTINGS, data=payload))
def get_tpz_display_settings(self):
async def get_tpz_display_settings(self):
"""Get the intensity of the LED display on the front of the TPZ unit.
:return: The intensity as a value from 0 (Off) to 255 (brightest).
:rtype: int
"""
get_msg = self.send_request(MGMSG.PZ_REQ_TPZ_DISPSETTINGS,
get_msg = await self.send_request(MGMSG.PZ_REQ_TPZ_DISPSETTINGS,
[MGMSG.PZ_GET_TPZ_DISPSETTINGS], 1)
return st.unpack("<H", get_msg.data)[0]
def set_tpz_io_settings(self, voltage_limit, hub_analog_input):
async def set_tpz_io_settings(self, voltage_limit, hub_analog_input):
"""Set various I/O settings."
:param voltage_limit: The piezo actuator connected to the T-Cube has a
@ -676,18 +673,18 @@ class Tpz(_Tcube):
raise ValueError("voltage_limit must be 75 V, 100 V or 150 V")
payload = st.pack("<HHHHH", 1, voltage_limit, hub_analog_input, 0, 0)
self.send(Message(MGMSG.PZ_SET_TPZ_IOSETTINGS, data=payload))
await self.send(Message(MGMSG.PZ_SET_TPZ_IOSETTINGS, data=payload))
def get_tpz_io_settings(self):
async def get_tpz_io_settings(self):
"""Get various I/O settings.
:return: Returns a tuple whose elements are the voltage limit and the
Hub analog input. Refer to :py:meth:`set_tpz_io_settings()
<artiq.devices.thorlabs.driver.Tpz.set_tpz_io_settings>` for the
<artiq.devices.thorlabs_tcube.driver.Tpz.set_tpz_io_settings>` for the
meaning of those parameters.
:rtype: a 2 elements tuple (int, int)
"""
get_msg = self.send_request(MGMSG.PZ_REQ_TPZ_IOSETTINGS,
get_msg = await self.send_request(MGMSG.PZ_REQ_TPZ_IOSETTINGS,
[MGMSG.PZ_GET_TPZ_IOSETTINGS], 1)
voltage_limit, hub_analog_input = st.unpack("<HH", get_msg.data[2:6])
if voltage_limit == 1:
@ -703,10 +700,11 @@ class Tpz(_Tcube):
class Tdc(_Tcube):
def send_status_ack(self):
self.send(Message(MGMSG.MOT_ACK_DCSTATUSUPDATE))
def __init__(self, *args, **kwargs):
_Tcube.__init__(self, *args, **kwargs)
self.status_report_counter = 0
def handle_message(self, msg):
async def handle_message(self, msg):
msg_id = msg.id
data = msg.data
@ -725,18 +723,18 @@ class Tdc(_Tcube):
msg_id == MGMSG.MOT_GET_DCSTATUSUPDATE):
if self.status_report_counter == 25:
self.status_report_counter = 0
self.send_status_ack()
await self.send(Message(MGMSG.MOT_ACK_DCSTATUSUPDATE))
else:
self.status_report_counter += 1
# 'r' is a currently unused and reserved field
self.position, self.velocity, r, self.status = st.unpack(
"<LHHL", data[2:])
def is_moving(self):
status_bits = self.get_status_bits()
async def is_moving(self):
status_bits = await self.get_status_bits()
return (status_bits & 0x2F0) != 0
def set_pot_parameters(self, zero_wnd, vel1, wnd1, vel2, wnd2, vel3,
async def set_pot_parameters(self, zero_wnd, vel1, wnd1, vel2, wnd2, vel3,
wnd3, vel4):
"""Set pot parameters.
@ -755,28 +753,28 @@ class Tdc(_Tcube):
"""
payload = st.pack("<HHLHLHLHL", 1, zero_wnd, vel1, wnd1, vel2, wnd2,
vel3, wnd3, vel4)
self.send(Message(MGMSG.MOT_SET_POTPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_POTPARAMS, data=payload))
def get_pot_parameters(self):
async def get_pot_parameters(self):
"""Get pot parameters.
:return: An 8 int tuple containing the following values: zero_wnd,
vel1, wnd1, vel2, wnd2, vel3, wnd3, vel4. See
:py:meth:`set_pot_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_pot_parameters>` for a
<artiq.devices.thorlabs_tcube.driver.Tdc.set_pot_parameters>` for a
description of each tuple element meaning.
:rtype: An 8 int tuple
"""
get_msg = self.send_request(MGMSG.MOT_REQ_POTPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_POTPARAMS,
[MGMSG.MOT_GET_POTPARAMS], 1)
return st.unpack("<HLHLHLHL", get_msg.data[2:])
def hub_get_bay_used(self):
get_msg = self.send_request(MGMSG.HUB_REQ_BAYUSED,
async def hub_get_bay_used(self):
get_msg = await self.send_request(MGMSG.HUB_REQ_BAYUSED,
[MGMSG.HUB_GET_BAYUSED])
return get_msg.param1
def set_position_counter(self, position):
async def set_position_counter(self, position):
"""Set the "live" position count in the controller.
In general, this command is not normally used. Instead, the stage is
@ -787,19 +785,19 @@ class Tdc(_Tcube):
:param position: The new value of the position counter.
"""
payload = st.pack("<Hl", 1, position)
self.send(Message(MGMSG.MOT_SET_POSCOUNTER, data=payload))
await self.send(Message(MGMSG.MOT_SET_POSCOUNTER, data=payload))
def get_position_counter(self):
async def get_position_counter(self):
"""Get the "live" position count from the controller.
:return: The value of the position counter.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_POSCOUNTER,
get_msg = await self.send_request(MGMSG.MOT_REQ_POSCOUNTER,
[MGMSG.MOT_GET_POSCOUNTER], 1)
return st.unpack("<l", get_msg.data[2:])[0]
def set_encoder_counter(self, encoder_count):
async def set_encoder_counter(self, encoder_count):
"""Set encoder count in the controller.
This is only applicable to stages and actuators fitted
@ -809,38 +807,38 @@ class Tdc(_Tcube):
:param encoder_count: The new value of the encoder counter.
"""
payload = st.pack("<Hl", 1, encoder_count)
self.send(Message(MGMSG.MOT_SET_ENCCOUNTER, data=payload))
await self.send(Message(MGMSG.MOT_SET_ENCCOUNTER, data=payload))
def get_encoder_counter(self):
async def get_encoder_counter(self):
"""Get encoder count from the controller.
:return: The value of the encoder counter.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_ENCCOUNTER,
get_msg = await self.send_request(MGMSG.MOT_REQ_ENCCOUNTER,
[MGMSG.MOT_GET_ENCCOUNTER], 1)
return st.unpack("<l", get_msg.data[2:])[0]
def set_velocity_parameters(self, acceleration, max_velocity):
async def set_velocity_parameters(self, acceleration, max_velocity):
"""Set the trapezoidal velocity parameter.
:param acceleration: The acceleration in encoder counts/sec/sec.
:param max_velocity: The maximum (final) velocity in counts/sec.
"""
payload = st.pack("<HLLL", 1, 0, acceleration, max_velocity)
self.send(Message(MGMSG.MOT_SET_VELPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_VELPARAMS, data=payload))
def get_velocity_parameters(self):
async def get_velocity_parameters(self):
"""Get the trapezoidal velocity parameters.
:return: A 2 int tuple: (acceleration, max_velocity).
:rtype: A 2 int tuple (int, int)
"""
get_msg = self.send_request(MGMSG.MOT_REQ_VELPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_VELPARAMS,
[MGMSG.MOT_GET_VELPARAMS], 1)
return st.unpack("<LL", get_msg.data[6:])
def set_jog_parameters(self, mode, step_size, acceleration,
async def set_jog_parameters(self, mode, step_size, acceleration,
max_velocity, stop_mode):
"""Set the velocity jog parameters.
@ -854,41 +852,41 @@ class Tdc(_Tcube):
"""
payload = st.pack("<HHLLLLH", 1, mode, step_size, 0, acceleration,
max_velocity, stop_mode)
self.send(Message(MGMSG.MOT_SET_JOGPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_JOGPARAMS, data=payload))
def get_jog_parameters(self):
async def get_jog_parameters(self):
"""Get the velocity jog parameters.
:return: A 5 int tuple containing in this order: jog_mode,
step_size, acceleration, max_velocity, stop_mode
:rtype: A 5 int tuple.
"""
get_msg = self.send_request(MGMSG.MOT_REQ_JOGPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_JOGPARAMS,
[MGMSG.MOT_GET_JOGPARAMS], 1)
(jog_mode, step_size, _, acceleration, max_velocity,
stop_mode) = st.unpack("<HLLLLH", get_msg.data[2:])
return jog_mode, step_size, acceleration, max_velocity, stop_mode
def set_gen_move_parameters(self, backlash_distance):
async def set_gen_move_parameters(self, backlash_distance):
"""Set the backlash distance.
:param backlash_distance: The value of the backlash distance,
which specifies the relative distance in position counts.
"""
payload = st.pack("<Hl", 1, backlash_distance)
self.send(Message(MGMSG.MOT_SET_GENMOVEPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_GENMOVEPARAMS, data=payload))
def get_gen_move_parameters(self):
async def get_gen_move_parameters(self):
"""Get the backlash distance.
:return: The value of the backlash distance.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_GENMOVEPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_GENMOVEPARAMS,
[MGMSG.MOT_GET_GENMOVEPARAMS], 1)
return st.unpack("<l", get_msg.data[2:])[0]
def set_move_relative_parameters(self, relative_distance):
async def set_move_relative_parameters(self, relative_distance):
"""Set the following relative move parameter: relative_distance.
:param relative_distance: The distance to move. This is a signed
@ -896,19 +894,19 @@ class Tdc(_Tcube):
counts.
"""
payload = st.pack("<Hl", 1, relative_distance)
self.send(Message(MGMSG.MOT_SET_MOVERELPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_MOVERELPARAMS, data=payload))
def get_move_relative_parameters(self):
async def get_move_relative_parameters(self):
"""Get the relative distance move parameter.
:return: The relative distance move parameter.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_MOVERELPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_MOVERELPARAMS,
[MGMSG.MOT_GET_MOVERELPARAMS], 1)
return st.unpack("<l", get_msg.data[2:])[0]
def set_move_absolute_parameters(self, absolute_position):
async def set_move_absolute_parameters(self, absolute_position):
"""Set the following absolute move parameter: absolute_position.
:param absolute_position: The absolute position to move. This is a
@ -916,45 +914,45 @@ class Tdc(_Tcube):
counts.
"""
payload = st.pack("<Hl", 1, absolute_position)
self.send(Message(MGMSG.MOT_SET_MOVEABSPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_MOVEABSPARAMS, data=payload))
def get_move_absolute_parameters(self):
async def get_move_absolute_parameters(self):
"""Get the absolute position move parameter.
:return: The absolute position to move.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_MOVEABSPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_MOVEABSPARAMS,
[MGMSG.MOT_GET_MOVEABSPARAMS], 1)
return st.unpack("<l", get_msg.data[2:])[0]
def set_home_parameters(self, home_velocity):
async def set_home_parameters(self, home_velocity):
"""Set the homing velocity parameter.
:param home_velocity: Homing velocity.
"""
payload = st.pack("<HHHLL", 1, 0, 0, home_velocity, 0)
self.send(Message(MGMSG.MOT_SET_HOMEPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_HOMEPARAMS, data=payload))
def get_home_parameters(self):
async def get_home_parameters(self):
"""Get the homing velocity parameter.
:return: The homing velocity.
:rtype: int
"""
get_msg = self.send_request(MGMSG.MOT_REQ_HOMEPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_HOMEPARAMS,
[MGMSG.MOT_GET_HOMEPARAMS], 1)
return st.unpack("<L", get_msg.data[6:10])[0]
def move_home(self):
async def move_home(self):
"""Start a home move sequence.
This call is blocking until device is homed or move is stopped.
"""
self.send_request(MGMSG.MOT_MOVE_HOME,
await self.send_request(MGMSG.MOT_MOVE_HOME,
[MGMSG.MOT_MOVE_HOMED, MGMSG.MOT_MOVE_STOPPED], 1)
def set_limit_switch_parameters(self, cw_hw_limit, ccw_hw_limit):
async def set_limit_switch_parameters(self, cw_hw_limit, ccw_hw_limit):
"""Set the limit switch parameters.
:param cw_hw_limit: The operation of clockwise hardware limit switch
@ -982,57 +980,57 @@ class Tdc(_Tcube):
switch when contact is made.
"""
payload = st.pack("<HHHLLH", 1, cw_hw_limit, ccw_hw_limit, 0, 0, 0)
self.send(Message(MGMSG.MOT_SET_LIMSWITCHPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_LIMSWITCHPARAMS, data=payload))
def get_limit_switch_parameters(self):
async def get_limit_switch_parameters(self):
"""Get the limit switch parameters.
:return: A 2 int tuple containing the following in order: cw_hw_limit,
ccw_hw_limit. Cf. description in
:py:meth:`set_limit_switch_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_limit_switch_parameters>`
<artiq.devices.thorlabs_tcube.driver.Tdc.set_limit_switch_parameters>`
method.
:rtype: A 2 int tuple.
"""
get_msg = self.send_request(MGMSG.MOT_REQ_LIMSWITCHPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_LIMSWITCHPARAMS,
[MGMSG.MOT_GET_LIMSWITCHPARAMS], 1)
return st.unpack("<HH", get_msg.data[2:6])
def move_relative_memory(self):
async def move_relative_memory(self):
"""Start a relative move of distance in the controller's memory
The relative distance parameter used for the move will be the parameter
sent previously by a :py:meth:`set_move_relative_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_move_relative_parameters>`
<artiq.devices.thorlabs_tcube.driver.Tdc.set_move_relative_parameters>`
command.
"""
self.send_request(MGMSG.MOT_MOVE_RELATIVE,
await self.send_request(MGMSG.MOT_MOVE_RELATIVE,
[MGMSG.MOT_MOVE_COMPLETED, MGMSG.MOT_MOVE_STOPPED], 1)
def move_relative(self, relative_distance):
async def move_relative(self, relative_distance):
"""Start a relative move
:param relative_distance: The distance to move in position encoder
counts.
"""
payload = st.pack("<Hl", 1, relative_distance)
self.send_request(MGMSG.MOT_MOVE_RELATIVE,
await self.send_request(MGMSG.MOT_MOVE_RELATIVE,
[MGMSG.MOT_MOVE_COMPLETED, MGMSG.MOT_MOVE_STOPPED],
data=payload)
def move_absolute_memory(self):
async def move_absolute_memory(self):
"""Start an absolute move of distance in the controller's memory.
The absolute move position parameter used for the move will be the
parameter sent previously by a :py:meth:`set_move_absolute_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_move_absolute_parameters>`
<artiq.devices.thorlabs_tcube.driver.Tdc.set_move_absolute_parameters>`
command.
"""
self.send_request(MGMSG.MOT_MOVE_ABSOLUTE,
await self.send_request(MGMSG.MOT_MOVE_ABSOLUTE,
[MGMSG.MOT_MOVE_COMPLETED, MGMSG.MOT_MOVE_STOPPED],
param1=1)
def move_absolute(self, absolute_distance):
async def move_absolute(self, absolute_distance):
"""Start an absolute move.
:param absolute_distance: The distance to move. This is a signed
@ -1040,36 +1038,36 @@ class Tdc(_Tcube):
counts.
"""
payload = st.pack("<Hl", 1, absolute_distance)
self.send_request(MGMSG.MOT_MOVE_ABSOLUTE,
await self.send_request(MGMSG.MOT_MOVE_ABSOLUTE,
[MGMSG.MOT_MOVE_COMPLETED, MGMSG.MOT_MOVE_STOPPED],
data=payload)
def move_jog(self, direction):
async def move_jog(self, direction):
"""Start a jog move.
:param direction: The direction to jog. 1 is forward, 2 is backward.
"""
self.send_request(MGMSG.MOT_MOVE_JOG,
await self.send_request(MGMSG.MOT_MOVE_JOG,
[MGMSG.MOT_MOVE_COMPLETED, MGMSG.MOT_MOVE_STOPPED],
param1=1, param2=direction)
def move_velocity(self, direction):
async def move_velocity(self, direction):
"""Start a move.
When this method is called, the motor will move continuously in the
specified direction using the velocity parameter set by the
:py:meth:`set_move_relative_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_move_relative_parameters>`
<artiq.devices.thorlabs_tcube.driver.Tdc.set_move_relative_parameters>`
command until a :py:meth:`move_stop()
<artiq.devices.thorlabs.driver.Tdc.move_stop>` command (either
<artiq.devices.thorlabs_tcube.driver.Tdc.move_stop>` command (either
StopImmediate or StopProfiled) is called, or a limit switch is reached.
:param direction: The direction to jog: 1 to move forward, 2 to move
backward.
"""
self.send(Message(MGMSG.MOT_MOVE_VELOCITY, param1=1, param2=direction))
await self.send(Message(MGMSG.MOT_MOVE_VELOCITY, param1=1, param2=direction))
def move_stop(self, stop_mode):
async def move_stop(self, stop_mode):
"""Stop any type of motor move.
Stops any of those motor move: relative, absolute, homing or move at
@ -1079,13 +1077,13 @@ class Tdc(_Tcube):
or profiled stop. Set this byte to 1 to stop immediately, or to 2
to stop in a controlled (profiled) manner.
"""
if self.is_moving():
self.send_request(MGMSG.MOT_MOVE_STOP,
if await self.is_moving():
await self.send_request(MGMSG.MOT_MOVE_STOP,
[MGMSG.MOT_MOVE_STOPPED,
MGMSG.MOT_MOVE_COMPLETED],
1, stop_mode)
def set_dc_pid_parameters(self, proportional, integral, differential,
async def set_dc_pid_parameters(self, proportional, integral, differential,
integral_limit, filter_control=0x0F):
"""Set the position control loop parameters.
@ -1102,23 +1100,23 @@ class Tdc(_Tcube):
"""
payload = st.pack("<HLLLLH", 1, proportional, integral,
differential, integral_limit, filter_control)
self.send(Message(MGMSG.MOT_SET_DCPIDPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_DCPIDPARAMS, data=payload))
def get_dc_pid_parameters(self):
async def get_dc_pid_parameters(self):
"""Get the position control loop parameters.
:return: A 5 int tuple containing in this order:
proportional gain, integral gain, differential gain, integral limit
and filter control. Cf. :py:meth:`set_dc_pid_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_dc_pid_parameters>` for
<artiq.devices.thorlabs_tcube.driver.Tdc.set_dc_pid_parameters>` for
precise description.
:rtype: A 5 int tuple.
"""
get_msg = self.send_request(MGMSG.MOT_REQ_DCPIDPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_DCPIDPARAMS,
[MGMSG.MOT_GET_DCPIDPARAMS], 1)
return st.unpack("<LLLLH", get_msg.data[2:])
def set_av_modes(self, mode_bits):
async def set_av_modes(self, mode_bits):
"""Set the LED indicator modes.
The LED on the control keyboard can be configured to indicate certain
@ -1131,9 +1129,9 @@ class Tdc(_Tcube):
Set the bit 3 (value 8) will make the LED lit when motor is moving.
"""
payload = st.pack("<HH", 1, mode_bits)
self.send(Message(MGMSG.MOT_SET_AVMODES, data=payload))
await self.send(Message(MGMSG.MOT_SET_AVMODES, data=payload))
def get_av_modes(self):
async def get_av_modes(self):
"""Get the LED indicator mode bits.
:return: The LED indicator mode bits.
@ -1143,7 +1141,7 @@ class Tdc(_Tcube):
[MGMSG.MOT_GET_AVMODES], 1)
return st.unpack("<H", get_msg.data[2:])[0]
def set_button_parameters(self, mode, position1, position2):
async def set_button_parameters(self, mode, position1, position2):
"""Set button parameters.
The control keypad can be used either to jog the motor, or to perform
@ -1153,7 +1151,7 @@ class Tdc(_Tcube):
:param mode: If set to 1, the buttons are used to jog the motor. Once
set to this mode, the move parameters for the buttons are taken
from the arguments of the :py:meth:`set_jog_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_jog_parameters>` method.
<artiq.devices.thorlabs_tcube.driver.Tdc.set_jog_parameters>` method.
If set to 2, each button can be programmed with a differente
position value such that the controller will move the motor to that
position when the specific button is pressed.
@ -1164,31 +1162,31 @@ class Tdc(_Tcube):
"""
payload = st.pack("<HHllHH", 1, mode, position1, position2,
0, 0)
self.send(Message(MGMSG.MOT_SET_BUTTONPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_BUTTONPARAMS, data=payload))
def get_button_parameters(self):
async def get_button_parameters(self):
"""Get button parameters.
:return: A 3 int tuple containing in this order: button mode,
position1 and position2. Cf. :py:meth:`set_button_parameters()
<artiq.devices.thorlabs.driver.Tdc.set_button_parameters>` for
<artiq.devices.thorlabs_tcube.driver.Tdc.set_button_parameters>` for
description.
:rtype: A 3 int tuple
"""
get_msg = self.send_request(MGMSG.MOT_REQ_BUTTONPARAMS,
get_msg = await self.send_request(MGMSG.MOT_REQ_BUTTONPARAMS,
[MGMSG.MOT_GET_BUTTONPARAMS], 1)
return st.unpack("<Hll", get_msg.data[2:12])
def set_eeprom_parameters(self, msg_id):
async def set_eeprom_parameters(self, msg_id):
"""Save the parameter settings for the specified message.
:param msg_id: The message ID of the message containing the parameters
to be saved.
"""
payload = st.pack("<HH", 1, msg_id)
self.send(Message(MGMSG.MOT_SET_EEPROMPARAMS, data=payload))
await self.send(Message(MGMSG.MOT_SET_EEPROMPARAMS, data=payload))
def get_dc_status_update(self):
async def get_dc_status_update(self):
"""Request a status update from the motor.
This can be used instead of enabling regular updates.
@ -1197,31 +1195,31 @@ class Tdc(_Tcube):
velocity, status bits.
:rtype: A 3 int tuple
"""
get_msg = self.send_request(MGMSG.MOT_REQ_DCSTATUSUPDATE,
get_msg = await self.send_request(MGMSG.MOT_REQ_DCSTATUSUPDATE,
[MGMSG.MOT_GET_DCSTATUSUPDATE], 1)
pos, vel, _, stat = st.unpack("<LHHL", get_msg.data[2:])
return pos, vel, stat
def get_status_bits(self):
async def get_status_bits(self):
"""Request a cut down version of the status update with status bits.
:return: The motor status.
:rtype:
"""
get_msg = self.send_request(MGMSG.MOT_REQ_STATUSBITS,
get_msg = await self.send_request(MGMSG.MOT_REQ_STATUSBITS,
[MGMSG.MOT_GET_STATUSBITS], 1)
return st.unpack("<L", get_msg.data[2:])[0]
def suspend_end_of_move_messages(self):
async def suspend_end_of_move_messages(self):
"""Disable all unsolicited "end of move" messages and error messages
returned by the controller.
i.e., MGMSG.MOT_MOVE_STOPPED, MGMSG.MOT_MOVE_COMPLETED,
MGMSGS_MOT_MOVE_HOMED
"""
self.send(Message(MGMSG.MOT_SUSPEND_ENDOFMOVEMSGS))
await self.send(Message(MGMSG.MOT_SUSPEND_ENDOFMOVEMSGS))
def resume_end_of_move_messages(self):
async def resume_end_of_move_messages(self):
"""Resume all unsolicited "end of move" messages and error messages
returned by the controller.
@ -1233,7 +1231,7 @@ class Tdc(_Tcube):
MGMSG.HW_RESPONSE,
MGMSG.HW_RICHRESPONSE
"""
self.send(Message(MGMSG.MOT_RESUME_ENDOFMOVEMSGS))
await self.send(Message(MGMSG.MOT_RESUME_ENDOFMOVEMSGS))
class TpzSim: