Source code for crappy.blocks.pause

# coding: utf-8

from collections.abc import Sequence, Callable
import logging
from re import split
from time import time

from .meta_block import Block


[docs] class Pause(Block): """This Block parses the data it receives and checks if this data meets the given pause criteria. If so, the other Blocks are paused until the criteria are no longer met. When paused, the other Blocks are still looping but no longer executing any code. This feature is mostly useful when human intervention on a test setup is required, to ensure that nothing happens during that time. It is possible to prevent a Block from being affected by a pause by setting its ``pausable`` attribute to :obj:`False`. In particular, the Block(s) responsible for outputting the labels checked by the criteria should keep running, otherwise the test will be put on hold forever. Important: This Block prevents other Blocks from running normally, but no specific mechanism for putting hardware in an idle state is implemented. For example, a motor driven by an :class:`~crappy.blocks.Machine` Block might keep moving according to the last command it received before the Blocks were paused. It is up to the user to put hardware in the desired state before starting a pause. Warning: Using this Block is potentially dangerous, as it leaves hardware unsupervised with no software control on it. It is advised to always include hardware securities on your setup. .. versionadded:: 2.0.7 """
[docs] def __init__(self, criteria: str | Callable | Sequence[str | Callable], freq: float | None = 50, display_freq: bool = False, debug: bool | None = False) -> None: """Sets the arguments and initializes the parent class. Args: criteria: A :obj:`str`, a :obj:`~collections.abc.Callable`, or an :obj:`~collections.abc.Iterable` (like a :obj:`tuple` or a :obj:`list`) containing such objects. Each :obj:`str` or :obj:`~collections.abc.Callable` represents one pause criterion. There is no limit to the given number of stop criteria. If a criterion is given as an :obj:`~collections.abc.Callable`, it should accept as its sole argument the output of the :meth:`crappy.blocks.Block.recv_all_data` method and return :obj:`True` if the criterion is met, and :obj:`False` otherwise. If the criterion is given as a :obj:`str`, it should follow one the following syntaxes : :: '<lab> > <threshold>' '<lab> < <threshold>' With ``<lab>`` and ``<threshold>`` to be replaced respectively with the name of a received label and a threshold value. The spaces in the string are ignored. freq: The target looping frequency for the Block. If :obj:`None`, loops as fast as possible. display_freq: If :obj:`True`, displays the looping frequency of the Block. debug: If :obj:`True`, displays all the log messages including the :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, disables logging for this Block. """ super().__init__() self.pausable = False self.freq = freq self.display_freq = display_freq self.debug = debug # Handling the case when only one stop condition is given if isinstance(criteria, str) or isinstance(criteria, Callable): criteria = (criteria,) criteria = tuple(criteria) self._raw_crit: tuple[str | Callable[[dict[str, list]], bool], ...] = criteria self._criteria: tuple[Callable[[dict[str, list]], bool]] | None = None
[docs] def prepare(self) -> None: """Converts all the given criteria to :obj:`~collections.abc.Callable`.""" # This operation cannot be performed during __init__ due to limitations of # the spawn start method of multiprocessing self._criteria = tuple(map(self._parse_criterion, self._raw_crit))
[docs] def loop(self) -> None: """Receives data from upstream Blocks, checks if this data meets at least one criterion, and puts the other Blocks in pause if that's the case.""" if not (data := self.recv_all_data()): self.log(logging.DEBUG, "No data received during this loop") return # Pausing only if not paused, and stop criterion is met if (self._criteria and any(crit(data) for crit in self._criteria) and not self._pause_event.is_set()): self.log(logging.WARNING, "Stop criterion reached, pausing the Blocks !") self._pause_event.set() return if (self._criteria and not any(crit(data) for crit in self._criteria) and self._pause_event.is_set()): self.log(logging.WARNING, "Stop criterion no longer satisfied, " "un-pausing the Blocks !") self._pause_event.clear() return self.log(logging.DEBUG, "No pausing or un-pausing during this loop")
def _parse_criterion(self, criterion: str | Callable[[dict[str, list]], bool] ) -> Callable[[dict[str, list]], bool]: """Parses a Callable or string criterion given as an input by the user, and returns the associated Callable.""" # If the criterion is already a callable, returning it if isinstance(criterion, Callable): self.log(logging.DEBUG, "Criterion is a callable") return criterion # Second case, the criterion is a string containing '<' if '<' in criterion: self.log(logging.DEBUG, "Criterion is of type var < thresh") var, thresh = split(r'\s*<\s*', criterion) # Return a function that checks if received data is inferior to threshold def cond(data: dict[str, list]) -> bool: """Criterion checking that the label values are below a given threshold.""" if var in data: return any((val < float(thresh) for val in data[var])) return False return cond # Third case, the criterion is a string containing '>' elif '>' in criterion: self.log(logging.DEBUG, "Criterion is of type var > thresh") var, thresh = split(r'\s*>\s*', criterion) # Special case for a time criterion if var == 't(s)': self.log(logging.DEBUG, "Criterion is about the elapsed time") # Return a function that checks if the given time was reached def cond(_: dict[str, list]) -> bool: """Criterion checking if a given delay is expired.""" return time() - self.t0 > float(thresh) return cond # Regular case else: # Return a function that checks if received data is superior to # threshold def cond(data: dict[str, list]) -> bool: """Criterion checking that the label values are above a given threshold.""" if var in data: return any((val > float(thresh) for val in data[var])) return False return cond # Otherwise, it's an invalid syntax else: raise ValueError("Wrong syntax for the criterion, please refer to the " "documentation")