From 69388ccc1a51d4906329735d6a9f5b523d2e9a15 Mon Sep 17 00:00:00 2001 From: Yann Sionneau Date: Tue, 21 Apr 2015 16:07:18 +0800 Subject: [PATCH] pxi6733: add driver and controller --- artiq/devices/pxi6733/__init__.py | 0 artiq/devices/pxi6733/driver.py | 69 ++++++++++++++++++++++++++++ artiq/frontend/pxi6733_controller.py | 43 +++++++++++++++++ artiq/test/pxi6733.py | 44 ++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 artiq/devices/pxi6733/__init__.py create mode 100644 artiq/devices/pxi6733/driver.py create mode 100755 artiq/frontend/pxi6733_controller.py create mode 100644 artiq/test/pxi6733.py diff --git a/artiq/devices/pxi6733/__init__.py b/artiq/devices/pxi6733/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/artiq/devices/pxi6733/driver.py b/artiq/devices/pxi6733/driver.py new file mode 100644 index 000000000..77cfb3078 --- /dev/null +++ b/artiq/devices/pxi6733/driver.py @@ -0,0 +1,69 @@ +# Yann Sionneau , 2015 + +from ctypes import byref + + +class DAQmxSim: + def load_sample_values(self, values): + pass + + def close(self): + pass + + +def done_callback_py(taskhandle, status, callback_data): + callback_data.daq.DAQmxClearTask(taskhandle) + callback_data.tasks.remove(taskhandle) + + +class DAQmx: + """NI PXI6733 DAQ interface.""" + + def __init__(self, device, analog_output, clock): + import PyDAQmx as daq + + self.device = device + self.analog_output = analog_output + self.clock = clock + self.tasks = [] + self.daq = daq + + def load_sample_values(self, values): + """Load sample values into PXI 6733 device. + + This loads sample values into the PXI 6733 device and then + configures a task to output those samples at each clock rising + edge. + + A callback is registered to clear the task (unallocate resources) + when the task has completed. + + :param values: A numpy array of sample values to load in the device. + """ + + t = self.daq.Task() + t.CreateAOVoltageChan(self.device+b"/"+self.analog_output, b"", + min(values), max(values), + self.daq.DAQmx_Val_Volts, None) + t.CfgSampClkTiming(self.clock, 1000.0, self.daq.DAQmx_Val_Rising, + self.daq.DAQmx_Val_FiniteSamps, len(values)) + num_samps_written = self.daq.int32() + ret = t.WriteAnalogF64(len(values), False, 0, + self.daq.DAQmx_Val_GroupByChannel, values, + byref(num_samps_written), None) + if num_samps_written.value != len(values): + raise Exception("Error: only {} sample values were written" + .format(num_samps_written.value)) + if ret: + raise Exception("Error while writing samples to channel's buffer") + + done_callback = self.daq.DAQmxDoneEventCallbackPtr(done_callback_py) + self.tasks.append(t.taskHandle) + self.daq.DAQmxRegisterDoneEvent(t.taskHandle, 0, done_callback, self) + t.StartTask() + + def close(self): + """Clear all pending tasks.""" + + for t in self.tasks: + self.daq.DAQmxClearTask(t) diff --git a/artiq/frontend/pxi6733_controller.py b/artiq/frontend/pxi6733_controller.py new file mode 100755 index 000000000..755dfa2bd --- /dev/null +++ b/artiq/frontend/pxi6733_controller.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 +# Yann Sionneau , 2015 + +import argparse + +from artiq.protocols.pc_rpc import simple_server_loop +from artiq.devices.pxi6733.driver import DAQmx, DAQmxSim +from artiq.tools import verbosity_args, init_logger, simple_network_args + + +def get_argparser(): + parser = argparse.ArgumentParser(description="NI PXI 6733 controller") + simple_network_args(parser, 3256) + parser.add_argument("-d", "--device", default=None, + help="Device name (e.g. Dev1)." + " Omit for simulation mode.") + parser.add_argument("-c", "--clock", default="PFI5", + help="Input clock pin name (default: PFI5)") + parser.add_argument("-a", "--analog-output", default="ao0", + help="Analog output pin name (default: ao0)") + verbosity_args(parser) + return parser + + +def main(): + args = get_argparser().parse_args() + init_logger(args) + + if args.device is None: + daq = DAQmxSim() + else: + daq = DAQmx(bytes(args.device, "ascii"), + bytes(args.analog_output, "ascii"), + bytes(args.clock, "ascii")) + + try: + simple_server_loop({"pxi6733": daq}, + args.bind, args.port) + finally: + daq.close() + +if __name__ == "__main__": + main() diff --git a/artiq/test/pxi6733.py b/artiq/test/pxi6733.py new file mode 100644 index 000000000..f1e52bd2c --- /dev/null +++ b/artiq/test/pxi6733.py @@ -0,0 +1,44 @@ +import unittest +import os +import numpy as np + +from artiq.devices.pxi6733.driver import DAQmxSim, DAQmx + + +pxi6733_device = os.getenv("PXI6733_DEVICE") +pxi6733_analog_output = os.getenv("PXI6733_ANALOG_OUTPUT") +pxi6733_clock = os.getenv("PXI6733_CLOCK") + + +class GenericPXI6733Test: + def test_load_sample_values(self): + test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float) + self.cont.load_sample_values(test_vector) + self.assertTrue(True) + + def test_close_1(self): + self.cont.close() + self.assertTrue(True) + + def test_close_2(self): + test_vector = np.array([1.0, 2.0, 3.0, 4.0], dtype=float) + self.cont.load_sample_values(test_vector) + self.cont.close() + self.assertTrue(True) + + +@unittest.skipUnless(pxi6733_device, "no hardware") +class TestPXI6733(GenericPXI6733Test, unittest.TestCase): + def setUp(self): + args = dict() + args["device"] = bytes(pxi6733_device, "ascii") + args["analog_output"] = bytes(pxi6733_analog_output, "ascii") \ + if pxi6733_analog_output else b"ao0" + args["clock"] = bytes(pxi6733_clock, "ascii") \ + if pxi6733_clock else b"PFI5" + self.cont = DAQmx(**args) + + +class TestPXI6733Sim(GenericPXI6733Test, unittest.TestCase): + def setUp(self): + self.cont = DAQmxSim()