Source code for crappy.blocks.generator_path.integrator

# coding: utf-8

from numpy import trapz
from typing import Union, Dict
import logging

from .meta_path import Path, ConditionType


[docs] class Integrator(Path): """This Path integrates an incoming label over time and returns the integration as an output signal. Let `f(t)` be the input signal, `v(t)` the value of the output, `m` the inertia and `t0` the timestamp of the beginning of this Path. Then the output value for this Path will be :math:`v(t) = v(t0) - [I(t0 -> t)f(t)dt] / m`. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Inertia* to *Integrator* """
[docs] def __init__(self, condition: Union[str, ConditionType], inertia: float, func_label: str, time_label: str = 't(s)', init_value: float = None) -> None: """Sets the arguments and initializes the parent class. Args: condition: The condition for switching to the next Path. Refer to :class:`~crappy.blocks.generator_path.meta_path.Path` for more information. inertia: Value of the equivalent inertia to use for driving the signal. In the above formula, it is the value of `m`. The larger this value, the slower the changes in the signal value. func_label: The name of the label of the input value to integrate. .. versionchanged:: 1.5.10 renamed from *flabel* to *func_label* time_label: The name of the time label for the integration. .. versionchanged:: 1.5.10 renamed from *tlabel* to *time_label* init_value: If given, overwrites the last value of the signal as the starting point for the inertia path. In the specific case when this path is the first one in the Generator Paths, this argument must be given ! .. versionchanged:: 1.5.10 renamed from *value* to *init_value* .. versionchanged:: 1.5.10 renamed *time* argument to *_last_time* .. versionchanged:: 1.5.10 renamed *cmd* argument to *_last_cmd* .. versionremoved:: 1.5.10 *const* argument .. versionremoved:: 2.0.0 *_last_time* and *_last_cmd* arguments """ super().__init__() if init_value is None and self.last_cmd is None: raise ValueError('For the first path, an init_value must be given !') # Setting the attributes self._condition = self.parse_condition(condition) self._time_label = time_label self._func_label = func_label self._inertia = inertia self._value = self.last_cmd if init_value is None else init_value self._last_t = None self._last_val = None
[docs] def get_cmd(self, data: Dict[str, list]) -> float: """Gets the latest values of the incoming label, integrates them and changes the output accordingly. Also raises :exc:`StopIteration` in case the stop condition is met. """ # Checking if the stop condition is met if self._condition(data): self.log(logging.DEBUG, "Stop condition met") raise StopIteration if self._time_label in data and self._func_label in data: # Getting the last values from the received data times = data[self._time_label] values = data[self._func_label] # Including the last values from the last loop if self._last_val is not None and self._last_t is not None: times = [self._last_t] + times values = [self._last_val] + values # Keeping in memory the last values from this loop if times and values: self._last_t = times[-1] self._last_val = values[-1] # Performing the integration and subtracting from the previous value self._value -= trapz(values, times) / self._inertia # Returning the current value return self._value