2
0
mirror of https://github.com/m-labs/artiq.git synced 2024-12-25 03:08:27 +08:00

novatech409b: cleanup

This commit is contained in:
Joe Britton 2015-06-19 15:58:06 -06:00 committed by Sebastien Bourdeauducq
parent 5a9bdb2e33
commit f850336537

View File

@ -15,10 +15,26 @@ class UnexpectedResponse(Exception):
class Novatech409B:
"""Driver for Novatech 409B 4-channel DDS"""
"""Driver for Novatech 409B 4-channel DDS.
# maximum frequency of Novatech 409B when using PLL and external reference
max_freq_with_pll = 171.1276031
All output channels are in range [0, 1, 2, 3].
All frequencies are in Hz.
All phases are in turns.
All amplitudes are in volts.
"""
error_codes = {
"?0": "Unrecognized Command",
"?1": "Bad Frequency",
"?2": "Bad AM Command",
"?3": "Input line too long",
"?4": "Bad Phase",
"?5": "Bad Time",
"?6": "Bad Mode",
"?7": "Bad Amp",
"?8": "Bad Constant",
"?f": "Bad Byte"
}
def __init__(self, serial_dev):
if serial_dev is None:
@ -32,68 +48,67 @@ class Novatech409B:
parity="N",
stopbits=1,
xonxoff=0,
timeout=0.2)
timeout=1.0)
self.setup()
def close(self):
"""Close the serial port"""
"""Close the serial port."""
if not self.simulation:
self.port.close()
def _ser_send(self, cmd, get_response=True):
"""send a string to the serial port
"""Send a string to the serial port."""
Routine for sending serial commands to device. It sends strings
and listens for a response terminated by a carriage return.
# Low-level routine for sending serial commands to device. It sends
# strings and listens for a response terminated by a carriage return.
# example:
# ser_send("F0 1.0") # sets the freq of channel 0 to 1.0 MHz
example:
ser_send("F0 1.0") # sets the freq of channel 0 to 1.0 MHz
:param cmd: a character string to send to device
:returns: None
"""
if self.simulation:
print(cmd)
else:
self.port.flush()
self.port.flushInput()
self.port.write((cmd + "\r\n").encode())
result = self.port.readline().rstrip().decode()
if get_response:
result = self.port.readline().rstrip().decode()
if result != "OK":
raise UnexpectedResponse(result)
logger.debug("got response from device: %s", result)
if result == "OK":
pass
elif result == "":
raise UnexpectedResponse("Response from device timed out")
else:
try:
errstr = self.error_codes[result]
except KeyError:
errstr = "Unrecognized reply: '{}'".format(result)
s = "Error Code = {ec}, {ecs}".format(ec=result, ecs=errstr)
raise UnexpectedResponse(s)
else:
pass
def reset(self):
"""command hardware reset of 409B
returns: None
"""
"""Hardware reset of 409B."""
self._ser_send("R", get_response=False)
time.sleep(1)
self.setup()
def setup(self):
"""initial setup of 409B
"""Initial setup of 409B."""
Setup the Novatech 409B with the following defaults.
* command echo off ("E d")
* external clock ("") 10 MHz sinusoid -1 to +7 dBm
# Setup the Novatech 409B with the following defaults:
# * command echo off ("E d")
# * external clock ("") 10 MHz sinusoid -1 to +7 dBm
:returns: None
"""
# disable command echo
self._ser_send("E d", get_response=False)
self.set_phase_continuous(True)
self.set_simultaneous_update(False)
def save_state_to_eeprom(self):
"""save current state to EEPROM
Saves current state into EEPROM and sets valid flag.
State used as default upon next power up or reset. """
"""Save current state to EEPROM."""
self._ser_send("S")
def set_phase_continuous(self, is_continuous):
"""toggle phase continuous mode
"""Toggle phase continuous mode.
Sends the "M n" command. This turns off the automatic
clearing of the phase register. In this mode, the phase
@ -109,7 +124,9 @@ class Novatech409B:
self._ser_send("M a")
def set_simultaneous_update(self, simultaneous):
"""Sends the "I m" command. In this mode an update
"""Set simultaneous update mode.
Sends the "I m" command. In this mode an update
pulse will not be sent to the DDS chip until
an "I p" command is sent. This is useful when it is
important to change all the outputs to new values
@ -121,140 +138,75 @@ class Novatech409B:
self._ser_send("I a")
def set_freq(self, ch_no, freq):
"""set_freq(ch_no,freq):
Set ch_no to frequency freq MHz"""
if ch_no < 0 or ch_no > 3:
raise ValueError("Incorrect channel number {}".format(ch_no))
if freq < 0.0 or freq > self.max_freq_with_pll:
raise ValueError("Incorrect frequency {}".format(freq))
# do this immediately, disable SimultaneousUpdate mode
"""Set frequency of one channel."""
self.set_simultaneous_update(False)
self._ser_send("F{:d} {:f}".format(ch_no, freq))
# Novatech expects MHz
self._ser_send("F{:d} {:f}".format(ch_no, freq/1e6))
def set_phase(self, ch_no, phase):
"""set DDS phase
:param ch_no: 0 to 3
:param phase: phase angle in cycles [0, 1]
:returns: None
"""
if ch_no < 0 or ch_no > 3:
raise ValueError("Incorrect channel number {}".format(ch_no))
if phase < 0 or phase > 1:
raise ValueError("Incorrect phase {}".format(phase))
"""Set phase of one channel."""
# do this immediately, disable SimultaneousUpdate mode
self.set_simultaneous_update(False)
# phase word is required by device
# N is an integer from 0 to 16383. Phase is set to
# N*360/16384 deg; in ARTIQ represent phase in cycles [0, 1]
phase_word = round(phase*16384)
if phase_word >= 16384:
phase_word -= 16384
phase_word = round(phase*16383)
cmd = "P{:d} {:d}".format(ch_no, phase_word)
self._ser_send(cmd)
def set_freq_all_phase_continuous(self, freq):
"""set frequency of all channels simultaneously
"""Set frequency of all channels simultaneously.
Set frequency of all channels simultaneously.
1) all DDSs are set to phase continuous mode
2) all DDSs are simultaneously set to new frequency
Together 1 and 2 ensure phase continuous frequency switching.
:param freq: frequency in MHz
:returns: None
"""
self.set_simultaneous_update(True)
self.set_phase_continuous(True)
for channel_num in range(4):
self.set_freq(channel_num, freq)
for i in range(4):
self.set_freq(i, freq)
# send command necessary to update all channels at the same time
self._ser_send("I p")
def set_phase_all(self, phase):
"""set phase of all DDS channels simultaneously
"""Set phase of all channels simultaneously."""
Set phase of all DDS channels at the same time. For example,::
set_phase_all([0, .25, 0.5, 0.75])
:param phase: vector of four phases (in cycles [0, 1])
:returns: None
"""
self.set_simultaneous_update(True)
# Note that this only works if the continuous
# phase switching is turned off.
self.set_phase_continuous(False)
for ch_no in range(4):
self.set_phase(ch_no, phase[ch_no])
for i in range(4):
self.set_phase(i, phase)
# send command necessary to update all channels at the same time
self._ser_send("I p")
def freq_sweep_all_phase_continuous(self, f0, f1, t):
""" sweep phase of all DDSs, phase continuous
def set_gain(self, ch_no, volts):
"""Set amplitude of one channel."""
Sweep frequency in a phase continuous fashion.
# due to error in Novatech it doesn't generate an error for
# dac_value>1024, so need to trap.
dac_value = int(math.floor(volts/0.51*1024))
if dac_value < 0 or dac_value > 1023:
s = "Amplitude out of range {v}".format(v=volts)
raise ValueError(s)
:param f0: starting frequency (MHz)
:param f1: ending frequency (MHz)
:param t: sweep duration (seconds)
:returns: None
"""
# TODO: consider using artiq.language.units
if f0 == f1:
return
# get sign of sweep
if f1 > f0:
df_sign = 1
else:
df_sign = -1
self.set_phase_continuous(True)
self.set_simultaneous_update(True)
# calculate delay
# note that a single call to self.set_freq_all_phase_continuous()
# takes time t_for_one_freq_set; fix duration empirically
t_for_one_freq_set = 0.264
dt = t_for_one_freq_set
n_steps = int(math.ceil(t/dt))
df = abs(f0-f1)/n_steps
for n in range(n_steps):
fnow = f0+n*df_sign*df
self.set_freq_all_phase_continuous(fnow)
self.set_freq_all_phase_continuous(f1)
def output_scale(self, ch_no, frac):
"""changes amplitude of a DDS
:param ch_no: DDS channel 0, 1, 2 or 3
:param frac: 0 to 1 (full attenuation to no attenuation)
:returns: None
"""
self.set_simultaneous_update(False)
dac_ch_no = int(math.floor(frac*1024))
s = "V{:d} {:d}".format(ch_no, dac_ch_no)
s = "V{:d} {:d}".format(ch_no, dac_value)
self._ser_send(s)
def output_scale_all(self, frac):
"""changes amplitude of all DDSs
:param frac: 0 to 1 (full attenuation to no attenuation)
"""
for ch_no in range(4):
self.output_scale(ch_no, frac)
def output_on_off(self, ch_no, on):
"""turns on or off the DDS
:param ch_no: DDS channel 0, 1, 2 or 3
"""
if on:
self.output_scale(ch_no, 1.0)
def get_status(self):
if self.simulation:
return ["00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"00989680 2000 01F5 0000 00000000 00000000 000301",
"80 BC0000 0000 0102 21"]
else:
self.output_scale(ch_no, 0.0)
def output_on_off_all(self, on):
"""turns on or off the all the DDSs"""
if on:
self.output_scale_all(1.0)
else:
self.output_scale_all(0.0)
# status message is multi-line
self.port.flushInput()
self.port.write(("QUE" + "\r\n").encode())
result = self.port.readlines()
result = [r.rstrip().decode() for r in result]
logger.debug("got device status: %s", result)
return result