2
0
mirror of https://github.com/m-labs/artiq.git synced 2025-01-27 02:48:12 +08:00

remove pxi6733 support (now lives at https://github.com/m-labs/aq_ni6733

This commit is contained in:
Sebastien Bourdeauducq 2016-02-02 18:41:57 +01:00
parent c639860331
commit b7de92e96c
9 changed files with 1 additions and 462 deletions

View File

@ -1,141 +0,0 @@
# Yann Sionneau <ys@m-labs.hk>, 2015
from ctypes import byref, c_ulong, create_string_buffer
import logging
import numpy as np
logger = logging.getLogger(__name__)
class DAQmxSim:
def load_sample_values(self, values):
pass
def close(self):
pass
def ping(self):
return True
class DAQmx:
"""NI PXI6733 DAQ interface."""
def __init__(self, channels, clock):
"""
:param channels: List of channels as a string, following
the physical channels lists and ranges NI-DAQmx syntax.
Example: Dev1/ao0, Dev1/ao1:ao3
:param clock: Clock source terminal as a string, following
NI-DAQmx terminal names syntax.
Example: PFI5
"""
import PyDAQmx as daq
self.channels = channels.encode()
self.clock = clock.encode()
self.task = None
self.daq = daq
def _done_callback(self, taskhandle, status, callback_data):
if taskhandle != self.task:
logger.warning("done callback called with unexpected task")
else:
self.clear_pending_task()
def ping(self):
try:
data_len = 128
data = create_string_buffer(data_len)
self.daq.DAQmxGetSysDevNames(data, data_len)
logger.debug("Device names: %s", data.value)
except:
return False
return data.value != ""
def load_sample_values(self, sampling_freq, values):
"""Load sample values into PXI 6733 device.
This loads sample values into the PXI 6733 device.
The device will output samples at each clock rising edge.
The device waits for a clock rising edge to output the first sample.
When using several channels simultaneously, you can either concatenate
the values for the different channels in a 1-dimensional ``values``
numpy ndarray.
Example:
>>> values = np.array([ch0_samp0, ch0_samp1, ch1_samp0, ch1_samp1],
dtype=float)
In this example the first two samples will be output via the first
channel and the two following samples will be output via the second
channel.
Or you can use a 2-dimensional numpy ndarray like this:
>>> values = np.array([[ch0_samp0, ch0_samp1],[ch1_samp0, ch1_samp1]],
dtype=float)
Any call to this method will cancel any previous task even if it has
not yet completed.
:param sampling_freq: The sampling frequency in samples per second.
:param values: A numpy ndarray of sample values (in volts) to load in
the device.
"""
self.clear_pending_task()
values = values.flatten()
t = self.daq.Task()
t.CreateAOVoltageChan(self.channels, b"",
min(values), max(values)+1,
self.daq.DAQmx_Val_Volts, None)
channel_number = (c_ulong*1)()
t.GetTaskNumChans(channel_number)
nb_values = len(values)
if nb_values % channel_number[0]:
self.daq.DAQmxClearTask(t.taskHandle)
raise ValueError("The size of the values array must be a multiple "
"of the number of channels ({})"
.format(channel_number[0]))
samps_per_channel = nb_values // channel_number[0]
t.CfgSampClkTiming(self.clock, sampling_freq,
self.daq.DAQmx_Val_Rising,
self.daq.DAQmx_Val_FiniteSamps, samps_per_channel)
num_samps_written = self.daq.int32()
values = np.require(values, dtype=float,
requirements=["C_CONTIGUOUS", "WRITEABLE"])
ret = t.WriteAnalogF64(samps_per_channel, False, 0,
self.daq.DAQmx_Val_GroupByChannel, values,
byref(num_samps_written), None)
if num_samps_written.value != samps_per_channel:
raise IOError("Error: only {} sample values per channel were"
"written".format(num_samps_written.value))
if ret:
raise IOError("Error while writing samples to the channel buffer")
done_cb = self.daq.DAQmxDoneEventCallbackPtr(self._done_callback)
self.task = t.taskHandle
self.daq.DAQmxRegisterDoneEvent(t.taskHandle, 0, done_cb, None)
t.StartTask()
def clear_pending_task(self):
"""Clear any pending task."""
if self.task is not None:
self.daq.DAQmxClearTask(self.task)
self.task = None
def close(self):
"""Free any allocated resources."""
self.clear_pending_task()

View File

@ -1,180 +0,0 @@
import numpy as np
from artiq.language.core import *
from artiq.language.units import *
from artiq.wavesynth.compute_samples import Synthesizer
class SegmentSequenceError(Exception):
"""Raised when attempting to play back a named segment which is not the
next in the sequence."""
pass
class InvalidatedError(Exception):
"""Raised when attemting to use a frame or segment that has been
invalidated (due to disarming the DAQmx)."""
pass
class ArmError(Exception):
"""Raised when attempting to arm an already armed DAQmx, to modify the
program of an armed DAQmx, or to play a segment on a disarmed DAQmx."""
pass
def _ceil_div(a, b):
return (a + b - 1)//b
def _compute_duration_mu(nsamples, ftw, acc_width):
# This returns the precise duration so that the clock can be stopped
# exactly at the next rising edge (RTLink commands take precedence over
# toggling from the accumulator).
# If segments are played continuously, replacement of the stop command
# will keep the clock running. If the FTW is not a power of two, note that
# the accumulator is reset at that time, which causes jitter and frequency
# inaccuracy.
# Formally:
# duration *ftw >= nsamples*2**acc_width
# (duration - 1)*ftw < nsamples*2**acc_width
return _ceil_div(nsamples*2**acc_width, ftw)
class _Segment:
def __init__(self, frame, segment_number):
self.frame = frame
self.segment_number = segment_number
self.lines = []
# for @kernel
self.core = frame.daqmx.core
def add_line(self, duration, channel_data):
if self.frame.invalidated:
raise InvalidatedError
if self.frame.daqmx.armed:
raise ArmError
self.lines.append((duration, channel_data))
def get_sample_count(self):
return sum(duration for duration, _ in self.lines)
@kernel
def advance(self):
if self.frame.invalidated:
raise InvalidatedError
if not self.frame.daqmx.armed:
raise ArmError
# If the frame is currently being played, check that we are next.
if (self.frame.daqmx.next_segment >= 0
and self.frame.daqmx.next_segment != self.segment_number):
raise SegmentSequenceError
self.frame.advance()
class _Frame:
def __init__(self, daqmx):
self.daqmx = daqmx
self.segments = []
self.segment_count = 0 # == len(self.segments), used in kernel
self.invalidated = False
# for @kernel
self.core = self.daqmx.core
def create_segment(self, name=None):
if self.invalidated:
raise InvalidatedError
if self.daqmx.armed:
raise ArmError
segment = _Segment(self, self.segment_count)
if name is not None:
if hasattr(self, name):
raise NameError("Segment name already exists")
setattr(self, name, segment)
self.segments.append(segment)
self.segment_count += 1
return segment
def _arm(self):
self.segment_delays = [
_compute_duration_mu(s.get_sample_count(),
self.daqmx.sample_rate,
self.daqmx.clock.acc_width)
for s in self.segments]
def _invalidate(self):
self.invalidated = True
def _get_samples(self):
program = [[
{
"dac_divider": 1,
"duration": duration,
"channel_data": channel_data,
} for segment in self.segments
for duration, channel_data in segment.lines]]
synth = Synthesizer(self.daqmx.channel_count, program)
synth.select(0)
# not setting any trigger flag in the program causes the whole
# waveform to be computed here for all segments.
# slicing the segments is done by stopping the clock.
return synth.trigger()
@kernel
def advance(self):
if self.invalidated:
raise InvalidatedError
if not self.daqmx.armed:
raise ArmError
self.daqmx.clock.set(self.daqmx.sample_rate)
delay_mu(self.segment_delays[self.daqmx.next_segment])
self.daqmx.next_segment += 1
self.daqmx.clock.stop()
# test for end of frame
if self.daqmx.next_segment == self.segment_count:
self.daqmx.next_segment = -1
class CompoundDAQmx:
def __init__(self, dmgr, daqmx_device, clock_device, channel_count,
sample_rate, sample_rate_in_mu=False):
self.core = dmgr.get("core")
self.daqmx = dmgr.get(daqmx_device)
self.clock = dmgr.get(clock_device)
self.channel_count = channel_count
if sample_rate_in_mu:
self.sample_rate = sample_rate
else:
self.sample_rate = self.clock.frequency_to_ftw(sample_rate)
self.frame = None
self.next_segment = -1
self.armed = False
def disarm(self):
if self.frame is not None:
self.frame._invalidate()
self.frame = None
self.armed = False
def arm(self):
if self.armed:
raise ArmError
if self.frame is not None:
self.frame._arm()
self.daqmx.load_sample_values(
self.clock.ftw_to_frequency(self.sample_rate),
np.array(self.frame._get_samples()))
self.armed = True
def create_frame(self):
if self.armed:
raise ArmError
self.frame = _Frame(self)
return self.frame

View File

@ -1,48 +0,0 @@
#!/usr/bin/env python3.5
# Yann Sionneau <ys@m-labs.hk>, 2015
import argparse
import sys
from artiq.protocols.pc_rpc import simple_server_loop
from artiq.devices.pxi6733.driver import DAQmx, DAQmxSim
from artiq.tools import *
def get_argparser():
parser = argparse.ArgumentParser(description="NI PXI 6733 controller")
simple_network_args(parser, 3256)
parser.add_argument("-C", "--channels", default=None,
help="List of channels (e.g. Dev1/ao0, Dev1/ao1:3).")
parser.add_argument("-c", "--clock", default="PFI5",
help="Input clock pin name (default: PFI5)")
parser.add_argument("--simulation", action='store_true',
help="Put the driver in simulation mode, even if "
"--channels is used.")
verbosity_args(parser)
return parser
def main():
args = get_argparser().parse_args()
init_logger(args)
if not args.simulation and args.channels is None:
print("You need to specify either --simulation or -C/--channels "
"argument. Use --help for more information.")
sys.exit(1)
if args.simulation:
daq = DAQmxSim()
else:
daq = DAQmx(args.channels,
args.clock)
try:
simple_server_loop({"pxi6733": daq},
bind_address_from_args(args), args.port)
finally:
daq.close()
if __name__ == "__main__":
main()

View File

@ -1,41 +0,0 @@
import unittest
import os
import numpy as np
from artiq.devices.pxi6733.driver import DAQmxSim, DAQmx
pxi6733_device = os.getenv("PXI6733_DEVICE")
pxi6733_analog_output = os.getenv("PXI6733_ANALOG_OUTPUT")
pxi6733_clock = os.getenv("PXI6733_CLOCK")
class GenericPXI6733Test:
def test_load_sample_values(self):
test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float)
self.cont.load_sample_values(test_vector)
def test_close_1(self):
self.cont.close()
def test_close_2(self):
test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float)
self.cont.load_sample_values(test_vector)
self.cont.close()
@unittest.skipUnless(pxi6733_device, "no hardware")
class TestPXI6733(GenericPXI6733Test, unittest.TestCase):
def setUp(self):
args = dict()
args["device"] = bytes(pxi6733_device, "ascii")
args["analog_output"] = bytes(pxi6733_analog_output, "ascii") \
if pxi6733_analog_output else b"ao0"
args["clock"] = bytes(pxi6733_clock, "ascii") \
if pxi6733_clock else b"PFI5"
self.cont = DAQmx(**args)
class TestPXI6733Sim(GenericPXI6733Test, unittest.TestCase):
def setUp(self):
self.cont = DAQmxSim()

