Source code for crappy.inout.daqmx

# coding: utf-8

import numpy as np
import time
from typing import Union

from .inout import InOut
from .._global import OptionalModule
try:
  import PyDAQmx
except (ModuleNotFoundError, ImportError):
  PyDAQmx = OptionalModule("PyDAQmx")


[docs]def get_daqmx_devices_names() -> list: """Get all connected daqmx devices. Returns: A :obj:`list` of all connected daqmx devices. """ buffer_size = 4096 buffer = PyDAQmx.create_string_buffer(buffer_size) PyDAQmx.DAQmxGetSysDevNames(buffer, buffer_size) print(len(buffer.value.split(",")), " devices detected: ", buffer.value) return buffer.value.split(",")
def listify(stuff: Union[list, float], length: int) -> list: r = stuff if isinstance(stuff, list) else [stuff] * length assert len(r) == length, "Invalid list length for " + str(r) return r
[docs]class Daqmx(InOut): """Class to use DAQmx devices."""
[docs] def __init__(self, device: str = 'Dev1', channels: Union[list, str] = None, gain: Union[list, float] = 1, offset: Union[list, float] = 0, range: Union[float, list] = 5, make_zero: Union[list, bool] = True, nperscan: int = 1000, sample_rate: float = 10000, out_channels: Union[list, str] = None, out_gain: Union[list, float] = 1, out_offset: Union[list, float] = 0, out_range: Union[list, float] = 5) -> None: """Sets the args and initializes the parent class. Args: device (:obj:`str`, optional): Name of the device to open. channels (:obj:`list`, optional): Names or ids of the channels to read. gain (:obj:`list`, optional): Gains to apply to each reading. offset (:obj:`list`, optional): Offset to apply to each reading. range (:obj:`list`, optional): Max value for the reading. Should be a :obj:`list` of :obj:`float` in: :: 0.5, 1., 2.5, 5. Refer to the niDAQ api for more details. make_zero (:obj:`list`, optional): If :obj:`True`, the average value on the channel at opening will be evaluated and subtracted to the actual reading. nperscan (:obj:`int`, optional): If using streamer mode, number of readings to acquire on each :meth:`get_stream` call. sample_rate (:obj:`float`, optional): If using streamer mode, frequency of acquisition when calling :meth:`get_stream`. out_channels (:obj:`list`, optional): Names or ids of the output channels. out_gain (:obj:`list`, optional): Gains to apply to the commands. out_offset (:obj:`list`, optional): Offset to apply to the commands. out_range (:obj:`list`, optional): Max value of the output. Should be a :obj:`list` of :obj:`float` in: :: 0.5, 1., 2.5, 5. Refer to the niDAQ api for more details. Note: If an argument supposed to be a :obj:`list` is given as a single value, this value will be applied to all channels. """ InOut.__init__(self) # For now, kwargs like in_gain are equivalent to gain # (it is for consistency with out_gain, out_channels, etc...) self.device = device self.channels = ['ai0'] if channels is None else channels self.gain = gain self.offset = offset self.range = range self.make_zero = make_zero self.nperscan = nperscan self.sample_rate = sample_rate self.out_channels = [] if out_channels is None else out_channels self.out_gain = out_gain self.out_offset = out_offset self.out_range = out_range self.check_vars()
[docs] def check_vars(self) -> None: """Turns the settings into :obj:`list` of the same length, each index standing for one channel. Note: If a :obj:`list` is given, simply checks the length. Else make a :obj:`list` of the correct length containing only the given value. """ # IN channels self.channels = self.channels if isinstance(self.channels, list) \ else [self.channels] nin = len(self.channels) for i in range(nin): if isinstance(self.channels[i], int): self.channels[i] = 'ai' + str(self.channels[i]) self.gain = listify(self.gain, nin) self.offset = listify(self.offset, nin) self.range = listify(self.range, nin) self.make_zero = listify(self.make_zero, nin) # OUT channels self.out_channels = self.out_channels if \ isinstance(self.out_channels, list) else\ [self.out_channels] nout = len(self.out_channels) for i in range(nout): if isinstance(self.out_channels[i], int): self.out_channels[i] = 'ao' + str(self.out_channels[i]) self.out_gain = np.array(listify(self.out_gain, nout)) self.out_offset = np.array(listify(self.out_offset, nout)) self.out_range = listify(self.out_range, nout) assert nin + nout, "DAQmx has no in nor out channels!"
def open(self) -> None: PyDAQmx.DAQmxResetDevice(self.device) self.handle, self.out_handle = None, None # IN channels if self.channels: self.handle = PyDAQmx.TaskHandle() self.nread = PyDAQmx.int32() PyDAQmx.DAQmxCreateTask("", PyDAQmx.byref(self.handle)) for i, chan in enumerate(self.channels): PyDAQmx.DAQmxCreateAIVoltageChan( self.handle, self.device + "/" + chan, "", PyDAQmx.DAQmx_Val_Cfg_Default, 0, self.range[i], PyDAQmx.DAQmx_Val_Volts, None) if any(self.make_zero): off = self.eval_offset() for i, make_zero in enumerate(self.make_zero): if make_zero: self.offset[i] += off[i] # OUT channels if self.out_channels: self.out_handle = PyDAQmx.TaskHandle() PyDAQmx.DAQmxCreateTask("", PyDAQmx.byref(self.out_handle)) for i, chan in enumerate(self.out_channels): PyDAQmx.DAQmxCreateAOVoltageChan( self.out_handle, self.device + "/" + chan, "", 0, self.out_range[i], PyDAQmx.DAQmx_Val_Volts, None) PyDAQmx.DAQmxStartTask(self.out_handle)
[docs] def get_data(self) -> list: """Returns a :obj:`tuple` of length ``len(self.channels) + 1``. First element is the time, others are readings of each channel. """ return [i[0] for i in self.get_single(1)]
[docs] def get_single(self, npoints: int = None) -> list: """Reads the analog voltage on specified channels. Args: npoints: Number of values to read. If :obj:`None`, will use the value of `self.nperscan`. Returns: A :obj:`tuple` of `len(self.channels) + 1` :obj:`list` of length `npoints`. First list is the time, the others are the read voltages. """ if npoints is None: npoints = self.nperscan PyDAQmx.DAQmxCfgSampClkTiming(self.handle, "", self.sample_rate, PyDAQmx.DAQmx_Val_Rising, PyDAQmx.DAQmx_Val_FiniteSamps, npoints + 1) PyDAQmx.DAQmxStartTask(self.handle) data = np.empty((len(self.channels), npoints), dtype=np.float64) t0 = time.time() # DAQmx Read Code PyDAQmx.DAQmxReadAnalogF64(self.handle, npoints, 10.0, PyDAQmx.DAQmx_Val_GroupByChannel, data, npoints * len(self.channels), PyDAQmx.byref(self.nread), None) t = time.time() # DAQmx Stop Code PyDAQmx.DAQmxStopTask(self.handle) # Estimated starting of the acq t1 = (( t + t0) - npoints / self.sample_rate) / 2 return [[t1 + i / self.sample_rate for i in range(npoints)]] \ + [data[i, :] * self.gain[i] + self.offset[i] for i in range(len(self.channels))]
[docs] def set_cmd(self, *args: float) -> None: """Set the output(s) to the specified value. Note: Takes `n` arguments, `n` being the number of channels opened at :meth:`__init__`. The ith argument is the value to set to the ith channel. """ assert len(args) == len(self.out_channels) data = np.array(args, dtype=np.float64) * self.out_gain + self.out_offset PyDAQmx.DAQmxWriteAnalogF64(self.out_handle, 1, 1, 10.0, PyDAQmx.DAQmx_Val_GroupByChannel, data, None, None)
[docs] def close(self) -> None: """Closes the connection.""" if self.handle: PyDAQmx.DAQmxStopTask(self.handle) PyDAQmx.DAQmxClearTask(self.handle) if self.out_handle: PyDAQmx.DAQmxStopTask(self.out_handle) PyDAQmx.DAQmxClearTask(self.out_handle)