# coding: utf-8
import numpy as np
from typing import Dict, Any, Union, Iterable
import logging
from .meta_modifier import Modifier
class Demux(Modifier):
    """Modifier converting a stream into a regular data flow interpretable by
    most Blocks.

    It is meant to be used on a :class:`~crappy.links.Link` taking an
    :class:`~crappy.blocks.IOBlock` in streamer mode as an input. It converts
    the stream to make it readable by most Blocks, and also splits the stream
    in several labels if necessary.

    It takes a stream as an input, i.e. a :obj:`dict` whose values are
    :obj:`numpy.array`, and outputs a :obj:`dict` whose values are scalars.
    If the numpy arrays contain several columns (corresponding to several
    acquired channels), it splits them into several labels.

    Important:
      In the process of converting the stream data to regular labeled data,
      much information is lost ! This Modifier is intended to format the
      stream data for low-frequency plotting, or low-frequency
      decision-making. To save all the stream data, use the
      :class:`~crappy.blocks.HDFRecorder` Block.

    .. versionadded:: 1.4.0
    """

    def __init__(self,
                 labels: Union[str, Iterable[str]],
                 stream_label: str = "stream",
                 mean: bool = False,
                 time_label: str = "t(s)",
                 transpose: bool = False) -> None:
        """Sets the args and initializes the parent class.

        Args:
          labels: The labels corresponding to the rows or columns of the
            stream. It can be either a single label, or an iterable of labels
            (like a :obj:`list` or a :obj:`tuple`). They must be given in the
            same order as they appear in the stream. If fewer labels are
            given than there are rows or columns in the stream, only the data
            from the first rows or columns will be retrieved.
          stream_label: The label carrying the stream.

            .. versionchanged:: 1.5.10 renamed from *stream* to *stream_label*
          mean: If :obj:`True`, the returned value will be the average of the
            row or column. Otherwise, it will be the first value.
          time_label: The label carrying the time information.
          transpose: If :obj:`True`, each label corresponds to a row in the
            stream. Otherwise, a label corresponds to a column in the stream.
        """

        super().__init__()

        # The labels are iterated over at each call, so they are stored in a
        # tuple. Otherwise, a one-shot iterable (e.g. a generator) would be
        # exhausted after the very first call
        if isinstance(labels, str):
            self._labels = (labels,)
        else:
            self._labels = tuple(labels)

        self._stream_label = stream_label
        self._mean = mean
        self._time_label = time_label
        self._transpose = transpose

    def __call__(self, data: Dict[str, np.ndarray]) -> Dict[str, Any]:
        """Retrieves for each label its value in the stream, also gets the
        corresponding timestamp, and returns them.

        Args:
          data: The :obj:`dict` carrying the stream under the stream label
            and the time information under the time label. It is updated in
            place.

        Returns:
          The same :obj:`dict` as received. The stream entry is removed, one
          entry is added per label, and the time entry is replaced by a
          single value. If one of the dimensions of the stream is empty, the
          received :obj:`dict` is returned untouched.

        .. versionchanged:: 1.5.10
           merge *evaluate_mean* and *evaluate_nomean* methods into *evaluate*
        .. versionchanged:: 2.0.0 renamed from *evaluate* to *__call__*
        """

        self.log(logging.DEBUG, f"Received {data}")

        stream = data[self._stream_label]

        # If there are no rows or no column, cannot perform the demux
        if 0 in stream.shape:
            return data

        # Getting either the average or the first value for each label
        for i, label in enumerate(self._labels):
            # The data of a given label is either on a same row (transpose)
            # or on a same column (default)
            values = stream[i, :] if self._transpose else stream[:, i]
            data[label] = np.mean(values) if self._mean else values[0]

        # Discarding the raw data
        del data[self._stream_label]

        # Keeping either the average or the first time value
        time = data[self._time_label]
        if self._mean:
            data[self._time_label] = np.mean(time)
        else:
            # With a single sample, squeeze returns a 0-d array that cannot
            # be indexed, so it is promoted back to one dimension first
            data[self._time_label] = np.atleast_1d(np.squeeze(time))[0]

        self.log(logging.DEBUG, f"Sending {data}")
        return data