# coding: utf-8
from time import time
from typing import Union, Optional, List
import logging
from .meta_inout import InOut
from .._global import OptionalModule
try:
import adafruit_mprls
except (ImportError, ModuleNotFoundError):
adafruit_mprls = OptionalModule('adafruit_mprls')
try:
import board
except (ImportError, ModuleNotFoundError):
board = OptionalModule('board', 'Blinka is necessary to use the I2C bus')
try:
from digitalio import DigitalInOut
except (ImportError, ModuleNotFoundError):
DigitalInOut = OptionalModule('digitalio',
'Blinka is necessary to access the GPIOs')
try:
from smbus2 import SMBus, i2c_msg
except (ImportError, ModuleNotFoundError):
SMBus = i2c_msg = OptionalModule('smbus2')
try:
import RPi.GPIO as GPIO
except (ModuleNotFoundError, ImportError):
GPIO = OptionalModule("RPi.GPIO")
mprls_status_bits = {'busy': 0x20,
'memory error': 0x04,
'math saturation': 0x01}
mprls_backends = ['Pi4', 'blinka']
[docs]
class MPRLS(InOut):
"""This class can read values from an MPRLS pressure sensor.
It communicates over I2C with the sensor.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from Mprls to MPRLS
"""
[docs]
def __init__(self,
backend: str,
eoc_pin: Optional[Union[str, int]] = None,
device_address: int = 0x18,
i2c_port: int = 1) -> None:
"""Initializes the parent class and opens the I2C bus.
Args:
backend: Should be one of :
::
'Pi4', 'blinka'
The `'Pi4'` backend is optimized but only works on boards supporting
the :mod:`smbus2` module, like the Raspberry Pis. The `'blinka'`
backend may be less performant and requires installing
:mod:`Adafruit-Blinka` and :mod:`adafruit-circuitpython-mprls`, but
these modules are compatible with and maintained on a wide
variety of boards.
eoc_pin: Optionally, reads the end of conversion signal from the polarity
of a GPIO rather than from an I2C register. Speeds up the reading and
decreases the traffic on the bus, but requires one extra wire. With the
backend `'Pi4'`, give the index of the GPIO in BCM convention. With the
backend `'blinka'`, it should be a string but the syntax varies
according to the board. Refer to blinka's documentation for more
information.
device_address: The I2C address of the MPRLS. The address of the devices
sold by Adafruit is `0x18`, but other suppliers may sell it with
another address.
i2c_port: The I2C port over which the MPRLS should communicate. On most
Raspberry Pi models the default I2C port is `1`.
.. versionadded:: 1.5.8
*backend*, *eoc_pin*, *device_address*, *i2c_port* and *ft232h_ser_num*
arguments
.. versionremoved:: 2.0.0 *ft232h_ser_num* argument
"""
self._bus = None
self._eoc_pin = None
self._mpr = None
self._i2c_msg = None
if not isinstance(backend, str) or backend not in mprls_backends:
raise ValueError("backend should be in {}".format(mprls_backends))
self._backend = backend
super().__init__()
if not isinstance(device_address, int):
raise TypeError("device_address should be an integer.")
self._address = device_address
if not isinstance(i2c_port, int):
raise TypeError("i2c_port should be an integer.")
self._i2c_port = i2c_port
if eoc_pin is not None:
if backend == 'blinka' and not isinstance(eoc_pin, str):
raise TypeError('eoc_pin should be a string when using the blinka '
'backend !')
elif backend == 'ft232h' and not isinstance(eoc_pin, str):
raise TypeError('eoc_pin should be a string when using the ft232h '
'backend !')
elif backend == 'Pi4' and not isinstance(eoc_pin, int):
raise TypeError('eoc_pin should be an int when using the Pi4 '
'backend !')
self._eoc_pin = eoc_pin
[docs]
def open(self) -> None:
"""Opens the I2C bus."""
if self._backend == 'Pi4':
self.log(logging.INFO, f"Opening the I2C connection to the MPRLS on "
f"port {self._i2c_port}")
self._bus = SMBus(self._i2c_port)
if self._eoc_pin is not None:
self.log(logging.INFO, "Setting up the GPIOs")
GPIO.setmode(GPIO.BCM)
GPIO.setup(self._eoc_pin, GPIO.IN)
elif self._backend == 'blinka':
if self._eoc_pin is not None:
eoc = DigitalInOut(getattr(board, self._eoc_pin))
else:
eoc = None
self.log(logging.INFO, "Connecting the the MPRLS with backend blinka")
self._mpr = adafruit_mprls.MPRLS(board.I2C(),
psi_min=0,
psi_max=25,
eoc_pin=eoc)
self._i2c_msg = i2c_msg
[docs]
def get_data(self) -> List[float]:
"""Reads the pressure value.
Returns:
The timestamp and the pressure value in hPa.
"""
if self._backend == 'blinka':
pres = self._mpr.pressure
else:
# Starting conversion
self.log(logging.DEBUG, f"Writing {0xAA, 0x00, 0x00} to the address "
f"{self._address}")
self._bus.i2c_rdwr(self._i2c_msg.write(self._address,
[0xAA, 0x00, 0x00]))
# Waiting for conversion to complete
t0 = time()
while not self._data_available():
if time() - t0 > 0.1:
raise TimeoutError('Waited too long for data to be ready')
# Reading conversion result
read = self._i2c_msg.read(self._address, 4)
self._bus.i2c_rdwr(read)
out = list(read)
self.log(logging.DEBUG, f"Read {out} from address {self._address}")
# Checking if anu error occurred
if out[0] & mprls_status_bits['memory error']:
raise RuntimeError("A memory error occurred on the MPRLS")
elif out[0] & mprls_status_bits['math saturation']:
raise RuntimeError("A math saturation error occurred on the MPRLS")
# Extracting conversion result as an integer
ret = (out[1] << 16) | (out[2] << 8) | out[3]
# Converting to hPa
pres = 68.947572932 * (ret - 0x19999A) * 25 / (0xE66666 - 0x19999A)
return [time(), pres]
[docs]
def close(self) -> None:
"""Closes the I2C bus."""
if self._backend == 'Pi4' and self._bus is not None:
self.log(logging.INFO, "Closing the I2C connection to the MPRLS")
self._bus.close()
if self._backend == 'Pi4' and self._eoc_pin is not None:
self.log(logging.INFO, "Cleaning up the GPIOs")
GPIO.cleanup()
def _data_available(self) -> bool:
"""Returns :obj:`True` if data is available, :obj:`False` otherwise."""
# EOC signal from the I2C communication
if self._eoc_pin is None:
wait = self._i2c_msg.read(self._address, 1)
self._bus.i2c_rdwr(wait)
return not list(wait)[0] & mprls_status_bits['busy']
else:
return bool(GPIO.input(self._eoc_pin))