Source code for crappy.actuator.phidgets_stepper4a

# coding: utf-8

import logging
import numpy as np
from pathlib import Path
from platform import system
from typing import Optional, Tuple, Union

from .meta_actuator import Actuator
from .._global import OptionalModule

try:
  from Phidget22.Net import Net, PhidgetServerType
  from Phidget22.Devices.Stepper import Stepper, StepperControlMode
  from Phidget22.Devices.DigitalInput import DigitalInput
  from Phidget22.PhidgetException import PhidgetException
except (ImportError, ModuleNotFoundError):
  Net = OptionalModule('Phidget22')
  PhidgetServerType = OptionalModule('Phidget22')
  Stepper = OptionalModule('Phidget22')
  StepperControlMode = OptionalModule('Phidget22')
  DigitalInput = OptionalModule('Phidget22')
  PhidgetException = OptionalModule('Phidget22')


[docs] class Phidget4AStepper(Actuator): """This class can drive Phidget's 4A Stepper module in speed or in position. It relies on the :mod:`Phidget22` module to communicate with the motor driver. The driver can deliver up to 4A to the motor, and uses 16 microsteps by default. Its acquisition rate is set to 10 values per second in this class. The distance unit is the `mm` and the time unit is the `s`, so speeds are in `mm/s` and accelerations in `mm/s²`. .. versionadded:: 2.0.3 """
[docs] def __init__(self, steps_per_mm: float, current_limit: float, max_acceleration: Optional[float] = None, remote: bool = False, absolute_mode: bool = False, reference_pos: float = 0, switch_ports: Tuple[int, ...] = tuple(), save_last_pos: bool = False, save_pos_folder: Optional[Union[str, Path]] = None) -> None: """Sets the args and initializes the parent class. Args: steps_per_mm: The number of steps necessary to move by `1 mm`. This value is used to set a conversion factor on the driver, so that it can be driven in `mm` and `s` units. current_limit: The maximum current the driver is allowed to deliver to the motor, in `A` . max_acceleration: If given, sets the maximum acceleration the motor is allowed to reach in `mm/s²`. remote: Set to :obj:`True` to drive the stepper via a network VINT Hub, or to :obj:`False` to drive it via a USB VINT Hub. absolute_mode: If :obj:`True`, the target position of the motor will be calculated from a reference position. If :obj:`False`, the target position of the motor will be calculated from its current position. .. versionadded:: 2.0.4 reference_pos: The position considered as the reference position at the beginning of the test. Only takes effect if ``absolute_mode`` is :obj:`True`. .. versionadded:: 2.0.4 switch_ports: The indexes of the VINT Hub ports where the switches are connected. .. versionadded:: 2.0.4 save_last_pos: If :obj:`True`, the last position of the actuator will be saved in a .npy file. .. versionadded:: 2.0.4 save_pos_folder: The path to the folder where to save the last position of the motor. Only takes effect if ``save_last_pos`` is :obj:`True`. .. versionadded:: 2.0.4 """ self._motor: Optional[Stepper] = None super().__init__() self._steps_per_mm = steps_per_mm self._current_limit = current_limit self._max_acceleration = max_acceleration self._remote = remote self._switch_ports = switch_ports self._switches = list() # The following attribute is set to True to automatically check the state # of the switches in the open method to keep the motor to move if a switch # has been disconnected or hit. # Nevertheless, this check can be bypassed if the motor is used outside # from a Crappy loop by setting the attribute to False. self._check_switch = True self._absolute_mode = absolute_mode if self._absolute_mode: self._ref_pos = reference_pos # Determining the path where to save the last position # It depends on the current operating system self._path: Optional[Path] = None if save_last_pos: if save_pos_folder is not None: self._path = Path(save_pos_folder) elif system() in ('Linux', 'Darwin'): self._path = Path.home() / '.machine1000N' elif system() == 'Windows': self._path = Path.home() / 'AppData' / 'Local' / 'machine1000N' else: self._save_last_pos = False # These buffers store the last known position and speed self._last_velocity: Optional[float] = None self._last_position: Optional[float] = None
[docs] def open(self) -> None: """Sets up the connection to the motor driver as well as the various callbacks, and waits for the motor driver to attach.""" # Setting up the motor driver self.log(logging.DEBUG, "Enabling server discovery") Net.enableServerDiscovery(PhidgetServerType.PHIDGETSERVER_DEVICEREMOTE) self._motor = Stepper() # Setting up the switches for port in self._switch_ports: switch = DigitalInput() switch.setIsHubPortDevice(True) switch.setHubPort(port) self._switches.append(switch) # Setting the remote or local status self._motor.setIsLocal(not self._remote) self._motor.setIsRemote(self._remote) for switch in self._switches: switch.setIsLocal(not self._remote) switch.setIsRemote(self._remote) # Setting up the callbacks self.log(logging.DEBUG, "Setting the callbacks") self._motor.setOnAttachHandler(self._on_attach) self._motor.setOnErrorHandler(self._on_error) self._motor.setOnVelocityChangeHandler(self._on_velocity_change) self._motor.setOnPositionChangeHandler(self._on_position_change) for switch in self._switches: switch.setOnStateChangeHandler(self._on_end) # Opening the connection to the motor driver try: self.log(logging.DEBUG, "Trying to attach the motor") self._motor.openWaitForAttachment(10000) except PhidgetException: raise TimeoutError("Waited too long for the motor to attach !") # Opening the connection to the switches for switch in self._switches: try: self.log(logging.DEBUG, "Trying to attach the switch") switch.openWaitForAttachment(10000) except PhidgetException: raise TimeoutError("Waited too long for the switch to attach !") # Energizing the motor self._motor.setEngaged(True) # Check the state of the switches if self._check_switch and not all( switch.getState() for switch in self._switches): raise ValueError(f"A switch is already hit or disconnected !")
[docs] def set_speed(self, speed: float) -> None: """Sets the requested speed for the motor. Switches to the correct driving mode if needed. Args: speed: The speed to reach, in `mm/s`. """ # Switching the control mode if needed if not self._motor.getControlMode() == StepperControlMode.CONTROL_MODE_RUN: self.log(logging.DEBUG, "Setting the control mode to run") self._motor.setControlMode(StepperControlMode.CONTROL_MODE_RUN) # Setting the desired velocity if abs(speed) > self._motor.getMaxVelocityLimit(): raise ValueError(f"Cannot set a velocity greater than " f"{self._motor.getMaxVelocityLimit()} mm/s !") else: self._motor.setVelocityLimit(speed)
[docs] def set_position(self, position: float, speed: Optional[float] = None) -> None: """Sets the requested position for the motor. Switches to the correct driving mode if needed. Args: position: The position to reach, in `mm`. speed: If not :obj:`None`, the speed to use for moving to the desired position. """ # Switching the control mode if needed if not (self._motor.getControlMode() == StepperControlMode.CONTROL_MODE_STEP): self.log(logging.DEBUG, "Setting the control mode to step") self._motor.setControlMode(StepperControlMode.CONTROL_MODE_STEP) # Setting the desired velocity if required if speed is not None: if abs(speed) > self._motor.getMaxVelocityLimit(): raise ValueError(f"Cannot set a velocity greater than " f"{self._motor.getMaxVelocityLimit()} mm/s !") else: self._motor.setVelocityLimit(abs(speed)) # Ensuring that the requested position is valid min_pos = self._motor.getMinPosition() max_pos = self._motor.getMaxPosition() if not min_pos <= position <= max_pos: raise ValueError(f"The position value must be between {min_pos} and " f"{max_pos}, got {position} !") # Setting the position depending on the driving mode if not self._absolute_mode: self._motor.setTargetPosition(position) else: self._motor.setTargetPosition(position - self._ref_pos)
[docs] def get_speed(self) -> Optional[float]: """Returns the last known speed of the motor.""" return self._last_velocity
[docs] def get_position(self) -> Optional[float]: """Returns the last known position of the motor.""" if not self._absolute_mode: return self._last_position # Cannot perform addition if no known last position elif self._last_position is not None: return self._last_position + self._ref_pos
[docs] def stop(self) -> None: """Deenergizes the motor.""" if self._motor is not None: self._motor.setEngaged(False)
[docs] def close(self) -> None: """Closes the connection to the motor.""" if self._motor is not None: if self._path is not None: self._path.mkdir(parents=False, exist_ok=True) np.save(self._path / 'last_pos.npy', self.get_position()) self._motor.close() for switch in self._switches: switch.close()
def _on_attach(self, _: Stepper) -> None: """Callback called when the motor driver attaches to the program. It sets the current limit, scale factor, data rate and maximum acceleration. """ self.log(logging.INFO, "Motor successfully attached") # Setting the current limit for the motor min_current = self._motor.getMinCurrentLimit() max_current = self._motor.getMaxCurrentLimit() if not min_current <= self._current_limit <= max_current: raise ValueError(f"The current limit should be between {min_current} $" f"and {max_current} A !") else: self._motor.setCurrentLimit(self._current_limit) # Setting the scale factor and the data rate self._motor.setRescaleFactor(1 / 16 / self._steps_per_mm) self._motor.setDataInterval(100) # Setting the maximum acceleration if self._max_acceleration is not None: min_accel = self._motor.getMinAcceleration() max_accel = self._motor.getMaxAcceleration() if not min_accel <= self._max_acceleration <= max_accel: raise ValueError(f"The maximum acceleration should be between " f"{min_accel} and {max_accel} m/s² !") else: self._motor.setAcceleration(self._max_acceleration) def _on_error(self, _: Stepper, error_code: int, error: str) -> None: """Callback called when the motor driver returns an error.""" raise RuntimeError(f"Got error with error code {error_code}: {error}") def _on_velocity_change(self, _: Stepper, velocity: float) -> None: """Callback called when the motor velocity changes.""" self.log(logging.DEBUG, f"Velocity changed to {velocity}") self._last_velocity = velocity def _on_position_change(self, _: Stepper, position: float) -> None: """Callback called when the motor position changes.""" self.log(logging.DEBUG, f"Position changed to {position}") self._last_position = position def _on_end(self, _: DigitalInput, state) -> None: """Callback when a switch is hit.""" if not bool(state): self.stop() raise ValueError(f"A switch has been hit or disconnected !")