Source code for crappy.inout.ni_daqmx

# coding: utf-8

from time import time
import numpy as np
from typing import List, Optional, Dict, Any, Iterable
from dataclasses import dataclass, field
from re import fullmatch
from collections import defaultdict
from itertools import chain
import logging

from .meta_inout import InOut
from .._global import OptionalModule

try:
  import nidaqmx
  from nidaqmx import stream_readers, stream_writers
except (ModuleNotFoundError, ImportError):
  nidaqmx = OptionalModule("nidaqmx")
  stream_readers = stream_writers = nidaqmx

thcp_map = {"B": 10047,
            "E": 10055,
            "J": 10072,
            "K": 10073,
            "N": 10077,
            "R": 10082,
            "S": 10085,
            "T": 10086}

unit_map = {"C": 10143,
            "F": 10144,
            "R": 10145,
            "K": 10325}


@dataclass
class _Channel:
  """This class is a simple structure holding all the attributes a NI DAQmx
  channel can have."""

  meas_type: str = 'voltage'
  name: Optional[str] = None
  kwargs: Dict[str, Any] = field(default_factory=dict)

  def update(self, dic_in: Dict[str, Any]) -> None:
    """Updates the channel keys based on the user input."""

    for key, val in dic_in.items():
      # The 'name' and 'type' keys are handled separately
      if key == 'name':
        self.name = val
      elif key == 'type':
        self.meas_type = val

      # All the other keys are put together in the kwargs attribute
      else:
        self.kwargs.update({key: val})


