# coding: utf-8
import logging
import numpy as np
from typing import Dict, Optional, Iterable, Union
from collections import defaultdict
from .meta_block import Block
[docs]
class Multiplexer(Block):
"""This Block takes data from upstream Blocks as input and interpolates it to
output all the labels in a common time basis.
This Block is very similar to the :class:`~crappy.blocks.Synchronizer` Block,
but the `Synchronizer` takes the timestamps of a reference label as a time
base whereas this one performs the interpolation on a time base independent
of the received labels.
It can take any number of inputs, provided that they all share a common time
label. It is also possible to choose which labels are considered for
multiplexing and which are dropped. The interpolation is performed using the
:obj:`numpy.interp` method.
This Block is useful for synchronizing data acquired from different sensors,
e.g. to plot a real-time stress-strain curve with position data coming from a
:class:`~crappy.blocks.Machine` Block and force data coming from a
:class:`~crappy.blocks.IOBlock` Block. Multiplexing is however quite
resource-consuming, so it is preferable to perform interpolation when
post-processing if real-time is not needed.
Note:
This Block doesn't truly output data in real-time as it needs to wait for
data from all the upstream Blocks before performing the interpolation. It
should only be used with care as an input to a decision-making Block. This
is especially true when the upstream Blocks have very different sampling
rates.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *Multiplex* to *Multiplexer*
"""
[docs]
def __init__(self,
time_label: str = 't(s)',
out_labels: Optional[Union[str, Iterable[str]]] = None,
interp_freq: float = 200,
freq: Optional[float] = 50,
display_freq: bool = False,
debug: Optional[bool] = False) -> None:
"""Sets the arguments and initializes the parent class.
Args:
time_label: The label carrying the time information.
.. versionchanged:: 1.5.10 renamed from *key* to *time_label*
out_labels: An iterable (like a :obj:`list` or a :obj:`tuple`) containing
the labels to multiplex, except for the time label that is given
separately in the ``time_label`` argument. The Block also doesn't
output anything until data has been received on all these labels. If
left to :obj:`None`, all the received labels are considered. **It is
recommended to always set this argument !** It is also possible to
give this argument as a single :obj:`str` (i.e. not in an iterable),
although multiplexing a single label is of limited interest.
.. versionadded:: 2.0.0
interp_freq: The target frequency for performing the interpolation. In
the output data, there will be one interpolated data point each
:math:`1 / interp_freq` seconds. Independent of the ``freq`` argument,
but it is no use setting ``freq`` higher than ``interp_freq`` otherwise
there will be void loops.
.. versionadded:: 2.0.0
freq: The target looping frequency for the Block. If :obj:`None`, loops
as fast as possible.
display_freq: If :obj:`True`, displays the looping frequency of the
Block.
.. versionadded:: 1.5.9
.. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq*
debug: If :obj:`True`, displays all the log messages including the
:obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
disables logging for this Block.
.. versionadded:: 2.0.0
"""
super().__init__()
self.freq = freq
self.display_freq = display_freq
self.debug = debug
# Initializing the attributes
self._time_label = time_label
self._interp_freq = interp_freq
self._data: Dict[str, np.ndarray] = defaultdict(self._default_array)
self._delta: float = 1 / self._interp_freq / 20
self._last_max_t: float = -float('inf')
# Forcing the out_labels into a list
if out_labels is not None and isinstance(out_labels, str):
self._out_labels = [out_labels]
elif out_labels is not None:
self._out_labels = list(out_labels)
else:
self._out_labels = None
[docs]
def loop(self) -> None:
"""Receives data, interpolates it, and sends it to the downstream
Blocks."""
# Receiving all the upcoming data
data = self.recv_all_data_raw()
# Iterating over all the links
for link_data in data:
# Only data associated with a time label can be multiplexed
if self._time_label not in link_data:
continue
# Extracting the time information from the data
timestamps = link_data.pop(self._time_label)
# Adding data from each label in the buffer
for label, values in link_data.items():
# Only the labels specified in out_labels is considered
if self._out_labels is not None and label not in self._out_labels:
continue
# Adding the received values to the buffered ones
self._data[label] = np.concatenate((self._data[label],
np.array((timestamps, values))),
axis=1)
# Sorting the buffered data, if a same label comes from multiple Links
self._data[label] = self._data[label][
:, self._data[label][0].argsort()]
# Aborting if there's no data to process
if not self._data:
self.log(logging.DEBUG, "No data in the buffer to process")
return
# Making sure there's data for all the requested labels
if (self._out_labels is not None and
any(label not in self._data for label in self._out_labels)):
self.log(logging.DEBUG, "Not all the requested labels received yet")
return
# There should also be at least two values for each label
if any(len(self._data[label][0]) < 2 for label in self._data):
self.log(logging.DEBUG, "Not at least 2 values for each label in buffer")
return
# The two values should also be separated by at least one time period
if any(np.ptp(self._data[label][0]) < 1 / self._interp_freq
for label in self._data):
self.log(logging.DEBUG, "At least one label has values too close "
"together compared to interpolation frequency")
return
# Getting the minimum time for the interpolation (maximin over all labels)
min_t = max(data[0, 0] for data in self._data.values())
# The minimum must be higher than the previous maximum
min_t = max(min_t, self._last_max_t + self._delta)
# Correcting to the closest upper multiple of the time interval
min_t = min_t + (1 / self._interp_freq) - min_t % (1 / self._interp_freq)
# Getting the maximum time for the interpolation (minimax over all labels)
max_t = min(data[0, -1] for data in self._data.values())
# Correcting to the closest lower multiple of the time interval
max_t = max_t - (1 / self._interp_freq) + max_t % (1 / self._interp_freq)
if max_t < min_t:
self.log(logging.DEBUG, "Ranges not matching for interpolation")
return
# The array containing the timestamps for interpolating
interp_times = np.arange(min_t, max_t + self._delta, 1 / self._interp_freq)
# Making sure there are points to interpolate
if not np.any(interp_times):
self.log(logging.DEBUG, "No time points for interpolation")
return
to_send = dict()
self._last_max_t = max_t
# Building the dict of values to send
for label, values in self._data.items():
to_send[label] = list(np.interp(interp_times, values[0], values[1]))
# Keeping the last data point before max_t to pass this information on
last = values[:, values[0] <= max_t][:, -1]
# Removing the used values from the buffer, except the last data point
self._data[label] = np.column_stack((last, values[:, values[0] > max_t]))
if to_send:
# Adding the time values to the dict of values to send
to_send[self._time_label] = list(interp_times)
# Sending the values
for i, _ in enumerate(interp_times):
self.send({label: values[i] for label, values in to_send.items()})
@staticmethod
def _default_array() -> np.ndarray:
"""Helper function for the default dict."""
return np.array(([], []))