Source code for crappy.inout.labjack_t7

# coding: utf-8

from time import time
from typing import List, Optional, Dict, Any, Union, Tuple, Iterable
from itertools import chain
from dataclasses import dataclass, field
from multiprocessing import current_process
import logging

from .meta_inout import InOut
from .._global import OptionalModule

try:
  from labjack import ljm
except (ModuleNotFoundError, ImportError):
  ljm = OptionalModule("ljm", "Please install Labjack LJM and the ljm "
                              "Python module")

# Associates each supported thermocouple type letter with the index that gets
# written to the AINx_EF_INDEX register of the Labjack T7
thcp_map = dict(E=20, J=21, K=22, R=23, T=24, S=25, C=30)


@dataclass
class _Channel:
  """This class is a simple structure holding all the attributes a Labjack
  channel can have.

  Not all the attributes are used by every channel, but they all have a use for
  at least one type of channel.
  """

  # Name of the channel (e.g. 'AIN0'), or a Modbus register given as an integer
  name: Union[str, int]

  # True for an output channel, False for an input channel
  direction: bool = True
  # Data type and address of the register to read from or write to
  dtype: Optional[int] = None
  address: int = 1
  # Linear correction applied as gain * value + offset
  gain: float = 1
  offset: float = 0
  # Whether the offset of the channel should be compensated before the test
  make_zero: bool = False
  # Acquisition range (in Volts) and resolution index, for the AIN channels
  range: float = 10
  # Minimum and maximum command values allowed, for the output channels
  limits: Optional[Tuple[float, float]] = None
  resolution: int = 1
  # Letter of the thermocouple type to read, must be a key of thcp_map
  thermocouple: Optional[str] = None
  # Commands to write on the Labjack when opening the channel
  write_at_open: List[Tuple[str, float]] = field(default_factory=list)

  def update(self, dic_in: Dict[str, Any]) -> None:
    """Updates the channel keys based on the user input.

    Only the keys matching one of the fields of the dataclass are accepted, a
    warning is logged for any other key and its value is ignored.

    Args:
      dic_in: The :obj:`dict` of settings given by the user for this channel.
    """

    from dataclasses import fields

    # Only the declared fields are valid settings. A plain hasattr() check
    # would also accept methods like 'update' and let the user overwrite them
    valid_keys = {attr.name for attr in fields(self)}

    for key, val in dic_in.items():
      if key in valid_keys:
        # Lists are copied, so that extending them later on the channel does
        # not modify the object owned by the caller
        if isinstance(val, list):
          val = list(val)
        setattr(self, key, val)

      # Handling the case when the user enters a wrong key
      else:
        logger = logging.getLogger(
          f"{current_process().name}.LabjackT7.Channel_{self.name}")
        logger.log(logging.WARNING, f"Unknown channel key : {key}, ignoring")


