Source code for crappy.inout.daqmx

# coding: utf-8

import numpy as np
from time import time
from typing import List, Optional, Iterable
from dataclasses import dataclass
import logging

from .meta_inout import InOut
from .._global import OptionalModule
try:
  import PyDAQmx
except (ModuleNotFoundError, ImportError):
  PyDAQmx = OptionalModule("PyDAQmx")


@dataclass
class _Channel:
  """This class is a simple structure holding all the attributes a DAQmx
  channel can have."""

  name: str
  range_num: float = 5
  gain: float = 1
  offset: float = 0
  make_zero: bool = False


[docs] class DAQmx(InOut): """This class can drive data acquisition hardware from National Instruments. It is similar to :class:`~crappy.inout.NIDAQmx` InOut, except it relies on the :mod:`PyDAQmx` module. It was written and tested on a USB 6008 DAQ board, but should work with other instruments as well. Note: This class requires the NIDAQmx C driver to be installed, as well as the :mod:`PyDAQmx` module. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Daqmx* to *DAQmx* """
[docs] def __init__(self, device: str = 'Dev1', channels: Optional[Iterable[str]] = None, gain: Optional[Iterable[float]] = None, offset: Optional[Iterable[float]] = None, ranges: Optional[Iterable[float]] = None, make_zero: Optional[Iterable[bool]] = None, sample_rate: float = 10000, out_channels: Optional[Iterable[str]] = None, out_gain: Optional[Iterable[float]] = None, out_offset: Optional[Iterable[float]] = None, out_ranges: Optional[Iterable[float]] = None) -> None: """Sets the arguments and initializes the parent class. Args: device: The name of the device to open, as a :obj:`str`. channels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing the names of the channels to use as inputs, given as :obj:`str`. Typical names for inputs are ``'aiX'```, with `X` an integer. gain: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each input channel the gain to apply to the measured voltage, as a :obj:`float`. The returned voltage is calculated as follows :math:`returned\_voltage = gain * measured\_voltage + offset`. If not given, no gain is applied to the measured values. offset: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each input channel the offset to apply to the measured voltage, as a :obj:`float`. The returned voltage is calculated as follows :math:`returned\_voltage = gain * measured\_voltage + offset`. If not given, no offset is applied to the measured values. ranges: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each input channel the range to set for that channel, as a :obj:`float`. The possible range values are : :: 0.5, 1., 2.5, 5. If not given, all input channels will be set to the range `5`. .. versionchanged:: 1.5.10 renamed from *range* to *ranges* make_zero: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each input channel a :obj:`bool` indicating whether the channel should be zeroed or not. If so, data will be acquired on this channel before the test starts, and a compensation value will be deduced so that the offset of this channel is `0`. **It will only take effect if the** ``make_zero_delay`` **argument of the** :class:`~crappy.blocks.IOBlock` **controlling the DAQ is set** ! If not given, the channels are by default not zeroed. sample_rate: The frequency of the acquisition, as a :obj:`float`. The higher this number, the more noise there is on the signal but the higher the acquisition frequency. out_channels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing the names of the channels to use as outputs, given as :obj:`str`. Typical names for outputs are ``'aoX'``, with `X` an integer. out_gain: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each output channel the gain to apply to the command voltage, as a :obj:`float`. The set voltage is calculated as follows : :math:`set\_voltage = out\_gain * command\_voltage + out\_offset`. If not given, no gain is applied to the command values. out_offset: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each output channel the offset to apply to the command voltage, as a :obj:`float`. The set voltage is calculated as follows : :math:`set\_voltage = out\_gain * command\_voltage + out\_offset`. If not given, no offset is applied to the command values. out_ranges: An iterable (like a :obj:`list` or a :obj:`tuple`) containing for each output channel the range to set for that channel, as a :obj:`float`. The possible range values are : :: 0.5, 1., 2.5, 5. If not given, all output channels will be set to the range `5`. .. versionchanged:: 1.5.10 renamed from *out_range* to *out_ranges* Note: All the iterables given as arguments for the input channels should have the same length, and same for the output channels. If that's not the case, all the given iterables are treated as if they had the same length as the shortest given one. .. versionremoved:: 1.5.10 *nperscan* argument """ self._handle = None self._out_handle = None super().__init__() self._device = device self._sample_rate = sample_rate # Setting the defaults for arguments that are not given if channels is None: channels = list() if out_channels is None: out_channels = list() if ranges is None: ranges = [5 for _ in channels] if gain is None: gain = [1 for _ in channels] if offset is None: offset = [0 for _ in channels] if make_zero is None: make_zero = [False for _ in channels] if out_ranges is None: out_ranges = [5 for _ in out_channels] if out_gain is None: out_gain = [1 for _ in out_channels] if out_offset is None: out_offset = [0 for _ in out_channels] # Creating the channel objects self._channels = [_Channel(name=chan, range_num=r_num, gain=g, offset=off, make_zero=make_z) for chan, r_num, g, off, make_z in zip(channels, ranges, gain, offset, make_zero)] self.log(logging.DEBUG, f"Input channels: {self._channels}") self._out_channels = [_Channel(name=chan, range_num=r_num, gain=g, offset=off) for chan, r_num, g, off in zip(out_channels, out_ranges, out_gain, out_offset)] self.log(logging.DEBUG, f"Output channels: {self._out_channels}") self._handle, self._out_handle = None, None self._n_reads = PyDAQmx.int32()
[docs] def open(self) -> None: """Opens the device and initializes the input and output channels.""" self.log(logging.INFO, "Opening the connection to the DAQmx device") PyDAQmx.DAQmxResetDevice(self._device) if self._channels: # Opening the device for reading self._handle = PyDAQmx.TaskHandle() PyDAQmx.DAQmxCreateTask('', PyDAQmx.byref(self._handle)) # Setting up the input channels self.log(logging.INFO, "Setting up the input channels") for chan in self._channels: self.log(logging.DEBUG, f"Setting up the input channel " f"{self._device}/{chan.name}") PyDAQmx.DAQmxCreateAIVoltageChan(self._handle, f"{self._device}/{chan.name}", '', PyDAQmx.DAQmx_Val_Cfg_Default, 0, chan.range_num, PyDAQmx.DAQmx_Val_Volts, None) if self._out_channels: # Opening the device for writing self._out_handle = PyDAQmx.TaskHandle() PyDAQmx.DAQmxCreateTask('', PyDAQmx.byref(self._out_handle)) # Setting up the output channels self.log(logging.INFO, "Setting up the output channels") for chan in self._out_channels: self.log(logging.DEBUG, f"Setting up the output channel " f"{self._device}/{chan.name}") PyDAQmx.DAQmxCreateAOVoltageChan(self._out_handle, f"{self._device}/{chan.name}", '', 0, chan.range_num, PyDAQmx.DAQmx_Val_Volts, None) PyDAQmx.DAQmxStartTask(self._out_handle)
[docs] def make_zero(self, delay: float) -> None: """Overriding the method of the parent class, because the user can choose which channels should be zeroed or not. It simply performs the regular zeroing, and resets the compensation to zero for the channels that shouldn't be zeroed. """ # No need to acquire data if no channel should be zeroed if any(chan.make_zero for chan in self._channels): # Acquiring the data super().make_zero(delay) # Proceed only if the acquisition went fine if self._compensations: # Resetting the compensation for channels that shouldn't be zeroed self._compensations = [comp if chan.make_zero else 0 for comp, chan in zip(self._compensations, self._channels)]
[docs] def get_data(self) -> List[float]: """Creates and starts an acquisition task, and returns the acquired values.""" # Defining the acquisition task and the buffer, and starting it PyDAQmx.DAQmxCfgSampClkTiming(self._handle, '', self._sample_rate, PyDAQmx.DAQmx_Val_Rising, PyDAQmx.DAQmx_Val_FiniteSamps, 2) PyDAQmx.DAQmxStartTask(self._handle) data = np.empty(len(self._channels), dtype=np.float64) # Reading the acquired values and stopping the task t0 = time() PyDAQmx.DAQmxReadAnalogF64(self._handle, 1, 10.0, PyDAQmx.DAQmx_Val_GroupByChannel, data, len(self._channels), PyDAQmx.byref(self._n_reads), None) PyDAQmx.DAQmxStopTask(self._handle) return [t0] + [data[i] * chan.gain + chan.offset for i, chan in enumerate(self._channels)]
[docs] def set_cmd(self, *cmd: float) -> None: """Sets the command value on the output channels. There should be as many commands as there are output channels. In case the numbers of channels and commands don't match, an exception is raised. """ # Applying the gains and offsets to the commands out_gains = [chan.gain for chan in self._out_channels] out_offsets = [chan.offset for chan in self._out_channels] data = np.array(cmd, dtype=np.float64) * out_gains + out_offsets # Setting the commands PyDAQmx.DAQmxWriteAnalogF64(self._out_handle, 1, 1, 10.0, PyDAQmx.DAQmx_Val_GroupByChannel, data, None, None)
[docs] def close(self) -> None: """Closes the processes of the input and output channels.""" # Stopping and closing the processes for data acquisition if self._handle is not None: self.log(logging.INFO, "Stopping the input channels") PyDAQmx.DAQmxStopTask(self._handle) PyDAQmx.DAQmxClearTask(self._handle) # Stopping and closing the processes for writing data if self._out_handle is not None: self.log(logging.INFO, "Stopping the output channels") PyDAQmx.DAQmxStopTask(self._out_handle) PyDAQmx.DAQmxClearTask(self._out_handle)