# Source code for crappy.blocks.generator

# coding: utf-8

from time import time, sleep
from typing import Dict, Any, Optional, Iterator, Iterable
from itertools import cycle
from copy import deepcopy
import logging

from .meta_block import Block
from .generator_path.meta_path import paths_dict, Path
from .._global import GeneratorStop


class GeneratorNoStop(Exception):
  """Exception signalling that the Generator ran out of Paths but must not
  stop the script.

  It is raised by the Generator's ``_update_path`` method when the list of
  Paths is exhausted and ``end_delay`` is :obj:`None`, and it is caught in
  the Generator's ``loop`` method, which then leaves the Block idle instead
  of letting a stop exception propagate.
  """

  pass


class Generator(Block):
  """This Block generates a signal following a user-defined assembly of
  :class:`~crappy.blocks.generator_path.meta_path.Path`.

  The generated signal is just a waveform that can serve any purpose. It can
  for example be used for driving a :class:`~crappy.blocks.Machine` Block, or
  for triggering a :class:`~crappy.blocks.Camera` Block.

  One Generator Block can only generate one signal. Use multiple Blocks if
  several signals are needed. Note that the default behavior of a Generator is
  to stop the entire script when it reaches the end of all the Paths.

  This Block can also accept inputs from other Blocks, as these inputs may be
  used by a :class:`~crappy.blocks.generator_path.meta_path.Path`. The most
  common use of this feature is to have the stop condition of a Path depend on
  the received values of a label.

  .. versionadded:: 1.4.0
  """

  def __init__(self,
               path: Iterable[Dict[str, Any]],
               freq: Optional[float] = 200,
               cmd_label: str = 'cmd',
               path_index_label: str = 'index',
               repeat: bool = False,
               spam: bool = False,
               display_freq: bool = False,
               end_delay: Optional[float] = 2,
               safe_start: bool = False,
               debug: Optional[bool] = False) -> None:
    """Sets the arguments and initializes the parent class.

    Args:
      path: An iterable (like a :obj:`list` or a :obj:`tuple`) of :obj:`dict`,
        each dict providing the parameters to generate a
        :class:`~crappy.blocks.generator_path.meta_path.Path`. The Paths are
        generated in the order in which they are given, and the stop condition
        of each Path is used for determining when to switch to the next one.
        The ``'type'`` key of each :obj:`dict` gives the name of the Path to
        use, and all the other keys correspond to the arguments to give to
        this Path. Refer to the documentation of the chosen Paths to know
        which keys to provide.
      freq: The target looping frequency for the Block. If :obj:`None`, loops
        as fast as possible.
      cmd_label: The label of the signal sent to the downstream Blocks.
      path_index_label: In addition to the ``cmd_label``, this label holds the
        index of the current
        :class:`~crappy.blocks.generator_path.meta_path.Path`. Useful to
        trigger a Block when the current Path changes, as the output value
        might not necessarily change.
      repeat: If :obj:`True`, the ``path`` will loop forever instead of
        stopping when it reaches the last Path.
      spam: If :obj:`True`, the signal value will be sent on each loop. Else,
        it will only be sent if it is different from the previous or if the
        Block switched to the next Path.
      display_freq: If :obj:`True`, displays the looping frequency of the
        Block.

        .. versionchanged:: 2.0.0
           renamed from *verbose* to *display_freq*
      end_delay: When all the Paths are exhausted, waits this many seconds
        before stopping the entire script. Can be set to :obj:`None`, in which
        case the Generator won't stop the program when finishing.
      safe_start: Ensures the first Path waits for at least one data point
        from upstream Blocks before sending the first value of the signal.
        Otherwise, the first value might be sent without checking the
        associated condition if it depends on labels from other Blocks.

        .. versionadded:: 1.5.10
      debug: If :obj:`True`, displays all the log messages including the
        :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log
        messages with :obj:`~logging.INFO` level or higher. If :obj:`None`,
        disables logging for this Block.

        .. versionadded:: 2.0.0
    """

    super().__init__()

    # Instantiating a few attributes based on the arguments
    self.niceness = -5
    self.freq = freq
    self.display_freq = display_freq
    self.labels = ['t(s)', cmd_label, path_index_label]
    self.debug = debug

    self._end_delay = end_delay
    self._spam = spam
    self._safe_start = safe_start
    self._safe_started = False

    # The path may be any iterable, so it is materialized once as a list to
    # allow iterating over it several times
    path = list(path)
    self._path = cycle(path) if repeat else iter(path)

    # More attributes
    self._ended_no_raise = False
    self._last_cmd = None
    self._last_id = None
    self._last_t = None
    self._current_path = None
    self._path_id = None

    # Checking the validity of the path. No extra copy is needed here, as
    # _check_path_validity works on a deep copy of each dict
    self._check_path_validity(iter(path))

  def begin(self) -> None:
    """Initializes the first
    :class:`~crappy.blocks.generator_path.meta_path.Path`."""

    self._update_path()

  def loop(self) -> None:
    """First reads data from upstream Blocks, then gets the next command to
    send, and finally sends it to downstream Blocks.

    It also manages the transitions between the
    :class:`~crappy.blocks.generator_path.meta_path.Path`.
    """

    # Case when the Generator shouldn't raise GeneratorStop after it ended
    if self._ended_no_raise:
      return

    # If safe start requested, do nothing until the first values are received
    if self._safe_start and not self._safe_started:
      if self.data_available():
        self._safe_started = True
        self.log(logging.INFO, "First data received, starting safely")
      else:
        self.log(logging.DEBUG, "Waiting for first data to arrive for "
                                "starting safely")
        return

    # Getting the data from upstream blocks
    data = self.recv_all_data()

    try:
      # Getting the next command to send
      self._last_t = time()
      cmd = self._current_path.get_cmd(data)
      self.log(logging.DEBUG, f"Returned command: {cmd}")
    except StopIteration:
      try:
        # Switching to the next path if we reached the end of one
        self._update_path()
        # Running a full iteration again right away, so that the new path
        # outputs its first command based on freshly read data
        self.loop()
        return
      except GeneratorNoStop:
        self.log(logging.WARNING, "Generator path exhausted, staying idle "
                                  "until the script ends")
        # From now on, each call to loop returns immediately
        self._ended_no_raise = True
        return

    # Not sending if no command was output by the get_cmd method
    if cmd is None:
      return

    # The command is sent if it's different from the previous, or if we
    # switched to the next path, or if spam is True
    if cmd != self._last_cmd or self._last_id != self._path_id or self._spam:
      self._last_cmd = cmd
      self._last_id = self._path_id
      # Actually sending the command
      self.send([self._last_t - self.t0, cmd, self._path_id])

  def _update_path(self) -> None:
    """Gets the next Path from the list of Paths and instantiates it.

    Also manages the case when the last Path of the list was reached.

    Raises:
      GeneratorStop: If the list of Paths is exhausted and ``end_delay`` is
        not :obj:`None`, after sleeping for ``end_delay`` seconds.
      GeneratorNoStop: If the list of Paths is exhausted and ``end_delay`` is
        :obj:`None`.
    """

    try:
      # Getting the next path from the list of paths, as a copy so that the
      # user-provided dict is not modified when popping the 'type' key
      next_path_dict = deepcopy(next(self._path))
      # Updating the path index
      if self._path_id is not None:
        self._path_id += 1
      else:
        self._path_id = 0
    # Raised when the list of paths is exhausted
    except StopIteration:
      # First option, stopping the program after a delay
      if self._end_delay is not None:
        sleep(self._end_delay)
        raise GeneratorStop
      # Second option, not stopping the program and looping forever
      else:
        raise GeneratorNoStop

    path_name = next_path_dict.pop('type')
    self.log(logging.INFO, f"Next generator path (id: {self._path_id}): "
                           f"{path_name}")

    # Instantiating the next generator path object
    self._check_path_exists(path_name)
    path_type = paths_dict[path_name]
    # These class attributes are set on Path right before instantiating the
    # next path object
    Path.t0 = self._last_t if self._last_t is not None else self.t0
    Path.last_cmd = self._last_cmd
    self._current_path = path_type(**next_path_dict)

  def _check_path_validity(self, path: Iterator[Dict[str, Any]]) -> None:
    """Simply instantiates all the Paths in a row to check no error is
    raised.

    Args:
      path: An iterator over the :obj:`dict` describing the Paths. The dicts
        are not modified, as each one is copied before being used.
    """

    for i, next_dict in enumerate(path):
      # Working on a copy, as the 'type' key is popped from the dict
      next_dict = deepcopy(next_dict)
      path_name = next_dict.pop('type')
      self._check_path_exists(path_name)
      # Dummy values, only meant for the instantiation to succeed
      Path.t0 = 0
      Path.last_cmd = None if i == 0 else 0
      path_type = paths_dict[path_name]
      path_type(**next_dict)

  @staticmethod
  def _check_path_exists(name: str) -> None:
    """Checks that the provided Generator Path is a valid one, and raises an
    error if not.

    Args:
      name: The name of the Path to look for in the registered Paths.

    Raises:
      ValueError: If no Path is registered under the given name.
    """

    if name not in paths_dict:
      possible = ', '.join(sorted(paths_dict.keys()))
      raise ValueError(f"Unknown Generator path type : {name} ! "
                       f"The possible types are : {possible}")