Source code for crappy.blocks.machine

# coding: utf-8

from time import time
from typing import Dict, List, Any, Optional, Iterable
from dataclasses import dataclass, fields
import logging

from .meta_block import Block
from ..actuator import actuator_dict, Actuator, deprecated_actuators
from ..tool.ft232h import USBServer


@dataclass
class ActuatorInstance:
  """This class holds all the information that can be associated to an
  Actuator."""

  actuator: Actuator
  speed: Optional[float] = None
  position_label: Optional[str] = None
  speed_label: Optional[str] = None
  mode: str = 'speed'
  cmd_label: str = 'cmd'
  speed_cmd_label: Optional[str] = None


[docs] class Machine(Block): """This Block is meant to drive one or several :class:`~crappy.actuator.Actuator`. It can set speed or position commands on hardware actuators. The possibility to drive several Actuators from a unique Block is given so that they can be driven in a synchronized way. If synchronization is not needed, it is preferable to drive the Actuators from separate Machine Blocks. This Block takes the speed or position commands for the Actuators as inputs, and can optionally read and output the current speed and/or positions of the Actuators. The speed and position commands are set respectively by calling the :meth:`~crappy.actuator.Actuator.set_position` and :meth:`~crappy.actuator.Actuator.set_speed` methods of the Actuators, and the current speed and position values are acquired by calling the :meth:`~crappy.actuator.Actuator.get_position` and :meth:`~crappy.actuator.Actuator.get_speed` methods of the Actuators. It is possible to tune for each Actuator the label over which it receives its commands, and optionally the labels over which it sends its current speed and/or position. The driving mode (`'speed'` or `'position'`) can also be set independently for each Actuator. .. versionadded:: 1.4.0 """
[docs] def __init__(self, actuators: Iterable[Dict[str, Any]], common: Optional[Dict[str, Any]] = None, time_label: str = 't(s)', ft232h_ser_num: Optional[str] = None, spam: bool = False, freq: Optional[float] = 200, display_freq: bool = False, debug: Optional[bool] = False) -> None: """Sets the arguments and initializes the parent class. Args: actuators: An iterable (like a :obj:`list` or a :obj:`tuple`) of all the :class:`~crappy.actuator.Actuator` this Block needs to drive. It contains one :obj:`dict` for every Actuator, with mandatory and optional keys. The keys providing information on how to drive the Actuator are listed below. Any other unrecognized key will be passed to the Actuator as argument when instantiating it. common: The keys of this :obj:`dict` will be common to all the Actuators. If one key conflicts with an existing key for an Actuator, the common one will prevail. time_label: If reading speed or position from one or more Actuators, the time information will be carried by this label. ft232h_ser_num: Serial number of the FT232H device to use for driving the controlled Actuator. .. versionadded:: 2.0.0 spam: If :obj:`True`, a command is sent to the Actuators at each loop of the Block, else it is sent every time a new command is received. freq: The target looping frequency for the Block. If :obj:`None`, loops as fast as possible. display_freq: If :obj:`True`, displays the looping frequency of the Block. .. versionadded:: 1.5.10 .. versionchanged:: 2.0.0 renamed from *verbose* to *display_freq* debug: If :obj:`True`, displays all the log messages including the :obj:`~logging.DEBUG` ones. If :obj:`False`, only displays the log messages with :obj:`~logging.INFO` level or higher. If :obj:`None`, disables logging for this Block. .. versionadded:: 2.0.0 Note: - ``actuators`` keys: - ``type``: The name of the :class:`~crappy.actuator.Actuator` class to instantiate. This key is mandatory. - ``cmd_label``: The label carrying the command for driving the Actuator. It defaults to `'cmd'`. - ``mode``: Can be either `'speed'` or `'position'`. Either :meth:`~crappy.actuator.Actuator.set_speed` or :meth:`~crappy.actuator.Actuator.set_position` is called to drive the Actuator, depending on the selected mode. When driven in `'position'` mode, the speed of the actuator can also be adjusted, see the ``speed`` and ``speed_cmd_label`` keys. The default mode is `'speed'`. - ``speed``: If mode is `'position'`, the speed at which the Actuator should move. This speed is passed as second argument to the :meth:`~crappy.actuator.Actuator.set_position` method of the Actuator. If the ``speed_cmd_label`` key is not specified, this speed will remain the same for the entire test. This key is not mandatory. - ``position_label``: If given, the Block will return the value of :meth:`~crappy.actuator.Actuator.get_position` under this label. This key is not mandatory. - ``speed_label``: If given, the Block will return the value of :meth:`~crappy.actuator.Actuator.get_speed` under this label. This key is not mandatory. - ``speed_cmd_label``: The label carrying the speed to set when driving in `'position'` mode. Each time a value is received, the stored speed value is updated. It will also overwrite the ``speed`` key if given. """ self._actuators: List[ActuatorInstance] = list() self._ft232h_args = None super().__init__() self.freq = freq self.display_freq = display_freq self.debug = debug self._time_label = time_label self._spam = spam # No extra information to add to the main dicts if common is None: common = dict() # Updating the settings with the common information for actuator in actuators: actuator.update(common) # Making sure all the dicts contain the 'type' key if not all('type' in dic for dic in actuators): raise ValueError("The 'type' key must be provided for all the " "actuators !") # The names of the possible settings, to avoid typos and reduce verbosity actuator_settings = [field.name for field in fields(ActuatorInstance) if field.type is not Actuator] # The list of all the Actuator types to instantiate self._types = [actuator['type'] for actuator in actuators] # Checking for deprecated names deprecated = [type_ for type_ in self._types if type_ in deprecated_actuators] for type_ in deprecated: raise NotImplementedError( f"The {type_} Actuator was deprecated in version 2.0.0, and renamed " f"to {deprecated_actuators[type_]} ! Please update your code " f"accordingly and check the documentation for more information") # Checking that all the given actuators are valid if not all(type_ in actuator_dict for type_ in self._types): unknown = ', '.join(tuple(type_ for type_ in self._types if type_ not in actuator_dict)) possible = ', '.join(sorted(actuator_dict.keys())) raise ValueError(f"Unknown actuator type(s) : {unknown} ! " f"The possible types are : {possible}") # The settings that won't be passed to the Actuator objects self._settings = [{key: value for key, value in actuator.items() if key in actuator_settings} for actuator in actuators] # The settings that will be passed as kwargs to the Actuator objects self._actuators_kw = [{key: value for key, value in actuator.items() if key not in ('type', *actuator_settings)} for actuator in actuators] # Checking whether the Actuators communicate through an FT232H if any(actuator_dict[type_].ft232h for type_ in self._types): self._ft232h_args = USBServer.register(ft232h_ser_num)
[docs] def prepare(self) -> None: """Checks the validity of the linking and initializes all the Actuator objects to drive. This method calls the :meth:`~crappy.actuator.Actuator.open` method of each Actuator. """ # Instantiating the actuators and storing them self._actuators = [ActuatorInstance( actuator=actuator_dict[type_](**actuator_kw) if not actuator_dict[type_].ft232h else actuator_dict[type_](**actuator_kw, _ft232h_args=self._ft232h_args), **setting) for type_, setting, actuator_kw in zip(self._types, self._settings, self._actuators_kw)] # Checking the consistency of the linking if not self.inputs and not self.outputs: raise IOError("The Machine block isn't linked to any other block !") # Opening each actuator for actuator in self._actuators: self.log(logging.INFO, f"Opening the {type(actuator.actuator).__name__}" f"Actuator") actuator.actuator.open() self.log(logging.INFO, f"Opened the {type(actuator.actuator).__name__}" f"Actuator")
[docs] def loop(self) -> None: """Sets the received position and speed commands, and reads the current speed and position from the :class:`~crappy.actuator.Actuator`. For each Actuator, a command is set **only** if a new one was received or if the ``spam`` argument is :obj:`True`. It is set using either :meth:`~crappy.actuator.Actuator.set_position` or :meth:`~crappy.actuator.Actuator.set_speed` depending on the selected driving mode. For each Actuator, a speed and/or position value is read **only** if the ``speed_label`` and/or the ``position_label`` was set. If so, these values are read at each loop and sent to downstream Blocks over the given labels. This is independent of the chosen driving mode. The :meth:`~crappy.actuator.Actuator.get_position` and :meth:`~crappy.actuator.Actuator.get_speed` are called for acquiring the position and speed values respectively. """ # Receiving the latest command recv = self.recv_last_data(fill_missing=self._spam) # Iterating over the actuators for setting the commands if recv: for actuator in self._actuators: # Setting the speed attribute if it was received if actuator.speed_cmd_label is not None and \ actuator.speed_cmd_label in recv: self.log(logging.DEBUG, f"Updating the speed of the " f"{type(actuator.actuator).__name__} Actuator from " f"{actuator.speed} to {recv[actuator.speed_cmd_label]}") actuator.speed = recv[actuator.speed_cmd_label] # Setting only the commands that were received if actuator.cmd_label in recv: # Setting the speed command if actuator.mode == 'speed': self.log(logging.DEBUG, f"Setting speed of the {type(actuator.actuator).__name__}" f" Actuator to {recv[actuator.cmd_label]}") actuator.actuator.set_speed(recv[actuator.cmd_label]) # Setting the position command else: actuator.actuator.set_position(recv[actuator.cmd_label], actuator.speed) self.log( logging.DEBUG, f"Setting position of the {type(actuator.actuator).__name__} " f"Actuator to {recv[actuator.cmd_label]} with speed " f"{actuator.speed}") to_send = {} # Iterating over the actuators to get the speeds and the positions for actuator in self._actuators: if actuator.position_label is not None: position = actuator.actuator.get_position() if position is not None: to_send[actuator.position_label] = position if actuator.speed_label is not None: speed = actuator.actuator.get_speed() if speed is not None: to_send[actuator.speed_label] = speed # Sending the speed and position values if any if to_send: to_send[self._time_label] = time() - self.t0 self.send(to_send)
[docs] def finish(self) -> None: """Stops and closes all the Actuators to drive. This method calls the :meth:`~crappy.actuator.Actuator.stop` and :meth:`~crappy.actuator.Actuator.close` method of each Actuator. """ for actuator in self._actuators: self.log(logging.INFO, f"Stopping the {type(actuator.actuator).__name__}" f"Actuator") actuator.actuator.stop() for actuator in self._actuators: self.log(logging.INFO, f"Closing the {type(actuator.actuator).__name__}" f"Actuator") actuator.actuator.close() self.log(logging.INFO, f"Closed the {type(actuator.actuator).__name__}" f"Actuator")