class LabjackT7(InOut):
  """This InOut allows controlling a Labjack T7 device. It can use any channel
  as input/output.

  The Labjack T7 is a very complete DAQ board. It features several ADC, several
  DAC, as well as multiple GPIOs. It can also read thermocouples, and run LUA
  code on an integrated microcontroller. These features can all be controlled
  from Crappy.

  This class is not capable of streaming. For higher frequency, refer to the
  :class:`~crappy.inout.T7Streamer` class.

  .. versionadded:: 1.4.0
  .. versionchanged:: 2.0.0 renamed from *Labjack_t7* to *LabjackT7*
  """

  def __init__(self,
               channels: Iterable[Dict[str, Any]],
               device: str = 'ANY',
               connection: str = 'ANY',
               identifier: str = 'ANY',
               write_at_open: Optional[Iterable[tuple]] = None,
               no_led: bool = False) -> None:
    r"""Sets the arguments and initializes the parent class.

    Args:
      channels: An iterable (like a :obj:`list` or a :obj:`tuple`) of the
        channels to interface with on the Labjack. Each object in this iterable
        should be a :obj:`dict` representing a single channel, and whose keys
        provide information on the channel to use. Refer to the note below for
        more information on the possible keys.
      device: The type of Labjack to open. Possible values include :
        ::

          'ANY', 'T7', 'T4', 'DIGIT'

        Only tested with `'T7'` in Crappy.
      connection: The type of connection used for interfacing with the Labjack.
        Possible values include :
        ::

          'ANY', 'TCP', 'USB', 'ETHERNET', 'WIFI'

      identifier: Any extra information allowing to further identify the
        Labjack to open, like a serial number, an IP address, or a device name.
      write_at_open: If specific names or registers have to be written when
        opening the channel, they can be given here as an iterable (like a
        :obj:`list` or a :obj:`tuple`) of :obj:`tuple`. They will be written in
        the same order as in the given iterable. Refer to the note below for
        the accepted formats.
      no_led: If :obj:`True`, turns off the LED on the Labjack. This led can
        generate noise on the channels `AIN0` and `AIN1`.

    Raises:
      AttributeError: If a channel has no ``'name'`` key, or if its name does
        not match any of the supported kinds of channels.

    Note:
      - ``channels`` keys:

        - name: The name of the channel to interface with, as written on the
          Labjack's case. Ex: `'AIN0'`. The available settings as well as the
          direction of the channel depend on the given name.

          The name can be:

          - `AINx`: Analog input, linked to an ADC. A gain and offset can be
            provided, and the range and resolution can be adjusted. It is
            always an input. These channels can also be used with
            thermocouples (see below).
          - `(T)DACx`: Analog output, linked to a DAC. A gain and an offset
            can be specified. It is always an output.
          - `(E/F/C/M IOx)`: Digital inputs/outputs. A gain and an offset can
            be specified. It can be either an input or an output, the default
            is output.
          - An :obj:`int`: The number of a Modbus register, accessed as a
            32 bits float. It can be either an input or an output, the
            default is output.

        - gain: If the channel is an input, the measured value will be
          modified directly by the Labjack as follows :
          :math:`returned\_value = gain * measured\_value + offset`. If the
          channel is an output, the command value will be modified in Crappy
          as follows before being sent to the Labjack :
          :math:`sent\_value = gain * command + offset`.

        - offset: If the channel is an input, the measured value will be
          modified directly by the Labjack as follows :
          :math:`returned\_value = gain * measured\_value + offset`. If the
          channel is an output, the command value will be modified in Crappy
          as follows before being sent to the Labjack :
          :math:`sent\_value = gain * command + offset`.

        - make_zero: If :obj:`True`, data will be acquired on this channel
          before the test starts, and a compensation value will be deduced so
          that the offset of this channel is `0`. The compensation is
          performed directly by the Labjack. This setting only has effect for
          `AIN` channels defined as inputs. **It will only take effect if
          the** ``make_zero_delay`` **argument of the**
          :class:`~crappy.blocks.IOBlock` **controlling the Labjack is set** !

        - direction: If :obj:`True`, the channel is considered as an output,
          else as an input. Only has effect for `IO` channels, the default is
          output.

        - resolution: The resolution of the acquisition as an integer, refer
          to Labjack documentation for more details. The higher this value
          the better the resolution, but the lower the speed. The possible
          range is either `1` to `8` or to `12` depending on the model. The
          default is `1`. Only has effect for `AIN` channels.

        - range: The range of the acquisition in Volts. A range of `x` means
          that values can be read between `-x` and `x` Volts. The possible
          values are :
          ::

            0.01, 0.1, 1, 10

          Only has effect for `AIN` channels.

        - limits: A :obj:`tuple` containing the minimum and maximum allowed
          command values to set for this channel. After applying the gain and
          offset to the command, it is then clamped between these two values
          before being sent to the Labjack. Only has effect for output
          channels.

        - thermocouple: The type of thermocouple to read data from. Possible
          values are:
          ::

            'E', 'J', 'K', 'R', 'T', 'S', 'C'

          If specified, it will use the EF to read a temperature directly
          from the thermocouples. Only has effect for `AIN` channels.

        - write_at_open: A :obj:`list` containing commands for writing
          specific names or registers when opening the channel. The commands
          should be given as :obj:`tuple` either in the format
          `(name (str), value (int/float))` or
          `(register (int), type (int), value (float/int))`.

    Warning:
      Do not consider the ``limits`` key as a safety feature. It *should* not
      go beyond/below the given values, but this is not meant to replace
      hardware safety !
    """

    self._handle = None

    super().__init__()

    # Identifiers for the device to open
    self._device = device
    self._connection = connection
    self._identifier = identifier

    # List of commands to send when opening the Labjack
    self._write_at_open = [] if write_at_open is None else list(write_at_open)
    if no_led:
      self._write_at_open.append(('POWER_LED', 0))

    self._channels_in = list()
    self._channels_out = list()

    # Parsing the setting dict given for each channel
    for channel in channels:

      # Checking that the name was given as it's the most important attribute
      if 'name' not in channel:
        raise AttributeError("The given channels must contain the 'name' "
                             "key !")
      name = channel['name']

      # Modbus registers
      if isinstance(name, int):
        # The register number is the address to access. Without it, the
        # channel would silently target the default address of _Channel. An
        # 'address' key given by the user still takes precedence
        chan = _Channel(name=name, address=name,
                        dtype=ljm.constants.FLOAT32)
        chan.update(channel)

        # Can be either input or output, the user has to specify
        if chan.direction:
          self._channels_out.append(chan)
        else:
          self._channels_in.append(chan)

      # Analog inputs
      elif isinstance(name, str) and name.startswith('AIN'):
        chan = _Channel(name=name)
        chan.update(channel)

        # Setting the range and the resolution
        chan.write_at_open.extend([
          (*self._parse(f'{name}_RANGE'), chan.range),
          (*self._parse(f'{name}_RESOLUTION_INDEX'), chan.resolution)])

        # Specific commands for thermocouples
        # NOTE(review): the values written to EF_CONFIG_A/B/D/E presumably
        # select the temperature unit and the cold junction compensation
        # source, slope and offset - confirm against the Labjack AIN-EF
        # documentation
        if chan.thermocouple is not None:
          chan.write_at_open.extend([
            (*self._parse(f'{name}_EF_INDEX'), thcp_map[chan.thermocouple]),
            (*self._parse(f'{name}_EF_CONFIG_A'), 1),
            (*self._parse(f'{name}_EF_CONFIG_B'), 60052),
            (*self._parse(f'{name}_EF_CONFIG_D'), 1),
            (*self._parse(f'{name}_EF_CONFIG_E'), 0)])
          chan.address, chan.dtype = self._parse(f'{name}_EF_READ_A')

        # Simplest case with no gain and offset
        elif chan.gain == 1 and chan.offset == 0 and not chan.make_zero:
          chan.address, chan.dtype = self._parse(name)

        # Setting the gain and offset if provided
        # When the channel has to be zeroed, the offset is left to 0 for now
        # and is only written by make_zero() along with the compensation
        else:
          chan.write_at_open.extend([
            (*self._parse(f'{name}_EF_INDEX'), 1),
            (*self._parse(f'{name}_EF_CONFIG_D'), chan.gain),
            (*self._parse(f'{name}_EF_CONFIG_E'),
             chan.offset if not chan.make_zero else 0)])
          chan.address, chan.dtype = self._parse(f'{name}_EF_READ_A')

        self._channels_in.append(chan)

      # Digital to analog converters
      elif isinstance(name, str) and 'DAC' in name:
        chan = _Channel(name=name)
        chan.update(channel)
        chan.address, chan.dtype = self._parse(name)

        self._channels_out.append(chan)

      # Digital inputs and outputs
      elif isinstance(name, str) and 'IO' in name:
        chan = _Channel(name=name)
        chan.update(channel)
        chan.address, chan.dtype = self._parse(name)

        # Can be either input or output, the user has to specify
        if chan.direction:
          self._channels_out.append(chan)
        else:
          self._channels_in.append(chan)

      else:
        raise AttributeError(f"Invalid chan name: {name}")

    self.log(logging.DEBUG, f"Input channels: {self._channels_in}")
    self.log(logging.DEBUG, f"Output channels: {self._channels_out}")

    # Extracting the addresses and data types from all channels
    self._read_addresses = [chan.address for chan in self._channels_in]
    self._read_types = [chan.dtype for chan in self._channels_in]
    self._write_addresses = [chan.address for chan in self._channels_out]
    self._write_types = [chan.dtype for chan in self._channels_out]

    # Last value written on each output channel, None until a first command
    # is successfully sent
    self._last_sent_val = [None for _ in self._write_addresses]

  def open(self) -> None:
    """Opens the Labjack, parses the commands to write at open, and sends
    them."""

    # Opening the Labjack
    self.log(logging.INFO, "Opening the connection to the Labjack")
    self._handle = ljm.openS(self._device, self._connection, self._identifier)

    # Gathering all the data to write at open, the commands of the device
    # first and then the ones of the input and output channels
    self._write_at_open.extend(chain.from_iterable(
      chan.write_at_open
      for chan in chain(self._channels_in, self._channels_out)))

    # Parsing all the commands given as a tuple of two values, so that they
    # all have the format (address, data type, value)
    self._write_at_open = [t if len(t) == 3 else (*self._parse(t[0]), t[1])
                           for t in self._write_at_open]

    # Nothing to write if no command was given
    if not self._write_at_open:
      return

    # Getting the registers, data types and values in separate tuples
    reg, types, values = tuple(zip(*self._write_at_open))

    # Finally, writing the commands to write at open
    self.log(logging.DEBUG, f"Writing values {values} to addresses {reg}")
    ljm.eWriteAddresses(handle=self._handle,
                        numFrames=len(reg),
                        aAddresses=reg,
                        aDataTypes=types,
                        aValues=values)

  def make_zero(self, delay: float) -> None:
    """Overriding the method of the parent class, because the Labjack T7
    allows setting offsets directly on the board.

    Setting the offsets on the Labjack is slightly quicker than correcting the
    received values afterward.

    Args:
      delay: The delay during which the data should be acquired for
        determining the offset.

    .. versionadded:: 1.5.10
    """

    # No need to acquire data if no channel should be zeroed
    # Also, the IO channels shouldn't be zeroed
    if not any(self._zeroed_on_board(chan) for chan in self._channels_in):
      return

    # Acquiring the data
    super().make_zero(delay)

    # Proceed only if the acquisition went fine
    if not self._compensations:
      return

    # The new offset is the one given by the user plus the compensation
    to_write = [(f"{chan.name}_EF_CONFIG_E", chan.offset + off)
                for off, chan in zip(self._compensations, self._channels_in)
                if self._zeroed_on_board(chan)]

    # Setting the offsets on the Labjack
    if to_write:
      names, values = tuple(zip(*to_write))
      self.log(logging.DEBUG, f"Writing values {values} to registers "
                              f"{names}")
      ljm.eWriteNames(handle=self._handle,
                      numFrames=len(names),
                      aNames=names,
                      aValues=values)

    # Resetting the software offsets to avoid double compensation
    self._compensations = list()

  def get_data(self) -> List[float]:
    """Reads the signal on all pre-defined input channels, and returns the
    values along with a timestamp."""

    return [time()] + ljm.eReadAddresses(
      handle=self._handle,
      numFrames=len(self._read_addresses),
      aAddresses=self._read_addresses,
      aDataTypes=self._read_types)

  def set_cmd(self, *cmd: float) -> None:
    """Sets the voltage commands on the output channels.

    The given gain and offset are first applied, then the commands are clamped
    to the given limits. The commands are then written to the Labjack, only if
    they differ from the last written ones.

    Args:
      *cmd: The commands to set, in the same order as the output channels.
    """

    # First, applying the given gain and offsets to the commands
    cmd = [chan.gain * val + chan.offset
           for val, chan in zip(cmd, self._channels_out)]

    # Then, clamping the commands if limits were given
    cmd = [val if chan.limits is None
           else max(min(val, chan.limits[1]), chan.limits[0])
           for val, chan in zip(cmd, self._channels_out)]

    # Keeping only the commands that differ from the last sent ones
    to_send = [(idx, val) for idx, (val, prev)
               in enumerate(zip(cmd, self._last_sent_val)) if val != prev]

    # No use continuing if there's nothing to update
    if not to_send:
      return

    # Getting the addresses, data types and values to send
    indexes, values = tuple(zip(*to_send))
    addresses = tuple(self._write_addresses[idx] for idx in indexes)
    types = tuple(self._write_types[idx] for idx in indexes)

    # Sending the commands
    self.log(logging.DEBUG, f"Writing values {values} to addresses "
                            f"{addresses}")
    ljm.eWriteAddresses(handle=self._handle,
                        numFrames=len(addresses),
                        aAddresses=addresses,
                        aDataTypes=types,
                        aValues=values)

    # Updating the last sent values in place and only once the write went
    # fine. This way a failed write is attempted again, and receiving fewer
    # commands than output channels does not shorten the list
    for idx, val in to_send:
      self._last_sent_val[idx] = val

  def close(self) -> None:
    """Closes the connection to the Labjack."""

    # The handle is None if the Labjack was never opened
    if self._handle is not None:
      self.log(logging.INFO, "Closing the connection to the Labjack")
      ljm.close(self._handle)

  @staticmethod
  def _zeroed_on_board(chan: _Channel) -> bool:
    """Tells whether the offset of a channel has to be compensated on the
    Labjack, which is only the case for `AIN` channels with ``make_zero``
    set."""

    # Modbus registers have an integer name, that cannot be searched in
    return (chan.make_zero and isinstance(chan.name, str)
            and 'AIN' in chan.name)

  @staticmethod
  def _parse(name: str) -> Tuple[int, int]:
    """Wrapper around :meth:`ljm.nameToAddress` to make the code clearer."""

    return ljm.nameToAddress(name)