Source code for crappy.blocks.synchronizer

# coding: utf-8

import numpy as np
from typing import Optional, Union, Iterable, Dict
from collections import defaultdict
import logging

from .meta_block import Block


[docs] class Synchronizer(Block): """This Block takes data from upstream Blocks as input and interpolates it to output all the labels on the same timestamps as a reference label. This Block is very similar to the :class:`~crappy.blocks.Multiplexer` Block, but the `Multiplexer` interpolates data in a time base independent of the labels whereas this one takes one label as a reference. It can take any number of inputs, provided that they all share a common time label. It is also possible to choose which labels are considered for interpolation and which are dropped. The interpolation is performed using the :obj:`numpy.interp` method. This Block is useful for synchronizing data acquired from different sensors, in the context when one label should be treated as a reference. This is for example the case when synchronizing signals with the output of an image processing, to be able to compare all the signals in the time base of the image acquisition. .. versionadded:: 2.0.5 """
[docs] def __init__(self, reference_label: str, time_label: str = 't(s)', labels_to_sync: Optional[Union[str, Iterable[str]]] = None, freq: Optional[float] = 50, display_freq: bool = False, debug: Optional[bool] = False) -> None: """Sets the arguments and initializes the parent class. Args: reference_label: The label whose timestamps will be taken as a time base for performing the interpolation. time_label: The label carrying the time information. Should be common to all the input Blocks. labels_to_sync: An iterable (like a :obj:`list` or a :obj:`tuple`) containing the labels to interpolate on the reference label's time base, except for the time label that is given separately in the ``time_label`` argument. The Block also doesn't output anything until data has been received on all these labels. If left to :obj:`None`, all the received labels are considered. **It is recommended to always set this argument !** It is also possible to give this argument as a single :obj:`str` (i.e. not in an iterable). freq: The target looping frequency for the Block. If :obj:`None`, loops as fast as possible. display_freq: If :obj:`True`, displays the looping frequency of the Block. debug: If :obj:`True`, displays all the log messages including the :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, disables logging for this Block. """ super().__init__() self.freq = freq self.display_freq = display_freq self.debug = debug # Initializing the attributes self._ref_label = reference_label self._time_label = time_label self._data: Dict[str, np.ndarray] = defaultdict(self._default_array) # Forcing the labels_to_sync into a list if labels_to_sync is not None and isinstance(labels_to_sync, str): self._to_sync = [labels_to_sync] elif labels_to_sync is not None: self._to_sync = list(labels_to_sync) else: self._to_sync = None
[docs] def loop(self) -> None: """Receives data, interpolates it, and sends it to the downstream Blocks.""" # Receiving all the upcoming data data = self.recv_all_data_raw() # Iterating over all the links for link_data in data: # Only data associated with a time label can be synchronized if self._time_label not in link_data: continue # Extracting the time information from the data timestamps = link_data.pop(self._time_label) # Adding data from each label in the buffer for label, values in link_data.items(): # Only the labels specified in out_labels is considered if (self._to_sync is not None and label not in self._to_sync and label != self._ref_label): continue # Adding the received values to the buffered ones self._data[label] = np.concatenate((self._data[label], np.array((timestamps, values))), axis=1) # Sorting the buffered data, if a same label comes from multiple Links self._data[label] = self._data[label][ :, self._data[label][0].argsort()] # Aborting if there's no data to process if not self._data: self.log(logging.DEBUG, "No data in the buffer to process") return # Aborting if there's no data for the reference label if self._ref_label not in self._data: self.log(logging.DEBUG, "No value for the reference label found in " "the buffer") return # Making sure there's data for all the requested labels if (self._to_sync is not None and any(label not in self._data for label in self._to_sync)): self.log(logging.DEBUG, "Not all the requested labels received yet") return # There should also be at least two values for each label if any(len(self._data[label][0]) < 2 for label in self._data): self.log(logging.DEBUG, "Not at least 2 values for each label in buffer") return # Getting the minimum time for the interpolation (maximin over all labels) min_t = max(data[0, 0] for data in self._data.values()) # Getting the maximum time for the interpolation (minimax over all labels) max_t = min(data[0, -1] for data in self._data.values()) # Checking if there's a valid time range for interpolation if max_t < min_t: self.log(logging.DEBUG, "Ranges not matching for interpolation") return # The array containing the timestamps for interpolating interp_times = self._data[self._ref_label][0, (self._data[self._ref_label][0] >= min_t) & (self._data[self._ref_label][0] <= max_t)] # Checking if there are values for the target label in the valid time range if not np.any(interp_times): self.log(logging.DEBUG, "No value of the target label found between the minimum and " "maximum possible interpolation times") return to_send = dict() # Building the dict of values to send for label, values in self._data.items(): # Keeping the values of the reference label as they are if label == self._ref_label: to_send[label] = values[1, :] # For all the other labels, performing interpolation else: to_send[label] = list(np.interp(interp_times, values[0], values[1])) # Keeping the last data point before max_t to pass this information on last = values[:, values[0] <= max_t][:, -1] # Removing the used values from the buffer, except the last data point self._data[label] = np.column_stack((last, values[:, values[0] > max_t])) if to_send: # Adding the time values to the dict of values to send to_send[self._time_label] = list(interp_times) # Sending the values for i, _ in enumerate(interp_times): self.send({label: values[i] for label, values in to_send.items()})
@staticmethod def _default_array() -> np.ndarray: """Helper function for the default dict.""" return np.array(([], []))