forked from M-Labs/artiq
pxi6733: add driver and controller
This commit is contained in:
parent
e19f8896f0
commit
69388ccc1a
|
@ -0,0 +1,69 @@
|
||||||
|
# Yann Sionneau <ys@m-labs.hk>, 2015
|
||||||
|
|
||||||
|
from ctypes import byref
|
||||||
|
|
||||||
|
|
||||||
|
class DAQmxSim:
|
||||||
|
def load_sample_values(self, values):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def done_callback_py(taskhandle, status, callback_data):
|
||||||
|
callback_data.daq.DAQmxClearTask(taskhandle)
|
||||||
|
callback_data.tasks.remove(taskhandle)
|
||||||
|
|
||||||
|
|
||||||
|
class DAQmx:
|
||||||
|
"""NI PXI6733 DAQ interface."""
|
||||||
|
|
||||||
|
def __init__(self, device, analog_output, clock):
|
||||||
|
import PyDAQmx as daq
|
||||||
|
|
||||||
|
self.device = device
|
||||||
|
self.analog_output = analog_output
|
||||||
|
self.clock = clock
|
||||||
|
self.tasks = []
|
||||||
|
self.daq = daq
|
||||||
|
|
||||||
|
def load_sample_values(self, values):
|
||||||
|
"""Load sample values into PXI 6733 device.
|
||||||
|
|
||||||
|
This loads sample values into the PXI 6733 device and then
|
||||||
|
configures a task to output those samples at each clock rising
|
||||||
|
edge.
|
||||||
|
|
||||||
|
A callback is registered to clear the task (unallocate resources)
|
||||||
|
when the task has completed.
|
||||||
|
|
||||||
|
:param values: A numpy array of sample values to load in the device.
|
||||||
|
"""
|
||||||
|
|
||||||
|
t = self.daq.Task()
|
||||||
|
t.CreateAOVoltageChan(self.device+b"/"+self.analog_output, b"",
|
||||||
|
min(values), max(values),
|
||||||
|
self.daq.DAQmx_Val_Volts, None)
|
||||||
|
t.CfgSampClkTiming(self.clock, 1000.0, self.daq.DAQmx_Val_Rising,
|
||||||
|
self.daq.DAQmx_Val_FiniteSamps, len(values))
|
||||||
|
num_samps_written = self.daq.int32()
|
||||||
|
ret = t.WriteAnalogF64(len(values), False, 0,
|
||||||
|
self.daq.DAQmx_Val_GroupByChannel, values,
|
||||||
|
byref(num_samps_written), None)
|
||||||
|
if num_samps_written.value != len(values):
|
||||||
|
raise Exception("Error: only {} sample values were written"
|
||||||
|
.format(num_samps_written.value))
|
||||||
|
if ret:
|
||||||
|
raise Exception("Error while writing samples to channel's buffer")
|
||||||
|
|
||||||
|
done_callback = self.daq.DAQmxDoneEventCallbackPtr(done_callback_py)
|
||||||
|
self.tasks.append(t.taskHandle)
|
||||||
|
self.daq.DAQmxRegisterDoneEvent(t.taskHandle, 0, done_callback, self)
|
||||||
|
t.StartTask()
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Clear all pending tasks."""
|
||||||
|
|
||||||
|
for t in self.tasks:
|
||||||
|
self.daq.DAQmxClearTask(t)
|
|
@ -0,0 +1,43 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
# Yann Sionneau <ys@m-labs.hk>, 2015
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
from artiq.protocols.pc_rpc import simple_server_loop
|
||||||
|
from artiq.devices.pxi6733.driver import DAQmx, DAQmxSim
|
||||||
|
from artiq.tools import verbosity_args, init_logger, simple_network_args
|
||||||
|
|
||||||
|
|
||||||
|
def get_argparser():
|
||||||
|
parser = argparse.ArgumentParser(description="NI PXI 6733 controller")
|
||||||
|
simple_network_args(parser, 3256)
|
||||||
|
parser.add_argument("-d", "--device", default=None,
|
||||||
|
help="Device name (e.g. Dev1)."
|
||||||
|
" Omit for simulation mode.")
|
||||||
|
parser.add_argument("-c", "--clock", default="PFI5",
|
||||||
|
help="Input clock pin name (default: PFI5)")
|
||||||
|
parser.add_argument("-a", "--analog-output", default="ao0",
|
||||||
|
help="Analog output pin name (default: ao0)")
|
||||||
|
verbosity_args(parser)
|
||||||
|
return parser
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
args = get_argparser().parse_args()
|
||||||
|
init_logger(args)
|
||||||
|
|
||||||
|
if args.device is None:
|
||||||
|
daq = DAQmxSim()
|
||||||
|
else:
|
||||||
|
daq = DAQmx(bytes(args.device, "ascii"),
|
||||||
|
bytes(args.analog_output, "ascii"),
|
||||||
|
bytes(args.clock, "ascii"))
|
||||||
|
|
||||||
|
try:
|
||||||
|
simple_server_loop({"pxi6733": daq},
|
||||||
|
args.bind, args.port)
|
||||||
|
finally:
|
||||||
|
daq.close()
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
|
@ -0,0 +1,44 @@
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
from artiq.devices.pxi6733.driver import DAQmxSim, DAQmx
|
||||||
|
|
||||||
|
|
||||||
|
pxi6733_device = os.getenv("PXI6733_DEVICE")
|
||||||
|
pxi6733_analog_output = os.getenv("PXI6733_ANALOG_OUTPUT")
|
||||||
|
pxi6733_clock = os.getenv("PXI6733_CLOCK")
|
||||||
|
|
||||||
|
|
||||||
|
class GenericPXI6733Test:
|
||||||
|
def test_load_sample_values(self):
|
||||||
|
test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float)
|
||||||
|
self.cont.load_sample_values(test_vector)
|
||||||
|
self.assertTrue(True)
|
||||||
|
|
||||||
|
def test_close_1(self):
|
||||||
|
self.cont.close()
|
||||||
|
self.assertTrue(True)
|
||||||
|
|
||||||
|
def test_close_2(self):
|
||||||
|
test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float)
|
||||||
|
self.cont.load_sample_values(test_vector)
|
||||||
|
self.cont.close()
|
||||||
|
self.assertTrue(True)
|
||||||
|
|
||||||
|
|
||||||
|
@unittest.skipUnless(pxi6733_device, "no hardware")
|
||||||
|
class TestPXI6733(GenericPXI6733Test, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
args = dict()
|
||||||
|
args["device"] = bytes(pxi6733_device, "ascii")
|
||||||
|
args["analog_output"] = bytes(pxi6733_analog_output, "ascii") \
|
||||||
|
if pxi6733_analog_output else b"ao0"
|
||||||
|
args["clock"] = bytes(pxi6733_clock, "ascii") \
|
||||||
|
if pxi6733_clock else b"PFI5"
|
||||||
|
self.cont = DAQmx(**args)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPXI6733Sim(GenericPXI6733Test, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.cont = DAQmxSim()
|
Loading…
Reference in New Issue