Source code for crappy.inout.ft232h.mprls

# coding: utf-8

from time import time
from typing import Optional, List
import logging

from ..meta_inout import InOut
from ...tool.ft232h import FT232HServer as FT232H, I2CMessage, USBArgsType

mprls_status_bits = {'busy': 0x20,
                     'memory error': 0x04,
                     'math saturation': 0x01}


[docs] class MPRLSFT232H(InOut): """This class can read values from an MPRLS pressure sensor through an FT232H. It is similar to the :class:`~crappy.inout.MPRLS` class, except this class is specific for use with an :class:`~crappy.tool.ft232h.FT232H` USB to I2C converter. It communicates over I2C with the sensor. .. versionadded:: 2.0.0 """ ft232h = True
[docs] def __init__(self, eoc_pin: Optional[str] = None, device_address: int = 0x18, _ft232h_args: USBArgsType = tuple()) -> None: """Initializes the parent class and opens the I2C bus. Args: eoc_pin: Optionally, reads the end of conversion signal from the polarity of a GPIO rather than from an I2C register. Speeds up the reading and decreases the traffic on the bus, but requires one extra wire. Give the name of the GPIO in the format `Dx` or `Cx`. device_address: The I2C address of the MPRLS. The address of the devices sold by Adafruit is `0x18`, but other suppliers may sell it with another address. _ft232h_args: This argument is meant for internal use only and should not be provided by the user. It contains the information necessary for setting up the FT232H. """ self._bus = None super().__init__() (block_index, block_lock, command_file, answer_file, shared_lock, current_block) = _ft232h_args self._bus = FT232H(mode='I2C', block_index=block_index, current_block=current_block, command_file=command_file, answer_file=answer_file, block_lock=block_lock, shared_lock=shared_lock) if not isinstance(device_address, int): raise TypeError("device_address should be an integer.") self._address = device_address if eoc_pin is not None and not isinstance(eoc_pin, str): raise TypeError('eoc_pin should be a string when using the ft232h ' 'backend !') self._eoc_pin = eoc_pin self._i2c_msg = None
[docs] def open(self) -> None: """Opens the I2C bus.""" self._i2c_msg = I2CMessage
[docs] def get_data(self) -> List[float]: """Reads the pressure value. Returns: The timestamp and the pressure value in hPa. """ # Starting conversion self.log(logging.DEBUG, f"Writing {0xAA, 0x00, 0x00} to the address " f"{self._address}") self._bus.i2c_rdwr(self._i2c_msg.write(self._address, [0xAA, 0x00, 0x00])) # Waiting for conversion to complete t0 = time() while not self._data_available(): if time() - t0 > 0.1: raise TimeoutError('Waited too long for data to be ready') # Reading conversion result read = self._i2c_msg.read(self._address, 4) self._bus.i2c_rdwr(read) out = list(read) self.log(logging.DEBUG, f"Read {out} from address {self._address}") # Checking if anu error occurred if out[0] & mprls_status_bits['memory error']: raise RuntimeError("A memory error occurred on the MPRLS") elif out[0] & mprls_status_bits['math saturation']: raise RuntimeError("A math saturation error occurred on the MPRLS") # Extracting conversion result as an integer ret = (out[1] << 16) | (out[2] << 8) | out[3] # Converting to hPa pres = 68.947572932 * (ret - 0x19999A) * 25 / (0xE66666 - 0x19999A) return [time(), pres]
[docs] def close(self) -> None: """Closes the I2C bus.""" if self._bus is not None: self.log(logging.INFO, "Closing the I2C connection to the MPRLS") self._bus.close()
def _data_available(self) -> bool: """Returns :obj:`True` if data is available, :obj:`False` otherwise.""" # EOC signal from the I2C communication if self._eoc_pin is None: wait = self._i2c_msg.read(self._address, 1) self._bus.i2c_rdwr(wait) return not list(wait)[0] & mprls_status_bits['busy'] # EOC signal from a GPIO else: return bool(self._bus.get_gpio(self._eoc_pin))