Source code for crappy.blocks.generator_path.cyclic

# coding: utf-8

from time import time
from typing import Union, Dict
from itertools import cycle, islice
import logging

from .meta_path import Path, ConditionType


[docs] class Cyclic(Path): """This Path cyclically alternates between two constant values, based on two different conditions. It can for example be used as a trigger, or used to drive an actuator cyclically. It is equivalent to a succession of :class:`~crappy.blocks.generator_path.Constant` Paths. .. versionadded:: 1.4.0 """
[docs] def __init__(self, condition1: Union[str, ConditionType], condition2: Union[str, ConditionType], value1: float, value2: float, cycles: float = 1) -> None: """Sets the arguments and initializes the parent class. The Path always starts with ``value1``, and then switches to ``value2``. Args: condition1: The condition for switching to ``value2``. Refer to :class:`~crappy.blocks.generator_path.meta_path.Path` for more information. condition2: The condition for switching to ``value1``. Refer to :class:`~crappy.blocks.generator_path.meta_path.Path` for more information. value1: First value to send. value2: Second value to send. cycles: Number of cycles. Half cycles are accepted. If `0`, loops forever. Note: :: [{'type': 'Cyclic', 'value1': 1, 'condition1': 'AIN0>2', 'value2': 0, 'condition2': 'AIN1<1', 'cycles': 5}] is equivalent to :: [{'type': 'Constant', 'value': 1,'condition': 'AIN0>2'}, {'type': 'Constant', 'value': 0, 'condition': 'AIN1<1'}] * 5 .. versionchanged:: 1.5.10 renamed *time* argument to *_last_time* .. versionchanged:: 1.5.10 renamed *cmd* argument to *_last_cmd* .. versionremoved:: 1.5.10 *verbose* argument .. versionremoved:: 2.0.0 *_last_time* and *_last_cmd* arguments """ super().__init__() # Creates an interator object with a given length if cycles > 0: cycles = int(2 * cycles) self._values = islice(cycle((value1, value2)), cycles) self._conditions = islice(cycle((self.parse_condition(condition1), self.parse_condition(condition2))), cycles) # Creates an endless iterator object else: self._values = cycle((value1, value2)) self._conditions = cycle((self.parse_condition(condition1), self.parse_condition(condition2))) # The current condition object and value self._condition = None self._value = None
[docs] def get_cmd(self, data: Dict[str, list]) -> float: """Returns either the first or second value depending on the current state of the cycle. Raises :exc:`StopIteration` when the cycles are exhausted. Also manages the switch between the values and conditions 1 and 2. """ # During the first loop, getting the first condition and value if self._value is None and self._condition is None: try: self._value = next(self._values) self._condition = next(self._conditions) self.log(logging.DEBUG, f"Got value {self._value} and condition " f"{self._condition}") except StopIteration: self.log(logging.DEBUG, "No value or condition, switching to next " "path") raise # During other loops, getting the next condition and value if the current # condition is met if self._condition(data): self.log(logging.DEBUG, "Ended phase of the cycle, switching to the " "next phase") self.t0 = time() try: self._value = next(self._values) self._condition = next(self._conditions) self.log(logging.DEBUG, f"Got value {self._value} and condition " f"{self._condition}") except StopIteration: self.log(logging.DEBUG, "Stop condition met, switching to next path") raise # Finally, returning the current value return self._value