# coding: utf-8
import numpy as np
from typing import Optional, Union, Iterable
from time import time
import logging
from .meta_block import Block
class MeanBlock(Block):
  """This Block can compute the average values of given labels over a given
  delay.

  It can take any number of inputs, provided that they share a common time
  label. If the same label (except time) is received from several Blocks, it
  may lead to unexpected results.

  The output of this Block is very similar to that of the
  :class:`~crappy.modifier.Mean` and :class:`~crappy.modifier.MovingAvg`
  Modifiers, but not exactly similar. While these Modifiers calculate the
  average of a label over a fixed number of data points, the MeanBlock
  calculates the average of the values received over a given delay. This
  behavior could, however, also be achieved using a
  :class:`~crappy.modifier.Modifier`.

  Warning:
    If the delay for averaging is too short compared with the looping frequency
    of the upstream Blocks, this Block may not always return the same number of
    labels ! This can cause errors in downstream Blocks expecting a fixed
    number of labels.

  .. versionadded:: 1.4.0
  .. versionchanged:: 2.0.0 renamed from *Mean_block* to *MeanBlock*
  """

  def __init__(self,
               delay: float,
               time_label: str = 't(s)',
               out_labels: Optional[Union[str, Iterable[str]]] = None,
               display_freq: bool = False,
               freq: Optional[float] = 50,
               debug: Optional[bool] = False) -> None:
    """Sets the arguments and initializes the parent class.

    Args:
      delay: The averaged data will be sent each ``delay`` seconds.
      time_label: The label containing the time information. It must be common
        to all the incoming Links.

        .. versionchanged:: 1.5.10 renamed from *t_label* to *time_label*
      out_labels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing
        all the labels to average, as :obj:`str`. If not given, all the
        received labels are averaged and returned. The time label should not
        be included, as it is already given in ``time_label``. If there is only
        one label to average, it can be directly given as a :obj:`str`, i.e.
        not in an iterable.
      display_freq: If :obj:`True`, displays the looping frequency of the
        Block.

        .. versionadded:: 1.5.10
        .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq*
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0
    """

    super().__init__()
    self.display_freq = display_freq
    self.freq = freq
    self.debug = debug

    self._delay = delay
    self._time_label = time_label

    # Forcing the out_labels into a list. A lone str must be wrapped rather
    # than passed to list(), which would split it into single characters
    if out_labels is None:
      self._out_labels = None
    elif isinstance(out_labels, str):
      self._out_labels = [out_labels]
    else:
      self._out_labels = list(out_labels)

    # Provisional value, overwritten with self.t0 in begin()
    self._last_sent_t = time()

  def begin(self) -> None:
    """Initializes the time counter.

    The start of the first averaging window is set to ``self.t0``.

    .. versionadded:: 2.0.0
    """

    self._last_sent_t = self.t0

  def loop(self) -> None:
    """Receives all available data from the upstream Blocks, averages it and
    sends it if the time delay is reached.

    Each label to output is averaged with :func:`numpy.mean`. If the values
    of a label cannot be averaged, a warning is logged and the last received
    value is sent instead. The time value that is sent is the middle of the
    interval since the previous sending, relative to ``self.t0``. Nothing is
    sent if there is no label to output.
    """

    # Receiving data from each incoming link
    data = self.recv_all_data(delay=self._delay, poll_delay=self._delay / 10)

    # Removing the time label from the received data, as the time value to
    # send is recalculated below
    data.pop(self._time_label, None)

    # Building the output dict with the averaged values
    to_send = dict()
    for label, values in data.items():
      # Skipping the labels that were not requested by the user
      if self._out_labels is not None and label not in self._out_labels:
        continue

      try:
        to_send[label] = np.mean(values)
      except (ValueError, TypeError):
        self.log(logging.WARNING, f"Cannot perform averaging on label "
                                  f"{label} with values: {values}")
        to_send[label] = values[-1]

    # Sending the output dict
    if to_send:
      # Reading the clock only once, so that the end of this averaging window
      # is exactly the beginning of the next one
      now = time()
      to_send[self._time_label] = (now + self._last_sent_t) / 2 - self.t0
      self._last_sent_t = now
      self.send(to_send)