Source code for crappy.blocks.generator_path.cyclic_ramp

# coding: utf-8

from time import time
from typing import Union, Dict, Optional
from itertools import cycle, islice
import logging

from .meta_path import Path, ConditionType


[docs] class CyclicRamp(Path): """This Pth cyclically alternates between two ramps with different slopes, based on two different conditions. It is equivalent to a succession of :class:`~crappy.blocks.generator_path.Ramp` Paths. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Cyclic_ramp* to *CyclicRamp* """
[docs] def __init__(self, condition1: Union[str, ConditionType], condition2: Union[str, ConditionType], speed1: float, speed2: float, cycles: float = 1, init_value: Optional[float] = None) -> None: """Sets the arguments and initializes the parent class. The path always starts with ``speed1``, and then switches to ``speed2``. Args: condition1: The condition for switching to ``speed2``. Refer to :class:`~crappy.blocks.generator_path.meta_path.Path` for more information. condition2: The condition for switching to ``speed1``. Refer to :class:`~crappy.blocks.generator_path.meta_path.Path` for more information. speed1: Slope of the first generated ramp, in `units/s`. speed2: Slope of the second generated ramp, in `units/s`. cycles: Number of cycles. Half cycles are accepted. If `0`, loops forever. init_value: If given, overwrites the last value of the signal as the starting point for the first ramp. In the specific case when this Path is the first one in the Generator Paths, this argument must be given ! .. versionadded:: 1.5.10 Note: :: [{'type': 'CyclicRamp', 'speed1': 5, 'condition1': 'AIN0>2', 'speed2': -2, 'condition2': 'AIN1<1', 'cycles': 5}] is equivalent to :: [{'type': 'Ramp', 'speed': 5,'condition': 'AIN0>2'}, {'type': 'Ramp', 'value': -2, 'condition': 'AIN1<1'}] * 5 .. versionchanged:: 1.5.10 renamed *time* argument to *_last_time* .. versionchanged:: 1.5.10 renamed *cmd* argument to *_last_cmd* .. versionremoved:: 1.5.10 *verbose* argument .. versionremoved:: 2.0.0 *_last_time* and *_last_cmd* arguments """ super().__init__() if init_value is None and self.last_cmd is None: raise ValueError('For the first path, an init_value must be given !') # Creates an interator object with a given length if cycles > 0: cycles = int(2 * cycles) self._speeds = islice(cycle((speed1, speed2)), cycles) self._conditions = islice(cycle((self.parse_condition(condition1), self.parse_condition(condition2))), cycles) # Creates an endless iterator object else: self._speeds = cycle((speed1, speed2)) self._conditions = cycle((self.parse_condition(condition1), self.parse_condition(condition2))) # The current condition object and value self._condition = None self._speed = None # The last extreme command sent self._last_peak_cmd = self.last_cmd if init_value is None else init_value
[docs] def get_cmd(self, data: Dict[str, list]) -> float: """Returns the current value of the signal and raises :exc:`StopIteration` when the cycles are exhausted. Also manages the switch between the speeds and conditions 1 and 2. """ # During the first loop, getting the first condition and speed if self._speed is None and self._condition is None: try: self._speed = next(self._speeds) self._condition = next(self._conditions) self.log(logging.DEBUG, f"Got value {self._speed} and condition " f"{self._condition}") except StopIteration: self.log(logging.DEBUG, "Stop condition met, switching to next path") raise # During other loops, getting the next condition and speed if the current # condition is met if self._condition(data): self.log(logging.DEBUG, "Ended phase of the cycle, switching to the " "next phase") t = time() self._last_peak_cmd += self._speed * (t - self.t0) self.t0 = t try: self._speed = next(self._speeds) self._condition = next(self._conditions) self.log(logging.DEBUG, f"Got value {self._speed} and condition " f"{self._condition}") except StopIteration: self.log(logging.DEBUG, "Stop condition met, switching to next path") raise # Finally, returning the current value return self._last_peak_cmd + self._speed * (time() - self.t0)