View File

@ -27,7 +27,6 @@ build:
- novatech409b_controller = artiq.frontend.novatech409b_controller:main
- pdq2_client = artiq.frontend.pdq2_client:main
- pdq2_controller = artiq.frontend.pdq2_controller:main
- pxi6733_controller = artiq.frontend.pxi6733_controller:main
- thorlabs_tcube_controller = artiq.frontend.thorlabs_tcube_controller:main
requirements:
@ -47,7 +46,6 @@ requirements:
- sphinx-argparse
- h5py
- dateutil
- pydaqmx
- quamash
- pyqtgraph
- pygit2

View File

@ -28,5 +28,3 @@ Default network ports
+--------------------------+--------------+
| Thorlabs T-Cube | 3255 |
+--------------------------+--------------+
| NI PXI6733 | 3256 |
+--------------------------+--------------+

View File

@ -170,49 +170,3 @@ Then, send commands to it via the ``artiq_rpctool`` utility::
$ artiq_rpctool ::1 3255 call set_tpz_io_settings 150 1 # set maximum output voltage to 150 V
$ artiq_rpctool ::1 3255 call set_output_volts 150 # set output voltage to 150 V
$ artiq_rpctool ::1 3255 call close # close the device
NI PXI6733
----------
Driver
++++++
.. automodule:: artiq.devices.pxi6733.driver
:members:
Controller
++++++++++
.. argparse::
:ref: artiq.frontend.pxi6733_controller.get_argparser
:prog: pxi6733_controller
PXI6733 controller usage example
++++++++++++++++++++++++++++++++
This controller has only been tested on Windows so far.
To use this controller you need first to install the NI-DAQmx driver
from http://www.ni.com/downloads/ni-drivers/f/.
Then you also need to install PyDAQmx python module::
$ git clone https://github.com/clade/PyDAQmx
$ cd PyDAQmx
$ C:\Python34\Tools\Scripts\2to3.py -w .
$ python setup.py build
$ python setup.py install
Then, you can run the PXI6733 controller::
$ pxi6733_controller -d Dev1
Then, send a load_sample_values command to it via the ``artiq_rpctool`` utility::
$ artiq_rpctool ::1 3256 list-targets
Target(s): pxi6733
$ artiq_rpctool ::1 3256 call load_sample_values 'np.array([1.0, 2.0, 3.0, 4.0], dtype=float)'
This loads 4 voltage values as a numpy float array: 1.0 V, 2.0 V, 3.0 V, 4.0 V
Then the device is set up to output those samples at each rising edge of the clock.

View File

@ -12,7 +12,7 @@ if sys.version_info[:3] < (3, 5, 1):
requirements = [
"sphinx", "sphinx-argparse", "pyserial", "numpy", "scipy",
"python-dateutil", "prettytable", "h5py", "pydaqmx",
"python-dateutil", "prettytable", "h5py",
"quamash", "pyqtgraph", "pygit2", "aiohttp",
"llvmlite_artiq", "pythonparser", "python-Levenshtein",
"lit", "OutputCheck",
@ -36,7 +36,6 @@ scripts = [
"novatech409b_controller=artiq.frontend.novatech409b_controller:main",
"pdq2_client=artiq.frontend.pdq2_client:main",
"pdq2_controller=artiq.frontend.pdq2_controller:main",
"pxi6733_controller=artiq.frontend.pxi6733_controller:main",
"thorlabs_tcube_controller=artiq.frontend.thorlabs_tcube_controller:main",
]