pxi6733: refactor, allow multiple channels in one task, cancel any previous task

This commit is contained in:
Yann Sionneau 2015-06-05 20:12:41 +02:00
parent 398940f5ac
commit c251601204
2 changed files with 74 additions and 29 deletions

View File

@ -14,22 +14,38 @@ class DAQmxSim:
def ping(self):
return True
def string_to_bytes(string, name):
if isinstance(string, str):
string = bytes(string, encoding="ascii")
elif not isinstance(string, bytes):
raise ValueError("{} must be of either str or bytes type".format(name))
return string
class DAQmx:
"""NI PXI6733 DAQ interface."""
def __init__(self, device, analog_output, clock):
def __init__(self, channels, clock):
"""
:param channels: List of channels as a string or bytes(), following
the physical channels lists and ranges NI-DAQmx syntax.
Example: Dev1/ao0, Dev1/ao1:ao3
:param clock: Clock source terminal as a string or bytes(), following
NI-DAQmx terminal names syntax.
Example: PFI5
"""
import PyDAQmx as daq
self.device = device
self.analog_output = analog_output
self.clock = clock
self.tasks = []
self.channels = string_to_bytes(channels, "channels")
self.clock = string_to_bytes(clock, "clock")
self.task = None
self.daq = daq
def done_callback_py(self, taskhandle, status, callback_data):
self.daq.DAQmxClearTask(taskhandle)
self.tasks.remove(taskhandle)
if taskhandle == self.task:
self.clear_pending_task()
def ping(self):
try:
@ -42,43 +58,75 @@ class DAQmx:
def load_sample_values(self, sampling_freq, values):
"""Load sample values into PXI 6733 device.
This loads sample values into the PXI 6733 device and then
configures a task to output those samples at each clock rising
edge.
This loads sample values into the PXI 6733 device.
The device will output samples at each clock rising edge.
The first sample is output at the first clock rising edge.
A callback is registered to clear the task (deallocate resources)
when the task has completed.
When using several channels simultaneously, you must concatenate the
values for the different channels in the ``values`` array.
The sample values for the same channel must be grouped together.
Example:
>>> values = np.array([ch0_samp0, ch0_samp1, ch1_samp0, ch1_samp1],
dtype=float)
In this example the first two samples will be output via the first
channel and the two following samples will be output via the second
channel.
Any call to this method will cancel any previous task even if it has
not yet completed.
:param sampling_freq: The sampling frequency in samples per second.
:param values: A numpy array of sample values to load in the device.
"""
self.clear_pending_task()
t = self.daq.Task()
t.CreateAOVoltageChan(self.device+b"/"+self.analog_output, b"",
t.CreateAOVoltageChan(self.channels, b"",
min(values), max(values),
self.daq.DAQmx_Val_Volts, None)
channel_number = self.daq.int32()
t.GetTaskNumChans(byref(channel_number))
nb_values = len(values)
if nb_values % channel_number.value > 0:
self.daq.DAQmxClearTask(t.taskHandle)
raise ValueError("The size of the values array must be a multiple "
"of the number of channels ({})"
.format(channel_number.value))
samps_per_channel = nb_values // channel_number
t.CfgSampClkTiming(self.clock, sampling_freq,
self.daq.DAQmx_Val_Rising,
self.daq.DAQmx_Val_FiniteSamps, len(values))
self.daq.DAQmx_Val_FiniteSamps, samps_per_channel)
num_samps_written = self.daq.int32()
values = np.require(values, dtype=float,
requirements=["C_CONTIGUOUS", "WRITEABLE"])
ret = t.WriteAnalogF64(len(values), False, 0,
ret = t.WriteAnalogF64(samps_per_channel, False, 0,
self.daq.DAQmx_Val_GroupByChannel, values,
byref(num_samps_written), None)
if num_samps_written.value != len(values):
if num_samps_written.value != nb_values:
raise IOError("Error: only {} sample values were written"
.format(num_samps_written.value))
if ret:
raise IOError("Error while writing samples to the channel buffer")
done_cb = self.daq.DAQmxDoneEventCallbackPtr(self.done_callback_py)
self.tasks.append(t.taskHandle)
self.task = t.taskHandle
self.daq.DAQmxRegisterDoneEvent(t.taskHandle, 0, done_cb, None)
t.StartTask()
def close(self):
"""Clear all pending tasks."""
def clear_pending_task(self):
"""Clear any pending task."""
for t in self.tasks:
self.daq.DAQmxClearTask(t)
if self.task is not None:
self.daq.DAQmxClearTask(self.task)
self.task = None
def close(self):
"""Free any allocated resources."""
self.clear_pending_task()

View File

@ -11,13 +11,11 @@ from artiq.tools import verbosity_args, init_logger, simple_network_args
def get_argparser():
parser = argparse.ArgumentParser(description="NI PXI 6733 controller")
simple_network_args(parser, 3256)
parser.add_argument("-d", "--device", default=None,
help="Device name (e.g. Dev1)."
parser.add_argument("-C", "--channels", default=None,
help="List of channels (e.g. Dev1/ao0, Dev1/ao1:3)."
" Omit for simulation mode.")
parser.add_argument("-c", "--clock", default="PFI5",
help="Input clock pin name (default: PFI5)")
parser.add_argument("-a", "--analog-output", default="ao0",
help="Analog output pin name (default: ao0)")
verbosity_args(parser)
return parser
@ -26,12 +24,11 @@ def main():
args = get_argparser().parse_args()
init_logger(args)
if args.device is None:
if args.channels is None:
daq = DAQmxSim()
else:
daq = DAQmx(bytes(args.device, "ascii"),
bytes(args.analog_output, "ascii"),
bytes(args.clock, "ascii"))
daq = DAQmx(args.channels,
args.clock)
try:
simple_server_loop({"pxi6733": daq},