[docs] class NIDAQmx(InOut): """This class can drive data acquisition hardware from National Instruments. It is similar to :class:`~crappy.inout.DAQmx` InOut, except it relies on the :mod:`nidaqmx` module. It was written and tested on a USB 6008 DAQ board, but should work with other instruments as well. It can read single data points from digital and analog channels, read streams of data from analog channels, and set the voltage of analog and digital output channels. For analog input channels, several types of acquisition can be performed, like voltage, resistance, current, etc. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Nidaqmx* to *NIDAQmx* """
[docs] def __init__(self, channels: Iterable[Dict[str, Any]], sample_rate: float = 100, n_samples: Optional[int] = None) -> None: """Sets the arguments and initializes the parent class. Args: channels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing :obj:`dict` holding information on the channels to read data from or write data to. See below for the mandatory and optional keys for the dicts. Note that in streamer mode, the digital input channels are not available for reading. Also, only one type of analog input channel at a time can be read in streamer mode, with no restriction on the number of channels of this type. sample_rate: The target sample rate for data acquisition in streamer mode, given as a :obj:`float`. Default is `100` SPS. .. versionchanged:: 1.5.10 renamed from *samplerate* to *sample_rate* n_samples: The number of samples to acquire per chunk of data in streamer mode. Default is 20% of ``sample_rate``. .. versionchanged:: 1.5.10 renamed from *nsamples* to *n_samples* Note: - ``channels`` keys: - name: The name of the channel to drive, given with the following syntax : :: 'DevX/[a/d][i/o]Y' With `X` the index of the device, and `Y` the line on which the channel is. `d` stands for digital, `a` for analog, `i` for input and `o` for output. For digital channels, `DevX/d[i/o]Y` is internally converted to `DevX/port<Y // 8>/line<Y % 8>`. Example of a valid name : `Dev1/ao3`. - type: The type of data to read, for analog input channels. This field can take many different values, refer to the documentation of the :mod:`nidaqmx` for more details. This field is internally used for calling the method : :meth:`nidaqmx.task.add_ai_[type]_chan`. The default for this field is `'voltage'`, possible values include `'thrmcpl'`, `'bridge'`, `'current'` and `'resistance'`. - All the other keys will be given as kwargs to the :meth:`nidaqmx.task.add_ai_[type]_chan` method for analog input channels, to the :meth:`nidaqmx.task.add_ao_voltage_chan` for analog output channels, to :meth:`nidaqmx.task.add_do_chan` for digital output channels, and to :meth:`nidaqmx.task.add_di_chan` for digital input channels. Refer to :mod:`nidaqmx` documentation for the possible arguments and values. Note that for the `'thrmcpl'` analog input channel type, the `'thermocouple_type'` argument must be given as a letter, same for the `'units'` argument. They will be parsed internally. Also note that for the analog output channels and the analog input channels of type `'voltage'`, the `'min_val'` and `'max_val'` arguments are internally set by default to `0` and `5`. """ super().__init__() # Setting the number of samples per acquisition for streamer mode if n_samples is None: self._n_samples = max(1, int(sample_rate / 5)) else: self._n_samples = n_samples self._sample_rate = sample_rate # These attributes will be set later self._task_ao = None self._stream_ao = None self._task_di = None self._stream_di = None self._task_do = None self._stream_do = None self._stream_started = False # For analog inputs a dict is needed as each type is handled separately self._tasks_ai = dict() self._stream_ai = dict() self._digital_in = list() self._digital_out = list() self._analog_out = list() # Here as well a dict is needed for handling each analog input type self._analog_in = defaultdict(list) for channel in channels: # Making sure each channel has a 'name' attribute if 'name' not in channel: raise AttributeError("The given channels must contain the 'name' " "key !") # Parsing the channel name to retrieve info from it match = fullmatch(r'(.+)/(.+)(\d+)', channel['name']) if match is not None: dev, type_, num = match.groups() num = int(num) else: raise AttributeError(f"Invalid format for the channel name " f": {channel['name']} !\nIt should be " f"'Dev<dev num>/[a/d][i/o]<chan num>'") # Creating a _Channel object holding the information on the channel chan = _Channel() chan.update(channel) # Saving the channel to the right place and performing specific actions if type_ == 'ai': self._analog_in[chan.meas_type].append(chan) elif type_ == 'ao': self._analog_out.append(chan) elif type_ == 'di': chan.name = f"{dev}/port{num // 8}/line{num % 8}" self._digital_in.append(chan) elif type_ == 'do': chan.name = f"{dev}/port{num // 8}/line{num % 8}" self._digital_out.append(chan) else: raise ValueError(f"Wrong channel type : {type_} !\nIt should be " f"either 'ai', 'ao', 'di', or 'do'.")
[docs] def open(self) -> None: """Creates tasks and streams for analog output, digital input, digital output, and each type of analog input channels.""" # Creating one task for each type of analog input channel self._tasks_ai = {type_: nidaqmx.Task() for type_ in self._analog_in} # Iterating over all the analog input channels for type_, channels in self._analog_in.items(): for chan in channels: # Setting the min and max voltage for the voltage analog input channels if type_ == 'voltage': chan.kwargs['max_val'] = chan.kwargs.get('max_val', 5) chan.kwargs['min_val'] = chan.kwargs.get('min_val', 0) # Parsing the thermocouple related arguments if 'thermocouple_type' in chan.kwargs: chan.kwargs['thermocouple_type'] = thcp_map[ chan.kwargs['thermocouple_type']] # Included in the if as 'units' is an arg for other types of channels if 'units' in chan.kwargs: chan.kwargs['units'] = thcp_map[chan.kwargs['units']] # Adding the channel to the task with the given kwargs try: func = getattr(self._tasks_ai[type_].ai_channels, f'add_ai_{type_}_chan') func(chan.name, **chan.kwargs) except AttributeError: raise ValueError(f"Invalid channel type : {type_}") # Opening a stream for each analog input task self.log(logging.INFO, "Opening the streams for the analog input channels") self._stream_ai = { type_: stream_readers.AnalogMultiChannelReader(task.in_stream) for type_, task in self._tasks_ai.items()} # Creating a task and a stream for all the analog output channels if self._analog_out: self.log(logging.INFO, "Opening the streams for the analog output channels") self._task_ao = nidaqmx.Task() self._stream_ao = stream_writers.AnalogMultiChannelWriter( self._task_ao.out_stream, auto_start=True) # Setting the min and max voltage for all the analog output channels for chan in self._analog_out: chan.kwargs['max_val'] = chan.kwargs.get('max_val', 5) chan.kwargs['min_val'] = chan.kwargs.get('min_val', 0) self._task_ao.ao_channels.add_ao_voltage_chan(chan.name, **chan.kwargs) # Creating a task and a stream for all the digital input channels if self._digital_in: self.log(logging.INFO, "Opening the streams for the digital input channels") self._task_di = nidaqmx.Task() for chan in self._digital_in: self._task_di.di_channels.add_di_chan(chan.name, **chan.kwargs) self._stream_di = stream_readers.DigitalMultiChannelReader( self._task_di.in_stream) # Creating a task and a stream for all the digital output channels if self._digital_out: self.log(logging.INFO, "Opening the streams for the digital output channels") self._task_do = nidaqmx.Task() for chan in self._digital_out: self._task_do.do_channels.add_do_chan(chan.name, **chan.kwargs) self._stream_do = stream_writers.DigitalMultiChannelWriter( self._task_do.out_stream)
[docs] def start_stream(self) -> None: """Starts the streaming task for analog input channels. Data can be acquired via streaming for multiple channels, but only for one type of channel. """ # Making sure there's only one type of channel to read data from if len(self._tasks_ai) > 1: raise IOError("Stream mode can only open one type of channel !") elif len(self._tasks_ai) < 1: raise IOError("There's no analog in channel to read data from !") # Starting the streaming task, there should be only one for task in self._tasks_ai.values(): task.timing.cfg_samp_clk_timing( self._sample_rate, sample_mode=nidaqmx.constants.AcquisitionType.CONTINUOUS) self._stream_started = True
[docs] def get_data(self) -> List[float]: """Reads data from the analog and digital input channels, and returns it along with a timestamp. Data from the analog channels is read first, and then data from the digital channels. Data is returned in the same order as it was acquired. """ ret = [time()] # Reading the analog channels if self._analog_in: data = np.empty(len(list(chain(*self._analog_in.values())))) i = 0 for type_, stream in self._stream_ai.items(): stream.read_one_sample(data[i:i + len(self._analog_in[type_])]) i += len(self._analog_in[type_]) ret.extend(list(data)) # Reading the digital channels if self._digital_in: data = np.empty((len(self._digital_in), 1), dtype=bool) self._stream_di.read_one_sample_multi_line(data) ret.extend(list(data[:, 0])) return ret
[docs] def get_stream(self) -> Optional[List[np.ndarray]]: """Reads data from the device, and returns it in an array along with an array holding the timestamps. Only data from analog input channels can be read, this method cannot read stream data from digital input channels. """ if not self._stream_started: return # Creating the container for the data data = np.empty((len(list(chain(*self._analog_in.values()))), self._n_samples)) # # Creating the array holding the timestamps t = time() + np.arange(0, self._n_samples) / self._sample_rate # Actually reading the data from the device for type_, stream in self._stream_ai.items(): stream.read_many_sample(data, self._n_samples) return [t, data]
[docs] def set_cmd(self, *cmd: float) -> None: """Sets the analog and digital output channels according to the given command values. The first command values correspond to the analog channels, the remaining ones correspond to the digital channels. It might be that not all channels are set if the number of commands doesn't match the number of channels. """ # Setting the analog channels if self._analog_out: self._stream_ao.write_one_sample(np.array(cmd[:len(self._analog_out)], dtype=np.float64)) # Setting the digital channels if self._digital_out: self._stream_do.write_one_sample_multi_line( np.array(cmd[len(self._analog_out):], dtype=bool).reshape(len(self._digital_out), 1))
[docs] def stop_stream(self) -> None: """Stops all the acquisition tasks.""" if self._stream_started: for task in self._tasks_ai.values(): task.stop() task.timing.cfg_samp_clk_timing( self._sample_rate, sample_mode=nidaqmx.constants.AcquisitionType.FINITE) self._stream_started = False
[docs] def close(self) -> None: """Stops all the acquisition tasks, and closes the connections to the device.""" if self._analog_in: self.log(logging.INFO, "Closing the streams for the analog input " "channels") for task in self._tasks_ai.values(): task.stop() if self._analog_out: self.log(logging.INFO, "Closing the streams for the analog output " "channels") self._task_ao.stop() if self._stream_started: self.stop_stream() if self._digital_in: self.log(logging.INFO, "Closing the streams for the digital input " "channels") self._task_di.close() if self._digital_out: self.log(logging.INFO, "Closing the streams for the digital output " "channels") self._task_do.close()