Source code for crappy.blocks.pid

# coding: utf-8

from time import time
from typing import Optional, Tuple
import logging

from .meta_block import Block


[docs] class PID(Block): """This Block implements a basic PID corrector. It takes a target value and a measured value as an input, and outputs a command value for reaching the target on the controlled system. The command is updated at each loop. The given `P`, `I` and `D` gains can be adjusted at runtime, by sending the new desired values. It is also possible to set boundaries to the output command, or to the integral corrector only. This Block is generally used in combination with a :class:`~crappy.blocks.Generator` Block for generating the target value, and a :class:`~crappy.blocks.Machine` or :class:`~crappy.blocks.IOBlock` Block driving a physical system. This latter Block takes the command as input, and returns to the PID the measured value to be compared to the target. .. versionadded:: 1.4.0 """
[docs] def __init__(self, kp: float, ki: float = 0, kd: float = 0, out_max: float = float('inf'), out_min: float = -float('inf'), setpoint_label: str = 'cmd', input_label: str = 'V', time_label: str = 't(s)', kp_label: str = 'kp', ki_label: str = 'ki', kd_label: str = 'kd', labels: Optional[Tuple[str, str]] = None, reverse: bool = False, i_limit: Tuple[Optional[float], Optional[float]] = (None, None), send_terms: bool = False, freq: Optional[float] = 500, display_freq: bool = False, debug: Optional[bool] = False) -> None: """Sets the arguments and initializes the parent class. Args: kp: The initial `P` gain. It can be tuned while running by sending the new value over the given ``kp_label``. No matter if a positive or a negative value is given, the definitive sign will be set by the ``reverse`` argument. ki: The initial `I` gain. It can be tuned while running by sending the new value over the given ``ki_label``. No matter if a positive or a negative value is given, the definitive sign will be set by the ``reverse`` argument. kd: The initial `D` gain. It can be tuned while running by sending the new value over the given ``kd_label``. No matter if a positive or a negative value is given, the definitive sign will be set by the ``reverse`` argument. out_max: Ensures the output is always inferior to this value. out_min: Ensures the output is always superior to this value. setpoint_label: The label carrying the setpoint value. .. versionchanged:: 2.0.0 renamed from *target_label* to *setpoint_label* input_label: The label carrying the reading of the actual value, to be compared with the setpoint. time_label: The label carrying the time information in the incoming Links. kp_label: The label to use for changing the `P` gain on the fly. If a value is received over this label, it will overwrite the one given in the ``kp`` argument. .. versionadded:: 2.0.0 ki_label: The label to use for changing the `I` gain on the fly. If a value is received over this label, it will overwrite the one given in the ``ki`` argument. .. versionadded:: 2.0.0 kd_label: The label to use for changing the `D` gain on the fly. If a value is received over this label, it will overwrite the one given in the ``kd`` argument. .. versionadded:: 2.0.0 labels: The two labels that will be sent to downstream Blocks. The first one is the time label, the second one is the output of the PID. If this argument is not given, they default to ``'t(s)'`` and ``'pid'``. reverse: If :obj:`True`, reverses the action of the PID. i_limit: A :obj:`tuple` containing respectively the lower and upper boundaries for the `I` term. send_terms: If :obj:`True`, returns the weight of each term in the output value. It adds ``'p_term', 'i_term', 'd_term'`` to the output labels. This is particularly useful to tweak the gains. freq: The target looping frequency for the Block. If :obj:`None`, loops as fast as possible. display_freq: If :obj:`True`, displays the looping frequency of the Block. .. versionadded:: 1.5.10 .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq* debug: If :obj:`True`, displays all the log messages including the :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, disables logging for this Block. .. versionadded:: 2.0.0 """ # Attributes of the parent class super().__init__() self.niceness = -10 self.freq = freq self.display_freq = display_freq self.debug = debug self.labels = ['t(s)', 'pid'] if labels is None else list(labels) if send_terms: self.labels.extend(['p_term', 'i_term', 'd_term']) # Setting the gains sign = -1 if reverse else 1 self._kp = sign * abs(kp) self._ki = sign * abs(ki) self._kd = sign * abs(kd) # Setting the limits self._out_max = out_max self._out_min = out_min self._i_min, self._i_max = i_limit # Setting the labels self._target_label = setpoint_label self._input_label = input_label self._time_label = time_label self._kp_label = kp_label self._ki_label = ki_label self._kd_label = kd_label self._send_terms = send_terms self._reverse = reverse # Setting the variables self._setpoint: Optional[float] = None self._last_input: Optional[float] = None self._prev_t: float = 0. self._i_term: float = 0.
[docs] def loop(self) -> None: """Receives the latest target and input values, calculates the P, I and D terms and sends the output to the downstream Blocks.""" # Looping in a non-blocking way data = self.recv_last_data(fill_missing=False) # Updating the gains if provided if self._kp_label in data: kp = data[self._kp_label] self._kp = -abs(kp) if self._reverse else kp if self._ki_label in data: ki = data[self._ki_label] self._ki = -abs(ki) if self._reverse else ki if self._kd_label in data: kd = data[self._kd_label] self._kd = -abs(kd) if self._reverse else kd # Updating the target value if provided if self._target_label in data: self._setpoint = data[self._target_label] self.log(logging.DEBUG, f"Updated target value to {self._setpoint}") # Checking if a new input was received if self._time_label in data and self._input_label in data: input_ = data[self._input_label] t = data[self._time_label] self.log(logging.DEBUG, f"Updated input value to {input_} at time {t}") # For the first loops, setting the target to the first input by default if self._setpoint is None: self._setpoint = input_ # For the first loops, initializing the input history if self._last_input is None: self._last_input = input_ # No new input was received else: return delta_t = t - self._prev_t error = self._setpoint - input_ d_input = input_ - self._last_input # Calculating the three PID terms p_term = self._kp * error self._i_term += self._ki * error * delta_t d_term = - self._kd * d_input / delta_t if delta_t > 0 else 0 self._prev_t = t self._last_input = input_ # Clamping the i term if required if self._i_min is not None: self._i_term = max(self._i_min, self._i_term) if self._i_max is not None: self._i_term = min(self._i_max, self._i_term) # Clamping the output if required out = p_term + self._i_term + d_term out = min(self._out_max, max(self._out_min, out)) # Sending the values to the downstream blocks if self._send_terms: self.send([time() - self.t0, out, p_term, self._i_term, d_term]) else: self.send([time() - self.t0, out])