Source code for crappy.blocks.hdf_recorder

# coding: utf-8

import numpy as np
from typing import Union, Optional
from pathlib import Path
import logging

from .._global import OptionalModule
from .meta_block import Block

# The tables module is an optional dependency: if it cannot be imported, a
# placeholder object is bound to the same name so that importing this module
# still succeeds
# NOTE(review): presumably OptionalModule raises with the given message once
# the placeholder is actually used - confirm in crappy._global
try:
  import tables
except ModuleNotFoundError:
  tables = OptionalModule("tables", "HDFRecorder needs the tables module to "
                          "write hdf files.")


class HDFRecorder(Block):
  """This Block records data efficiently into a HDF5 file.

  It expects data as :obj:`numpy.array` from exactly one upstream Block, that
  should be an :class:`~crappy.blocks.IOBlock` in `streamer` mode. It then
  saves this data in a HDF5 file using the :mod:`tables` module.

  This Block is intended for high-speed data recording from
  :class:`~crappy.inout.InOut` in `streamer` mode. For regular data
  recording, the :class:`~crappy.blocks.Recorder` Block should be used
  instead.

  Warning:
    Corrupted HDF5 files are not readable at all ! If anything goes wrong
    during a test, especially during the finish phase, it is not guaranteed
    that the recorded data will be readable.

  .. versionadded:: 1.4.0
  .. versionchanged:: 2.0.0 renamed from *Hdf_recorder* to *HDFRecorder*
  """

  def __init__(self,
               filename: Union[str, Path],
               node: str = 'table',
               expected_rows: int = 10**8,
               atom=None,
               label: str = 'stream',
               metadata: Optional[dict] = None,
               freq: Optional[float] = None,
               display_freq: bool = False,
               debug: Optional[bool] = False) -> None:
    """Sets the arguments and initializes the parent class.

    Args:
      filename: Path to the output file, either relative or absolute. If the
        parent folders of the file do not exist, they will be created. If the
        file already exists, the actual file where data will be written will
        be renamed with a trailing index to avoid overriding it.
      node: The name of the array to create in the HDF5 file, as a
        :obj:`str`.
      expected_rows: The number of expected rows in the file. It is used to
        optimize the dumping.
      atom: This represents the type of data to be stored in the table. It
        can be given as a :obj:`tables.Atom` instance, in which case it is
        used as is. Any other value is handed to :obj:`numpy.dtype` and the
        result is converted to an atom. If :obj:`None`, a 16-bit integer
        atom is used.
      label: The label carrying the data to be saved
      metadata: A :obj:`dict` containing additional information to save in
        the `HDF5` file. Each item is written as a separate array at the root
        of the file.
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.

        .. versionadded:: 1.5.10
      display_freq: if :obj:`True`, displays the looping frequency of the
        Block.

        .. versionadded:: 1.5.10
        .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq*
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0
    """

    # Handles to the file and to its array, both created later on
    self._hfile = None
    self._array = None

    super().__init__()
    self.freq = freq
    self.display_freq = display_freq
    self.debug = debug

    self._path = Path(filename)
    self._label = label
    self._metadata = {} if metadata is None else metadata
    self._expected_rows = expected_rows
    self._node = node

    # Anything that is not already an Atom goes through a numpy dtype
    atom = tables.Int16Atom() if atom is None else atom
    if isinstance(atom, tables.Atom):
      self._atom = atom
    else:
      self._atom = tables.Atom.from_dtype(np.dtype(atom))

    # The array can only be created once the width of the data is known
    self._array_initialized = False

  def prepare(self) -> None:
    """Checks that the Block has the right number of inputs, creates the
    folder containing the file if it doesn't already exist, changes the name
    of the file if it already exists, and initializes the HDF file.

    Raises:
      ValueError: If the Block has no input, or more than one input.
    """

    # Making sure there's the right number of incoming links
    if not self.inputs:
      raise ValueError('The HDF Recorder block does not have inputs !')
    if len(self.inputs) > 1:
      raise ValueError('Cannot link more than one block to an HDF Recorder '
                       'block !')

    parent_folder = self._path.parent

    # Creating the folder for storing the data if it does not already exist
    if not parent_folder.is_dir():
      self.log(logging.INFO, f"Creating the folder containing the file to save"
                             f" data to ({parent_folder})")
      parent_folder.mkdir(exist_ok=True, parents=True)

    # Changing the name of the file if it already exists
    if self._path.exists():
      self.log(logging.WARNING, f"The file {self._path} already exists !")
      stem, suffix = self._path.stem, self._path.suffix

      # Adding an integer at the end of the name to identify the file, and
      # incrementing it until a name that is not taken yet is found
      i = 1
      candidate = parent_folder / f'{stem}_{i:05d}{suffix}'
      while candidate.exists():
        i += 1
        candidate = parent_folder / f'{stem}_{i:05d}{suffix}'

      self._path = candidate
      self.log(logging.WARNING, f"Writing data to the file {self._path} "
                                f"instead !")

    # Initializing the file to save data to
    self.log(logging.INFO, "Initializing the HDF5 file")
    self._hfile = tables.open_file(str(self._path), "w")

    # Each metadata entry is stored as its own array at the root of the file
    for name, value in self._metadata.items():
      self._hfile.create_array(self._hfile.root, name, value)

  def loop(self) -> None:
    """Receives data from the upstream Block and saves it.

    Also creates the array for recording data when the first values are
    received.
    """

    # Do nothing until the first value to save are received
    if not self._array_initialized:
      if not self.data_available():
        return
      self._first_loop()
      self._array_initialized = True

    data = self.recv_all_data()
    # Data that does not carry the label to save is ignored
    if self._label in data:
      for elt in data[self._label]:
        self._array.append(elt)

  def finish(self) -> None:
    """Closes the HDF file, if it was opened."""

    if self._hfile is not None:
      self.log(logging.INFO, "Closing the HDF5 file")
      self._hfile.close()
      # Dropping the handle so that a later call does nothing
      self._hfile = None

  def _first_loop(self) -> None:
    """Initializes the array for saving data, and saves the first values.

    The width of the array is read from the second dimension of the first
    received chunk of data.

    Raises:
      KeyError: If the received data does not contain the label to save.
      ValueError: If the first received chunk has less than two dimensions.
    """

    data = self.recv_all_data()
    if self._label not in data:
      raise KeyError(f'The data received by the HDF Recorder block does not '
                     f'contain the label {self._label} !')

    self.log(logging.INFO, "Initializing the arrays in the HDF5 file")

    # The array is extended along its first axis, so only the second
    # dimension of the incoming chunks needs to be known beforehand
    shape = data[self._label][0].shape
    if len(shape) < 2:
      raise ValueError(f'The HDF Recorder block expects data with at least '
                       f'two dimensions, but received data with shape '
                       f'{shape} !')
    width = shape[1]

    self._array = self._hfile.create_earray(self._hfile.root,
                                            self._node,
                                            self._atom,
                                            (0, width),
                                            expectedrows=self._expected_rows)

    # Saving the values that were received along with the first chunk
    for elt in data[self._label]:
      self._array.append(elt)