# coding: utf-8
from typing import Iterable, Optional, Union
from pathlib import Path
import logging
from .meta_block import Block
[docs]
class Recorder(Block):
    """This Block saves data from an upstream Block to a text file, with
    values separated by a comma and lines by a newline character.

    The first row of the file contains the names of the saved labels. This
    Block can only save data coming from exactly one upstream Block. To save
    data from multiple Blocks, use several instances of Recorder
    (recommended) or a :class:`~crappy.blocks.Multiplexer` Block.

    This Block cannot directly record data from "streams", i.e. coming from
    an :class:`~crappy.blocks.IOBlock` Block with the ``'streamer'`` argument
    set to :obj:`True`. To do so, the :class:`~crappy.blocks.HDFRecorder`
    Block should be used instead. Alternatively, a
    :class:`~crappy.modifier.Demux` Modifier can be placed between the
    IOBlock and the Recorder, but most of the acquired data won't be saved.

    .. versionadded:: 1.4.0
    """

    def __init__(self,
                 file_name: Union[str, Path],
                 delay: float = 2,
                 labels: Optional[Union[str, Iterable[str]]] = None,
                 freq: Optional[float] = 200,
                 display_freq: bool = False,
                 debug: Optional[bool] = False) -> None:
        """Sets the arguments and initializes the parent class.

        Args:
            file_name: Path to the output file, either relative or absolute.
                If the parent folders of the file do not exist, they will be
                created. If the file already exists, the actual file where
                data will be written will be renamed with a trailing index to
                avoid overwriting it.

                .. versionchanged:: 2.0.0
                   renamed from *filename* to *file_name*
            delay: Delay between each write in seconds.
            labels: If provided, only the data carried by these labels will
                be saved. Otherwise, all the received data is saved. A single
                label can be given as a plain string.
            freq: The target looping frequency for the Block. If :obj:`None`,
                loops as fast as possible.

                .. versionadded:: 1.5.10
            display_freq: If :obj:`True`, displays the looping frequency of
                the Block.

                .. versionadded:: 1.5.10
                .. versionchanged:: 2.0.0
                   renamed from *verbose* to *display_freq*
            debug: If :obj:`True`, displays all the log messages including
                the :obj:`~logging.DEBUG` ones. If :obj:`False`, only
                displays the log messages with :obj:`~logging.INFO` level or
                higher. If :obj:`None`, disables logging for this Block.

                .. versionadded:: 2.0.0
        """

        super().__init__()
        self.niceness = -5
        self.freq = freq
        self.display_freq = display_freq
        self.debug = debug

        self._delay = delay
        self._path = Path(file_name)

        # Forcing the labels into a list. A lone string must be wrapped
        # rather than iterated over, otherwise it would be split into
        # individual characters
        if labels is None:
            self._labels = None
        elif isinstance(labels, str):
            self._labels = [labels]
        else:
            self._labels = list(labels)

        # The header row is only written once the first data is received
        self._file_initialized = False

    def prepare(self) -> None:
        """Checks that the Block has the right number of inputs, creates the
        folder containing the file if it doesn't already exist, and changes
        the name of the file if it already exists.

        Raises:
            ValueError: If the Block has no input, or more than one input.
        """

        # Making sure there's the right number of incoming links
        if not self.inputs:
            raise ValueError('The Recorder block does not have inputs !')
        elif len(self.inputs) > 1:
            raise ValueError('Cannot link more than one block to a '
                             'Recorder block !')

        parent_folder = self._path.parent

        # Creating the folder for storing the data if it does not already
        # exist
        if not parent_folder.is_dir():
            self.log(logging.INFO,
                     f"Creating the folder containing the file to save "
                     f"data to ({parent_folder})")
            parent_folder.mkdir(exist_ok=True, parents=True)

        # Changing the name of the file if it already exists
        if self._path.exists():
            self.log(logging.WARNING,
                     f"The file {self._path} already exists !")
            stem, suffix = self._path.stem, self._path.suffix

            # Adding an integer at the end of the name to identify the file,
            # picking the first index that is not already taken
            index = 1
            candidate = parent_folder / f'{stem}_{index:05d}{suffix}'
            while candidate.exists():
                index += 1
                candidate = parent_folder / f'{stem}_{index:05d}{suffix}'

            self._path = candidate
            self.log(logging.WARNING,
                     f"Writing data to the file {self._path} instead !")

    def loop(self) -> None:
        """Receives data from the upstream Block and saves it.

        On the first call for which data is available, the header row is
        written to the file before the data itself. Until then, the method
        returns without doing anything.

        Raises:
            KeyError: If some of the labels to save are missing from the
                received data.
        """

        if not self._file_initialized:
            # Nothing can be written until the first data has arrived
            if not self.data_available():
                return

            data = self.recv_all_data(delay=self._delay)

            # If no labels are given, save everything that's received
            if self._labels is None:
                self._labels = list(data.keys())

            # The first row of the file contains the names of the labels
            with open(self._path, 'w') as file:
                self.log(logging.INFO,
                         f"Writing the header on file {self._path}")
                file.write(f"{','.join(self._labels)}\n")

            self._file_initialized = True

        else:
            data = self.recv_all_data(delay=self._delay)

        # Keeping only the data that needs to be saved
        data = {key: val for key, val in data.items() if key in self._labels}
        if not data:
            return

        # Every column of the header needs values, so fail with an explicit
        # message instead of a bare KeyError on the first missing label
        missing = [label for label in self._labels if label not in data]
        if missing:
            raise KeyError(f"The labels {missing} to save were not found in "
                           f"the received data, only got {list(data)} !")

        with open(self._path, 'a') as file:
            # Sorting the lists of values in the same order as the labels
            sorted_data = [data[label] for label in self._labels]

            # Actually writing the values, one row per group of values
            # sharing the same position in their respective lists
            self.log(logging.DEBUG,
                     f"Writing {sorted_data} to the file {self._path}")
            file.writelines(f"{','.join(map(str, values))}\n"
                            for values in zip(*sorted_data))