# coding: utf-8
from typing import Dict, Any, Union, Iterable
import logging
from .meta_modifier import Modifier
[docs]
class Offset(Modifier):
"""This Modifier offsets every value of the given labels by a constant. This
constant is calculated so that for each label the first returned value is
equal to a user-defined target.
For example if for a given label the target is `6` and the first received
value is `3`, the Modifier will add `3` to each value received over this
label.
This Modifier can be used for example when measuring a variable that should
start at `0` but doesn't because of a sensor offset. It can also just be used
to plot nicer figures. It is not very accurate as it is only based on a
single data point for the offset calculation. The ``make_zero`` argument of
the :class:`~crappy.blocks.IOBlock` is a better alternative if precision is
required when offsetting a sensor.
.. versionadded:: 1.5.10
"""
[docs]
def __init__(self,
labels: Union[str, Iterable[str]],
offsets: Union[float, Iterable[float]]) -> None:
"""Sets the args and initializes the parent class.
Args:
labels: The labels to offset. Can be given as a single label, a
:obj:`list` of labels or a :obj:`tuple` of labels.
offsets: For each label, the target for the first received value. Can be
given as a single value, a :obj:`list` of values or a :obj:`tuple` of
values.
"""
super().__init__()
# Handling the case when only one label needs to be offset
if isinstance(labels, str):
labels = (labels,)
labels = tuple(labels)
try:
iter(offsets)
except TypeError:
offsets = (offsets,)
offsets = tuple(offsets)
# Checking that the number of offsets match the number of labels
if len(offsets) != len(labels):
raise ValueError("As many offsets as there are labels should be given.")
# Associating each offset to its label
self._offsets = {label: offset for label, offset in zip(labels, offsets)}
self._compensations = None
self._compensated = False
[docs]
def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
"""If the compensations are not set, sets them, and then offsets the
required labels.
.. versionchanged:: 2.0.0 renamed from *evaluate* to *__call__*
"""
self.log(logging.DEBUG, f"Received {data}")
# During the first loop, calculating the compensation values
if not self._compensated:
self._compensations = {label: -data[label] + offset for label, offset in
self._offsets.items()}
self._compensated = True
# Compensating the data to match the target offset value
for label in self._offsets:
data[label] += self._compensations[label]
self.log(logging.DEBUG, f"Sending {data}")
return data