# coding: utf-8
from subprocess import check_output
from threading import Thread, RLock
from time import sleep
from typing import Union, Dict, Optional
import logging
from .meta_actuator import Actuator
from .._global import OptionalModule
# PyYAML is an optional dependency. In the code of this module it is only used
# for parsing the output of ticcmd.
try:
  import yaml
  is_installed = True
except (ModuleNotFoundError, ImportError):
  is_installed = False
# Detecting whether yaml.FullLoader is available, in which case it is passed
# explicitly to yaml.load. If the import above failed, the name yaml is not
# defined and the NameError is caught here.
try:
  getattr(yaml, 'FullLoader')
  full_loader = True
except (AttributeError, NameError):
  full_loader = False
if not is_installed:
  # The placeholder is only created now, after the FullLoader detection.
  # If it was in the first try/except, a message would be displayed at getattr
  yaml = OptionalModule("pyyaml")
# pyusb is an optional dependency, used by the 'USB' backend
try:
  from usb import core
  from usb import util
  # bmRequestType values of the USB control transfers: 'Cmd' for
  # host-to-device requests, 'Var' for device-to-host requests
  Tic_usb_request = {'Cmd': util.CTRL_OUT |
                     util.CTRL_TYPE_VENDOR |
                     util.CTRL_RECIPIENT_DEVICE,
                     'Var': util.CTRL_IN |
                     util.CTRL_TYPE_VENDOR |
                     util.CTRL_RECIPIENT_DEVICE}
except (ModuleNotFoundError, ImportError):
  util = core = OptionalModule("pyusb")
  # Hard-coded fallback values for when the pyusb constants are unavailable
  # NOTE(review): presumably equal to the values computed above - confirm
  Tic_usb_request = {'Cmd': 0x40,
                     'Var': 0xC0}
# USB vendor ID used for finding the Tic boards
Tic_vendor_id = 0x1FFB
# USB product ID of each supported Tic model
Tic_product_id = {'T825': 0x00B3,
                  'T834': 0x00B5,
                  'T500': 0x00BD,
                  'N825': 0x00C3,
                  'T249': 0x00C9,
                  '36v4': 0x00CB}
# Upper bound of the current_limit argument for each model, in mA
# NOTE(review): there is no 'N825' entry here nor in the tables below, although
# 'N825' is a valid key of Tic_product_id - confirm whether this is intended
Tic_max_allowed_current = {'T825': 3968,
                           'T834': 3456,
                           'T500': 3093,
                           'T249': 4480,
                           '36v4': 3939}
# Upper bound of current_limit for the 36v4 when unrestricted_current_limit is
# enabled, in mA
Tic_36v4_max_current = 9095
# Multiplier used for building Tic_current_tables from the current limit codes
# (the T500 has an explicit table instead)
Tic_current_steps = {'T834': 32,
                     'T825': 32,
                     'T249': 40,
                     '36v4': 71.615}
# Values accepted for the step_mode argument, for each model
Tic_step_modes = {'T825': [2 ** i for i in range(6)],
                  'T834': [2 ** i for i in range(6)],
                  'T500': [2 ** i for i in range(4)],
                  'T249': [2 ** i for i in range(6)] + ['2_100p'],
                  '36v4': [2 ** i for i in range(9)]}
# Maps each step_mode value to the code sent along with the 'Set_step_mode'
# command of the USB backend
Tic_step_mode = {1: 0,
                 2: 1,
                 4: 2,
                 8: 3,
                 16: 4,
                 32: 5,
                 '2_100p': 6,
                 64: 7,
                 128: 8,
                 256: 9}
# Codes of the Tic commands, given as the `request` argument of the USB
# transfers performed by the 'USB' backend
Tic_cmd = {'Set_target_position': 0xE0,
           'Set_target_velocity': 0xE3,
           'Halt_and_set_position': 0xEC,
           'Halt_and_hold': 0x89,
           'Go_home': 0x97,
           'Reset_command_timeout': 0x8C,
           'Deenergize': 0x86,
           'Energize': 0x85,
           'Exit_safe_start': 0x83,
           'Enter_safe_start': 0x8F,
           'Reset': 0xB0,
           'Clear_driver_error': 0x8A,
           'Set_max_speed': 0xE6,
           'Set_starting_speed': 0xE5,
           'Set_max_accel': 0xEA,
           'Set_max_decel': 0xE9,
           'Set_step_mode': 0x94,
           'Set_current_limit': 0x91,
           'Set_decay_mode': 0x92,
           'Set_AGC_option': 0x98,
           'Get_variable': 0xA1,
           'Get_variable_and_clear_errors_occurred': 0xA2,
           'Get_setting': 0xA8,
           'Set_setting': 0x13,
           'Reinitialize': 0x10,
           'Start_bootloader': 0xFF,
           'Get_debug_data': 0x20}
# Offsets of the Tic variables, given as the `index` argument of the
# 'Get_variable' requests
Tic_var = {'Operation_state': 0x00,
           'Misc_flags1': 0x01,
           'Error_status': 0x02,
           'Errors_occurred': 0x04,
           'Planning_mode': 0x09,
           'Target_position': 0x0A,
           'Target_velocity': 0x0E,
           'Starting_speed': 0x12,
           'Max_speed': 0x16,
           'Max_decel': 0x1A,
           'Max_accel': 0x1E,
           'Current_position': 0x22,
           'Current_velocity': 0x26,
           'Acting_target_position': 0x2A,
           'Time_since_last_step': 0x2E,
           'Device_reset': 0x32,
           'Vin_voltage': 0x33,
           'Up_time': 0x35,
           'Encoder_position': 0x39,
           'RC_pulse_width': 0x3D,
           'Analog_reading_SCL': 0x3F,
           'Analog_reading_SDA': 0x41,
           'Analog_reading_TX': 0x43,
           'Analog_reading_RX': 0x45,
           'Digital_readings': 0x47,
           'Pin_states': 0x48,
           'Step_mode': 0x49,
           'Current_limit': 0x4A,
           'Decay_mode': 0x4B,
           'Input_state': 0x4C,
           'Input_after_averaging': 0x4D,
           'Input_after_hysteresis': 0x4F,
           'Input_after_scaling': 0x51}
# Indexes of the Tic settings, given as the `index` argument of the
# 'Get_setting' and 'Set_setting' requests
# NOTE(review): 'Size' holds the same value (0x5F) as
# 'Limit_switch_forward_map' - confirm that this is intended
Tic_settings = {'Setting_not_initialized': 0x00,
                'Control_mode': 0x01,
                'Never_sleep': 0x02,
                'Disable_safe_start': 0x03,
                'Ignore_err_line_high': 0x04,
                'Serial_baud_rate_generator': 0x05,
                'Serial_device_number': 0x07,
                'Auto_clear_driver_error': 0x08,
                'Command_timeout_low': 0x09,
                'Command_timeout_high': 0x0A,
                'Serial_CRC_enabled': 0x0B,
                'Low_vin_timeout': 0x0C,
                'Low_vin_shutoff_voltage': 0x0E,
                'Low_vin_startup_voltage': 0x10,
                'High_vin_shutoff_voltage': 0x12,
                'Vin_calibration': 0x14,
                'RC_max_pulse_period': 0x16,
                'RC_bad_signal_timeout': 0x18,
                'RC_consecutive_good_pulses': 0x1A,
                'Invert_motor_direction': 0x1B,
                'Input_error_min': 0x1C,
                'Input_error_max': 0x1E,
                'Input_scaling_degree': 0x20,
                'Input_invert': 0x21,
                'Input_min': 0x22,
                'Input_neutral_min': 0x24,
                'Input_neutral_max': 0x26,
                'Input_max': 0x28,
                'Output_min': 0x2A,
                'Input_averaging_enabled': 0x2E,
                'Input_hysteresis': 0x2F,
                'Current_limit_during_error': 0x31,
                'Output_max': 0x32,
                'Switch_polarity_map': 0x36,
                'Encoder_postscaler': 0x37,
                'SCL_config': 0x3B,
                'SDA_config': 0x3C,
                'TX_config': 0x3D,
                'RX_config': 0x3E,
                'RC_config': 0x3F,
                'Current_limit': 0x40,
                'Step_mode': 0x41,
                'Decay_mode': 0x42,
                'Starting_speed': 0x43,
                'Max_speed': 0x47,
                'Max_decel': 0x4B,
                'Max_accel': 0x4F,
                'Soft_error_response': 0x53,
                'Soft_error_position': 0x54,
                'Encoder_prescaler': 0x58,
                'Encoder_unlimited': 0x5C,
                'Kill_switch_map': 0x5D,
                'Serial_response_delay': 0x5E,
                'Limit_switch_forward_map': 0x5F,
                'Limit_switch_reverse_map': 0x60,
                'Homing_speed_towards': 0x61,
                'Homing_speed_away': 0x65,
                'Serial_device_number_high': 0x69,
                'Serial_alt_device_number': 0x6A,
                'Size': 0x5F,
                'Unrestricted_current_limit': 0x6C}
# Current limit values that can be selected on each model. The T500 values are
# listed explicitly, the others are built by multiplying a set of indexes by
# the matching Tic_current_steps value. In __init__, the table value closest
# to the requested current_limit is the one selected.
Tic_current_tables = {'T500': [0, 1, 174, 343, 495, 634, 762, 880, 990, 1092,
                               1189, 1281, 1368, 1452, 1532, 1611, 1687, 1762,
                               1835, 1909, 1982, 2056, 2131, 2207, 2285, 2366,
                               2451, 2540, 2634, 2734, 2843, 2962, 3093],
                      # Indexes 0 to 32, then 34 to 64 by steps of 2, then 68
                      # to 108 by steps of 4
                      'T834': [index * Tic_current_steps['T834'] for index in
                               (list(range(33)) + list(range(34, 65, 2)) +
                                list(range(68, 109, 4)))],
                      # Same pattern, with the last range going up to 124
                      'T825': [index * Tic_current_steps['T825'] for index in
                               (list(range(33)) + list(range(34, 65, 2)) +
                                list(range(68, 125, 4)))],
                      # Same pattern, with the last range going up to 112
                      'T249': [index * Tic_current_steps['T249'] for index in
                               (list(range(33)) + list(range(34, 65, 2)) +
                                list(range(68, 113, 4)))],
                      # All the indexes from 0 to 127
                      '36v4': [index * Tic_current_steps['36v4'] for index in
                               range(128)]}
# Bounds used for clamping the acceleration and speed values before they are
# sent to the Tic, expressed in the units indicated on each line
Tic_max_accel = 2147483647  # steps/s/100s
Tic_min_accel = 100  # steps/s/100s
Tic_max_speed = 500000000  # steps/10000s, i.e. a 50 kHz frequency
Tic_min_speed = 7  # steps/10000s, i.e. 1 step every 23 minutes
# Values accepted for the backend argument
Tic_backends = ['ticcmd', 'USB']
# Position of the bit associated with each pin in the pin bitfields
Tic_pins_bit = {'SCL': 0,
                'SDA': 1,
                'TX': 2,
                'RX': 3,
                'RC': 4}
# Values accepted in the pin_function argument, and the associated codes
Tic_pin_modes = {'Default': 0,
                 'Kill switch': 7,
                 'Limit switch forward': 8,
                 'Limit switch reverse': 9}
# Values accepted in the pin_polarity argument, and the associated codes
Tic_pin_polarity = {'Active low': 0,
                    'Active high': 1}
class FindSerialNumber:
  """Callable predicate matching a USB device against a serial number.

  Instances of this class are meant to be given as the ``custom_match``
  argument of the :meth:`usb.core.find` method.
  """

  def __init__(self, serial_number: str) -> None:
    """Stores the serial number to look for.

    Args:
      serial_number: The serial number that a device must expose in order to
        be considered as matching.
    """

    self.serial_number = serial_number

  def __call__(self, device) -> bool:
    """Compares the serial number of the given device to the expected one.

    Args:
      device: The candidate device, that must expose a ``serial_number``
        attribute.

    Returns:
      :obj:`True` if the two serial numbers are equal, :obj:`False` otherwise.
    """

    candidate = device.serial_number
    return candidate == self.serial_number
[docs]
class PololuTic(Actuator):
"""Class for controlling Pololu's Tic stepper motor divers.
The PololuTic Actuator block is meant for controlling a Pololu Tic stepper
motor driver. It can be driven in both speed and position. Several Tic models
are supported. The length unit is the millimeter (`mm`), and time unit is the
second (`s`).
Important:
**Only for Linux users:** In order to drive the Tic, the appropriate udev
rule should be set. This is done automatically when installing `ticcmd`,
or can be done using the `udev_rule_setter` utility in ``crappy``'s `util`
folder. It is also possible to add it manually by running:
::
$ echo "SUBSYSTEM==\\"usb\\", ATTR{idVendor}==\\"1ffb\\", \
MODE=\\"0666\\\"" | sudo tee pololu.rules > /dev/null 2>&1
in a shell opened in ``/etc/udev/rules.d``.
.. versionadded:: 1.4.0
.. versionchanged:: 2.0.0 renamed from *Pololu_tic* to *PololuTic*
"""
[docs]
def __init__(self,
             steps_per_mm: float,
             current_limit: float,
             step_mode: Union[int, str] = 8,
             max_accel: float = 20,
             t_shutoff: float = 0,
             config_file: Optional[str] = None,
             serial_number: Optional[str] = None,
             model: Optional[str] = None,
             reset_command_timeout: bool = True,
             backend: str = 'USB',
             unrestricted_current_limit: bool = False,
             pin_function: Optional[Dict[str, str]] = None,
             pin_polarity: Optional[Dict[str, str]] = None) -> None:
  """Checks args validity, finds the right device, reads the current limit
  tables.

  Args:
    steps_per_mm: The number of full steps needed for the motor to travel
      `1 mm`. This varies according to the motor model, and can be deduced
      from the datasheet or directly measured. This value is usually between
      `50` and `500`. It must be strictly positive.
    current_limit: The maximum current the motor is able to withstand, in mA.
      It is usually around `1A` for small stepper motors, and can go up to a
      few Amps. The maximum allowed ``current_limit`` value depends on the
      Tic model. The Tic 36v4 default maximum current limit can be increased
      using the ``unrestricted_current_limit`` parameter.
    step_mode: Sets the number of microsteps used for driving the motor. This
      number is always a power of `2`. The minimum number of microsteps is
      `1` (full steps), and the maximum depends on the Tic model. All models
      however support modes `1` to `8`. The speed and length conversions are
      managed automatically so that changing the step mode doesn't affect the
      motor behaviour.
    max_accel: The maximum allowed acceleration for the motor, in `mm/s²`.
      When asked to reach a given speed or position, the motor accelerates at
      this rate. It also corresponds to the maximum allowed deceleration.
      Usually doesn't need to be changed.
    t_shutoff: This class features an auto-shutoff thread that deenergizes
      the motor after a period of `t_shutoff` seconds of inactivity. The
      timer counts in steps of `0.1s`, which is thus the maximum precision
      for this setting. When set to `0`, this feature is disabled and the
      motor remains energized until the :meth:`close` method is called.
    config_file: The path to the config file to be loaded to the Tic. It only
      works if ``backend`` is 'ticcmd'. The config file contains some
      specific settings that can only be accessed this way using the 'ticcmd'
      backend. Not necessary for most applications.
    serial_number: The serial number of the Tic to be controlled. It must be
      given as a :obj:`str`, and it is an 8-digits number. Allows to control
      the right device if several Tic of the same model are connected.
      Otherwise, an error is raised.
    model: The model of the Tic to be controlled. Available models are:
      ::

        'T825', 'T834', 'T500', 'N825', 'T249', '36v4'

      Allows to control the right device if several Tic of different models
      are connected. Otherwise, an error is raised.
    reset_command_timeout: Enables or disables the `reset_command_timeout`
      thread. It can only be disabled if ``backend`` is 'USB'. This thread
      pings the Tic every `0.5s`, so that it doesn't block due to a Command
      Timeout error. This feature is a safety to prevent the motor from
      running indefinitely if the USB connection is down, so it is better not
      to disable it. When disabled, the Tic never raises Command Timeout
      errors.
    backend: The backend for communicating with the Tic. Available backends
      are:
      ::

        'USB', 'ticcmd'

      They both communicate over USB, but 'ticcmd' requires Pololu's firmware
      to be installed. Some features are specific to each backend.
    unrestricted_current_limit: Enables or disables the unrestricted current
      limit feature. Only works if ``backend`` is 'USB', and for the 36v4 Tic
      model. When disabled, the maximum current allowed is `3939mA`. If
      enabled, it goes up to `9095mA`. The Tic should however be cooled in
      order to withstand currents higher than `3939mA`.
    pin_function: Allows setting the Tic GPIO functions. It is a :obj:`dict`
      whose keys are the pin names, and values are the functions. Only works
      if ``backend`` is `'USB'`. Only the pins indicated in ``pin_function``
      are set, the others are left in their previous state. The available
      pins are:
      ::

        'SCL', 'SDA', 'TX', 'RX', 'RC'

      and can be set to:
      ::

        'Default', 'Kill switch', 'Limit switch forward', \
        'Limit switch reverse'

      The GPIO functions remain set as long as they are not changed by the
      user, so for a given setup it is only necessary to set them once.
    pin_polarity: Allows setting the polarity of the GPIOs used as switches.
      It is a :obj:`dict`, whose keys are the pin names, and values are the
      pin polarities. Only works if ``backend`` is `'USB'`. Only the pins
      indicated in ``pin_polarity`` are set, the others are left in their
      previous state. The available pins are:
      ::

        'SCL', 'SDA', 'TX', 'RX', 'RC'

      and can be set to:
      ::

        'Active high', 'Active low'

      The GPIO polarities remain set as long as they are not changed by the
      user, so for a given setup it is only necessary to set them once.

  Raises:
    ValueError: If one of the arguments has an unexpected value, or if the
      detected Tic model is not supported.
    IOError: If no device or several devices match the given ``model`` and
      ``serial_number``.

  Warning:
    - ``current_limit``:
      If the ``current_limit`` setting is higher than the motor max current,
      there's a risk of overheating and damaging the motor !

  Note:
    - ``steps_per_mm``:
      If you have to measure this value, it can be done easily following this
      procedure. Set ``steps_per_mm`` to `spm` (`100` should be fine), and
      ``step_mode`` to `sm` (`8` should be fine). Run a Crappy script
      for moving the motor from position `0` to position `p` (a few tenth of
      millimeters should be fine). The motor will reach an actual position
      `ap` that can be measured. The actual ``steps_per_mm`` value `aspm` for
      this motor can be calculated as follows:
      ::

        aspm = spm * p / ap

    - ``step_mode``:
      Increasing the number of microsteps allows to reduce the noise, the
      vibrations, and improve the precision. However, the more microsteps,
      the lower the maximum achievable speed for the motor. Chances that the
      motor misses microsteps are also higher when the number of microsteps
      is high.

    - ``t_shutoff``:
      This functionality was originally added for long tests in temperature
      controlled environments, so that the motor doesn't unnecessarily heat
      the setup when inactive. In other assays, it may still be useful for
      reducing the noise, the electromagnetic interference, or the energy
      consumption.

    - ``serial_number``:
      Serial numbers can be accessed using the `lsusb` command in Linux
      shell, or running ``ticcmd --list`` if `ticcmd` is installed. This
      number is also displayed during :meth:`__init__` if only one device is
      connected and ``serial_number`` is :obj:`None`.

    - ``model``:
      The model is written on the Tic board, and can be accessed by running
      ``ticcmd --list`` in a shell if `ticcmd` is installed. It is also
      displayed during :meth:`__init__` if only one device is connected and
      ``model`` is :obj:`None`.

    - **Pins settings**:
      The pin functions and polarity can also be set independently of
      ``crappy`` before starting the test, in the `ticgui`.
  """

  super().__init__()

  if backend not in Tic_backends:
    raise ValueError("backend should be in {}".format(Tic_backends))
  self._backend = backend

  if model is not None and model not in Tic_product_id:
    raise ValueError("model should be in {} if given".format(list(
        Tic_product_id.keys())))

  if serial_number is not None and not isinstance(serial_number, str):
    raise ValueError("serial_number should be given as a string")

  # Finding the right device among all the connected ones
  if backend == 'USB':
    # Only the criteria given by the user are used for filtering the devices
    criteria = {'find_all': True, 'idVendor': Tic_vendor_id}
    if model is not None:
      criteria['idProduct'] = Tic_product_id[model]
    if serial_number is not None:
      criteria['custom_match'] = FindSerialNumber(serial_number)
    devices = list(core.find(**criteria))

    # Making sure there's only one matching device
    if not devices:
      raise IOError("No matching device connected")
    elif len(devices) > 1:
      raise IOError("Several matching devices found, try specifying a "
                    "model or a serial_number")
    self._dev = devices[0]

    # Setting self._serial_number and self._model
    if serial_number is None:
      self._serial_number = util.get_string(self._dev,
                                            self._dev.iSerialNumber)
    else:
      self._serial_number = serial_number

    if model is None:
      try:
        self._model = next(key for key, value in Tic_product_id.items() if
                           value == self._dev.idProduct)
      except StopIteration:
        raise ValueError("The Tic model automatically found is not "
                         "implemented in crappy")
    else:
      self._model = model

  elif backend == 'ticcmd':
    # Each non-empty line of the output describes one device, as
    # comma-separated fields starting with the serial number
    listing = check_output(['ticcmd', '--list']).decode("utf-8")
    devices = [line.split(',') for line in listing.splitlines()
               if line.strip()]

    # Only keeping the devices matching the given inputs
    if serial_number is not None:
      devices = [dev for dev in devices if dev[0] == serial_number]
    if model is not None:
      devices = [dev for dev in devices if model in dev[1]]

    # Making sure there's only one matching device
    if not devices:
      raise IOError("No matching device found")
    elif len(devices) > 1:
      raise IOError("Several matching devices found, try specifying a "
                    "model or a serial_number")

    # Setting self._serial_number and self._model
    if serial_number is None:
      self._serial_number = devices[0][0]
    else:
      self._serial_number = serial_number

    if model is None:
      try:
        self._model = next(key for key in Tic_product_id if key in
                           devices[0][1])
      except StopIteration:
        raise ValueError("The Tic model automatically found is not "
                         "implemented in crappy")
    else:
      self._model = model

  # Displaying model and serial_number if they were not specified by the user
  if serial_number is None:
    self.log(logging.INFO, f"Tic serial number: {self._serial_number}")
  if model is None:
    self.log(logging.INFO, f"Tic model: {self._model}")

  # Some models have a product ID but no entry in the current and step mode
  # tables, raising an explicit error rather than a bare KeyError
  if (self._model not in Tic_max_allowed_current or
      self._model not in Tic_current_tables or
      self._model not in Tic_step_modes):
    raise ValueError("The current limits and step modes of the {} Tic model "
                     "are not implemented in crappy".format(self._model))

  # Making sure the current limit is valid, especially for the 36v4 model
  if not 0 < current_limit < Tic_max_allowed_current[self._model]:
    if self._model == '36v4':
      # The 36v4 accepts higher currents, but only if explicitly allowed by
      # the user and with the USB backend
      if not 0 < current_limit < Tic_36v4_max_current:
        raise ValueError("current_limit should be between 0 and {} mA for "
                         "this Tic model".format(Tic_36v4_max_current))
      elif not unrestricted_current_limit:
        raise ValueError("current_limit exceeds the safety limit, which may "
                         "cause overheating. Set unrestricted_current_limit "
                         "to True if you want to keep this current_limit "
                         "(only works if backend='USB')")
      elif backend != 'USB':
        raise ValueError("Setting unrestricted_current_limit to True only "
                         "works if backend='USB'")
    else:
      raise ValueError("current limit should be between 0 and {} mA "
                       "for this Tic model".
                       format(Tic_max_allowed_current[self._model]))
  self._current_limit = current_limit
  self._unrestricted_current_limit = unrestricted_current_limit

  # Converting the current limit value to a current index, used by the
  # USB backend only
  if backend == 'USB':
    if self._model == 'T500':
      # The index is the position of the closest value in the table
      self._current_index = min(enumerate(Tic_current_tables[self._model]),
                                key=lambda x: abs(x[1] - current_limit))[0]
    else:
      # The index is the closest value of the table divided by the current
      # step of the model
      self._current_index = round(min(Tic_current_tables[self._model],
                                      key=lambda x: abs(x - current_limit)) /
                                  Tic_current_steps[self._model])

  if step_mode not in Tic_step_modes[self._model]:
    raise ValueError("step_mode should be in {}".format(
        Tic_step_modes[self._model]))
  self._step_mode = step_mode

  # A null value would cause a division by zero in the unit conversions
  if steps_per_mm <= 0:
    raise ValueError("steps_per_mm should be positive")
  self._steps_per_mm = steps_per_mm

  # Keeping the max_accel and max_decel values within the Tic ratings
  if max_accel < self._to_mm(Tic_min_accel / 100):
    self.log(logging.WARNING, "Requested acceleration below min allowed "
                              "acceleration, setting to min allowed "
                              "acceleration")
    max_accel = self._to_mm(Tic_min_accel / 100)
  elif max_accel > self._to_mm(Tic_max_accel / 100):
    self.log(logging.WARNING, "Requested acceleration exceeding max allowed "
                              "acceleration, setting to max allowed "
                              "acceleration")
    max_accel = self._to_mm(Tic_max_accel / 100)
  self._max_accel = max_accel

  if t_shutoff < 0:
    raise ValueError("t_shutoff should be zero or positive")
  self._t_shutoff = t_shutoff

  if config_file is not None and backend != 'ticcmd':
    self.log(logging.WARNING, "Config files can only be loaded if "
                              "backend='ticcmd', ignoring the given "
                              "config_file")
    self._config_file = None
  else:
    self._config_file = config_file

  if backend != 'USB' and not reset_command_timeout:
    self.log(logging.WARNING, "reset_command_timeout can only be disabled "
                              "if backend='USB', reset_command_timeout set "
                              "to True")
    self._rct_on = True
  else:
    self._rct_on = reset_command_timeout

  if backend != 'USB' and pin_function is not None:
    raise ValueError("It is not possible to set the pin functions if "
                     "the backend is not 'USB'")
  if pin_function is not None:
    if not all(key in Tic_pins_bit for key in pin_function):
      raise ValueError("Unexpected pin name, pin names should be in "
                       "{}".format(list(Tic_pins_bit.keys())))
    if not all(value in Tic_pin_modes for value in pin_function.values()):
      raise ValueError("Unexpected pin function, pin functions should be in "
                       "{}".format(list(Tic_pin_modes.keys())))
  self._pin_function = pin_function

  if backend != 'USB' and pin_polarity is not None:
    raise ValueError("It is not possible to set the pin polarities if "
                     "the backend is not 'USB'")
  if pin_polarity is not None:
    if not all(key in Tic_pins_bit for key in pin_polarity):
      raise ValueError("Unexpected pin name, pin names should be in "
                       "{}".format(list(Tic_pins_bit.keys())))
    if not all(value in Tic_pin_polarity for value in pin_polarity.values()):
      raise ValueError("Unexpected pin polarity, pin polarities should be in "
                       "{}".format(list(Tic_pin_polarity.keys())))
  self._pin_polarity = pin_polarity

  # The lock is meant for preventing interferences between the threads
  self._lock = RLock()

  # Definition of the flags
  self._timer_shutoff = False
  self._RCT = False
  self._reset_timer_shutoff = False
  self._close = False

  # Definition of the auxiliary threads
  self._thrd_rct = Thread(target=self._thread_rct)
  self._thrd_shutoff = Thread(target=self._thread_shutoff)
[docs]
def open(self) -> None:
  """Initializes the communication with the Tic, sends it the settings given
  by the user, and starts the auxiliary threads that are needed."""

  if self._backend == 'USB':
    try:
      self.log(logging.INFO, f"Setting configuration on USB device "
                             f"{self._dev}")
      self._dev.set_configuration()
    except core.USBError:
      self.log(logging.ERROR,
               "An error occurred while setting the configuration of the USB"
               " device !\nYou may have to install the udev-rules for this "
               "USB device, this can be done using the udev_rule_setter "
               "utility in the util folder")
      raise

  # The pin settings are sent first, and the Tic is reset if any was sent
  polarity_given = self._pin_polarity is not None
  function_given = self._pin_function is not None
  if polarity_given:
    self._set_pin_polarity(self._pin_polarity)
  if function_given:
    self._set_pin_function(self._pin_function)
  if polarity_given or function_given:
    self._reset()

  # Sending the other user parameters, with the motor safely deenergized
  self._enter_safe_start()
  self._deenergize()
  self._set_step_mode()
  self._set_current_limit()
  self._set_max_accel()
  self._set_max_decel()

  # The config file is loaded after the individual settings were sent
  if self._config_file is not None:
    self.log(logging.INFO, "Loading the config file")
    self._ticcmd('--settings', str(self._config_file))

  # The thread resetting the command timeout is only started if requested,
  # otherwise the two command timeout settings are set to zero instead
  if not self._rct_on:
    self.log(logging.INFO, "Disabling the command timeout security")
    for setting in ('Command_timeout_low', 'Command_timeout_high'):
      self._usb_command(request=Tic_cmd['Set_setting'],
                        value=0x00,
                        index=Tic_settings[setting])
  else:
    self.log(logging.INFO, "Starting the thread for managing the command "
                           "timeouts")
    self._thrd_rct.start()

  # The shutoff thread is not needed in case t_shutoff is zero
  if self._t_shutoff > 0:
    self.log(logging.INFO, "Starting the thread managing the auto shutoff")
    self._thrd_shutoff.start()
[docs]
def get_speed(self) -> float:
  """Reads the current velocity from the Tic and converts it.

  Returns:
    The speed in `mm/s`
  """

  if self._backend == 'ticcmd':
    # The status is returned by ticcmd as a YAML document
    if full_loader:
      status = yaml.load(self._ticcmd('-s'), Loader=yaml.FullLoader)
    else:
      status = yaml.load(self._ticcmd('-s'))
    return self._to_mm(status['Current velocity'] / 10000)

  elif self._backend == 'USB':
    # The velocity is read as a 4-bytes signed little-endian integer
    raw = self._usb_command(request_type=Tic_usb_request['Var'],
                            request=Tic_cmd['Get_variable'],
                            index=Tic_var['Current_velocity'],
                            data_or_length=4)
    velocity = int.from_bytes(raw, byteorder='little', signed=True)
    return self._to_mm(velocity / 10000)
[docs]
def get_position(self) -> float:
  """Reads the current position from the Tic and converts it.

  Returns:
    The position in `mm`

  .. versionchanged:: 1.5.2 renamed from *get_pos* to *get_position*
  """

  if self._backend == 'ticcmd':
    # The status is returned by ticcmd as a YAML document
    if full_loader:
      status = yaml.load(self._ticcmd('-s'), Loader=yaml.FullLoader)
    else:
      status = yaml.load(self._ticcmd('-s'))
    return self._to_mm(status['Current position'])

  elif self._backend == 'USB':
    # The position is read as a 4-bytes signed little-endian integer
    raw = self._usb_command(request_type=Tic_usb_request['Var'],
                            request=Tic_cmd['Get_variable'],
                            index=Tic_var['Current_position'],
                            data_or_length=4)
    steps = int.from_bytes(raw, byteorder='little', signed=True)
    return self._to_mm(steps)
[docs]
def set_position(self, position: float, speed: Optional[float]) -> None:
  """Energizes the motor and makes it move to the given position.

  Args:
    position: The target position, in `mm`
    speed: The speed for reaching the target position, in `mm/s`. If it is
      not :obj:`None`, the maximum speed of the motor is set to this value
      before moving.

  .. versionchanged:: 2.0.0 *speed* is now a mandatory argument

  Note:
    - ``speed``:
      Changing the maximum speed is the only way to reach a position at a
      given speed. The Tic tries to accelerate up to the maximum speed, but
      may remain slower if the target position is reached before.
  """

  # The maximum speed is left untouched if no speed was given
  if speed is not None:
    self._set_max_speed(speed)

  # The motor has to be energized and out of safe start before moving
  self._energize()
  self._exit_safe_start()

  # Raising the flags before the position command is sent
  self._timer_shutoff = True
  self._reset_timer_shutoff = True
  self._RCT = True

  self._set_position(position)
[docs]
def set_speed(self, speed: float) -> None:
  """Energizes the motor and makes it run at the given speed.

  Args:
    speed: The target speed for the motor. Its sign gives the direction of
      the movement.
  """

  # The maximum speed is raised if it is lower than the target speed
  current_max = self._get_max_speed()
  if abs(speed) > current_max:
    self._set_max_speed(speed)

  # The magnitude of the command is kept within the Tic ratings, except for a
  # null speed that has to remain null
  final_speed = min(abs(self._to_steps(speed * 10000)), Tic_max_speed)
  if final_speed:
    if final_speed < Tic_min_speed:
      self.log(logging.WARNING, "Requested speed below min possible speed, "
                                "setting to min possible speed")
      final_speed = Tic_min_speed
    # Restoring the sign of the requested speed
    direction = speed / abs(speed)
    final_speed = final_speed * direction

  # The motor has to be energized and out of safe start before moving
  self._energize()
  self._exit_safe_start()

  # Raising the flags before the speed command is sent
  self._timer_shutoff = True
  self._reset_timer_shutoff = True
  self._RCT = True

  self._set_velocity(final_speed)
[docs]
def stop(self) -> None:
  """Stops the motor by sending it a null velocity command."""

  self._set_velocity(velocity=0)
[docs]
def close(self) -> None:
  """Joins the auxiliary threads, puts the Tic in safe start mode,
  deenergizes the motor and releases the USB resources.

  The threads are only joined if they are running, so that this method can
  also be called if :meth:`open` failed or was never called.
  """

  # Raising the flag indicating that the actuator is being closed
  # NOTE(review): presumably read by the auxiliary threads to know when to
  # return - their code is not visible from here
  self._close = True

  # Joining a thread that was never started raises a RuntimeError, hence the
  # is_alive() checks
  if self._rct_on and self._thrd_rct.is_alive():
    self.log(logging.INFO, "Stopping the thread managing the command "
                           "timeouts")
    self._thrd_rct.join()

  if self._t_shutoff > 0 and self._thrd_shutoff.is_alive():
    self.log(logging.INFO, "Stopping the thread managing the auto shutoff")
    self._thrd_shutoff.join()

  # Leaving the motor in a safe state
  self._enter_safe_start()
  self._deenergize()

  if self._backend == 'USB':
    self.log(logging.INFO, "Releasing the USB resources")
    util.dispose_resources(self._dev)
def _to_steps(self, mm: float) -> float:
"""Wrapper for converting `mm` to `steps`."""
return mm * self._steps_per_mm * self._step_mode
def _to_mm(self, steps: float) -> float:
"""Wrapper for converting `steps` to `mm`."""
return steps / self._steps_per_mm / self._step_mode
def _reset_command_timeout(self) -> None:
  """Asks the Tic to reset its command timeout, using the selected
  backend."""

  self.log(logging.DEBUG, "Sending reset command timeout command")

  if self._backend == 'USB':
    self._usb_command(request=Tic_cmd['Reset_command_timeout'])
  elif self._backend == 'ticcmd':
    self._ticcmd('--reset-command-timeout')
def _enter_safe_start(self) -> None:
  """Asks the Tic to enter the safe start mode, using the selected
  backend."""

  self.log(logging.INFO, "Entering safe start mode")

  if self._backend == 'USB':
    self._usb_command(request=Tic_cmd['Enter_safe_start'])
  elif self._backend == 'ticcmd':
    self._ticcmd('--enter-safe-start')
def _exit_safe_start(self) -> None:
  """Asks the Tic to exit the safe start mode, using the selected
  backend."""

  self.log(logging.INFO, "Exiting safe start mode")

  if self._backend == 'USB':
    self._usb_command(request=Tic_cmd['Exit_safe_start'])
  elif self._backend == 'ticcmd':
    self._ticcmd('--exit-safe-start')
def _deenergize(self) -> None:
  """Asks the Tic to deenergize the motor, using the selected backend."""

  self.log(logging.INFO, "Deenergizing the motor")

  if self._backend == 'USB':
    self._usb_command(request=Tic_cmd['Deenergize'])
  elif self._backend == 'ticcmd':
    self._ticcmd('--deenergize')
def _energize(self) -> None:
  """Asks the Tic to energize the motor, using the selected backend."""

  self.log(logging.INFO, "Energizing the motor")

  if self._backend == 'USB':
    self._usb_command(request=Tic_cmd['Energize'])
  elif self._backend == 'ticcmd':
    self._ticcmd('--energize')
def _set_step_mode(self) -> None:
  """Sends the step mode chosen by the user to the Tic.

  The 'ticcmd' backend is given the step mode as a :obj:`str`, whereas the
  'USB' backend is given the matching code from ``Tic_step_mode``.
  """

  self.log(logging.INFO, "Setting the micro-step mode")

  if self._backend == 'USB':
    mode_code = Tic_step_mode[self._step_mode]
    self._usb_command(request=Tic_cmd['Set_step_mode'], value=mode_code)
  elif self._backend == 'ticcmd':
    self._ticcmd('--step-mode', str(self._step_mode))
def _set_current_limit(self) -> None:
  """Sends the current limit chosen by the user to the Tic.

  The 'ticcmd' backend is given the current limit itself, whereas the 'USB'
  backend is given the current index computed in :meth:`__init__`. On the
  36v4 model and with the 'USB' backend, the unrestricted current limit
  setting is written first.
  """

  self.log(logging.INFO, "Setting the current limit")

  if self._backend == 'USB':
    if self._model == '36v4':
      self._usb_command(request=Tic_cmd['Set_setting'],
                        value=self._unrestricted_current_limit,
                        index=Tic_settings['Unrestricted_current_limit'])
    self._usb_command(request=Tic_cmd['Set_current_limit'],
                      value=self._current_index)
  elif self._backend == 'ticcmd':
    self._ticcmd('--current', str(self._current_limit))
def _set_max_speed(self, speed: float) -> None:
  """Sets the maximum speed of the Tic, after clamping it to the Tic ratings.

  Args:
    speed: The maximum speed to set, in `mm/s`. Only its absolute value is
      considered.
  """

  self.log(logging.DEBUG, "Setting the maximum speed")

  # The value sent to the Tic is replaced by the closest bound if the given
  # speed is out of the Tic ratings
  requested = abs(speed)
  if requested > self._to_mm(Tic_max_speed / 10000):
    self.log(logging.WARNING, "Requested speed exceeding max allowed speed, "
                              "setting to max allowed speed")
    max_speed = Tic_max_speed
  elif requested < self._to_mm(Tic_min_speed / 10000):
    self.log(logging.WARNING, "Requested speed below min possible speed, "
                              "setting to min possible speed")
    max_speed = Tic_min_speed
  else:
    max_speed = abs(self._to_steps(speed * 10000))

  if self._backend == 'USB':
    self._usb_32_bit(request=Tic_cmd['Set_max_speed'],
                     data=int(max_speed))
  elif self._backend == 'ticcmd':
    self._ticcmd('--max-speed', str(int(max_speed)))
def _set_max_accel(self) -> None:
  """Sends the maximum acceleration to the Tic.

  The value sent is derived from the ``_max_accel`` attribute, no clamping is
  performed here.
  """

  self.log(logging.INFO, "Setting the maximum acceleration")

  if self._backend == 'USB':
    self._usb_32_bit(request=Tic_cmd['Set_max_accel'],
                     data=int(self._to_steps(self._max_accel * 100)))
  elif self._backend == 'ticcmd':
    accel = int(self._to_steps(self._max_accel * 100))
    self._ticcmd('--max-accel', str(accel))
def _set_max_decel(self) -> None:
  """Sends the maximum deceleration to the Tic.

  The value sent is derived from the ``_max_accel`` attribute, which is used
  for both the acceleration and the deceleration. No clamping is performed
  here.
  """

  self.log(logging.INFO, "Setting the maximum deceleration")

  if self._backend == 'USB':
    self._usb_32_bit(request=Tic_cmd['Set_max_decel'],
                     data=int(self._to_steps(self._max_accel * 100)))
  elif self._backend == 'ticcmd':
    decel = int(self._to_steps(self._max_accel * 100))
    self._ticcmd('--max-decel', str(decel))
def _set_position(self, position: float) -> None:
  """Sends a target position to the Tic.

  Args:
    position: The target position in `mm`, converted to an integer number of
      microsteps before being sent.
  """

  self.log(logging.DEBUG, "Setting the position")

  if self._backend == 'USB':
    self._usb_32_bit(request=Tic_cmd['Set_target_position'],
                     data=int(self._to_steps(position)))
  elif self._backend == 'ticcmd':
    target = int(self._to_steps(position))
    self._ticcmd('--position', str(target))
def _set_velocity(self, velocity: float) -> None:
  """Sends a target velocity to the Tic.

  Args:
    velocity: The target velocity, sent without any unit conversion after
      being cast to an integer.
  """

  self.log(logging.DEBUG, "Setting the velocity")

  if self._backend == 'USB':
    self._usb_32_bit(request=Tic_cmd['Set_target_velocity'],
                     data=int(velocity))
  elif self._backend == 'ticcmd':
    self._ticcmd('--velocity', str(int(velocity)))
def _get_max_speed(self) -> float:
  """Reads the maximum speed currently set on the Tic and converts it.

  Returns:
    The maximum speed, converted with :meth:`_to_mm` after being divided by
    `10000`.
  """

  self.log(logging.DEBUG, "Reading the maximum speed")

  if self._backend == 'ticcmd':
    # The full status is returned by ticcmd as a YAML document
    if full_loader:
      status = yaml.load(self._ticcmd('-s', '--full'),
                         Loader=yaml.FullLoader)
    else:
      status = yaml.load(self._ticcmd('-s', '--full'))
    return self._to_mm(status['Max speed'] / 10000)

  elif self._backend == 'USB':
    # The maximum speed is read as a 4-bytes unsigned little-endian integer
    raw = self._usb_command(request_type=Tic_usb_request['Var'],
                            request=Tic_cmd['Get_variable'],
                            index=Tic_var['Max_speed'],
                            data_or_length=4)
    max_speed = int.from_bytes(raw, byteorder='little', signed=False)
    return self._to_mm(max_speed / 10000)
def _set_pin_function(self, pin_func: Dict[str, str]) -> None:
    """Sets the function of the given pins and updates the switch bitfields.

    Sends a command for setting each pin separately, and three commands for
    setting the Kill switch, Limit switch forward and Limit switch reverse
    bitfields. Nothing is sent unless the backend is ``'USB'``.

    For every pin listed in ``pin_func``, its bit is first cleared in the
    three bitfields and then set only in the bitfield matching its new
    function, if any. This way a pin switched to a non-switch function never
    keeps a stale bit from a previous kill or limit switch configuration.
    Bits of pins not listed in ``pin_func`` are left untouched.

    Args:
      pin_func: A :obj:`dict` whose keys are the pin names and whose values
        are the names of the functions to assign to the pins.
    """

    self.log(logging.INFO, "Setting the pin functions")

    if self._backend != 'USB':
        return

    # Name of the setting storing the bitfield of each switch function
    switch_settings = {'Kill switch': 'Kill_switch_map',
                       'Limit switch forward': 'Limit_switch_forward_map',
                       'Limit switch reverse': 'Limit_switch_reverse_map'}

    # Reads the current bitfields
    switch_maps = {}
    for function, setting in switch_settings.items():
        switch_maps[function] = int.from_bytes(
            self._usb_command(request_type=Tic_usb_request['Var'],
                              request=Tic_cmd['Get_setting'],
                              index=Tic_settings[setting],
                              data_or_length=1),
            byteorder='little',
            signed=False)

    for pin, function in pin_func.items():
        pin_mask = 1 << Tic_pins_bit[pin]

        # A pin can only be flagged in the bitfield of its new function, so
        # its bit is first cleared everywhere
        for switch in switch_maps:
            switch_maps[switch] &= 0xFF - pin_mask
        if function in switch_maps:
            switch_maps[function] |= pin_mask

        # Sends an individual command for each pin
        self._usb_command(request=Tic_cmd['Set_setting'],
                          value=Tic_pin_modes[function],
                          index=Tic_settings[pin + '_config'])

    # Sets the three bitfields
    for function, setting in switch_settings.items():
        self._usb_command(request=Tic_cmd['Set_setting'],
                          value=switch_maps[function],
                          index=Tic_settings[setting])
def _set_pin_polarity(self, pin_pol: Dict[str, str]) -> None:
    """Updates the switch polarity bitfield for the given pins.

    The current bitfield is first read from the driver, then the bit of each
    listed pin is set or cleared, and the result is written back. Nothing is
    sent unless the backend is ``'USB'``.

    Args:
      pin_pol: A :obj:`dict` whose keys are the pin names and whose values
        are the names of the polarities, looked up in ``Tic_pin_polarity``.
    """

    self.log(logging.INFO, "Setting the pin polarities")

    if self._backend != 'USB':
        return

    # Reads the bitfield currently stored on the driver
    answer = self._usb_command(request_type=Tic_usb_request['Var'],
                               request=Tic_cmd['Get_setting'],
                               index=Tic_settings['Switch_polarity_map'],
                               data_or_length=1)
    polarity_map = int.from_bytes(answer, byteorder='little', signed=False)

    # Sets the bit of a pin if its polarity entry is truthy, else clears it
    for pin, polarity in pin_pol.items():
        set_bit = Tic_pin_polarity[polarity]
        pin_mask = 1 << Tic_pins_bit[pin]
        if set_bit:
            polarity_map |= pin_mask
        else:
            polarity_map &= 0xFF - pin_mask

    self._usb_command(request=Tic_cmd['Set_setting'],
                      value=polarity_map,
                      index=Tic_settings['Switch_polarity_map'])
def _reset(self) -> None:
    """Sends the Reset command to the Tic.

    The command is only sent with the ``'USB'`` backend, nothing is done with
    any other backend.
    """

    self.log(logging.INFO, "Resetting the device")

    if self._backend == 'USB':
        self._usb_command(request=Tic_cmd['Reset'])
def _usb_command(self,
                 request_type: int = Tic_usb_request['Cmd'],
                 request: int = 0,
                 value: int = 0,
                 index: int = 0,
                 data_or_length: int = 0) -> Union[bytearray, int]:
    """Wrapper for sending a USB control transfer.

    The transfer is performed while holding ``self._lock``.

    Args:
      request_type: The ``bmRequestType`` field of the transfer.
      request: The ``bRequest`` field of the transfer.
      value: The ``wValue`` field of the transfer.
      index: The ``wIndex`` field of the transfer.
      data_or_length: Passed as the ``data_or_wLength`` argument of the
        transfer.

    Returns:
      Whatever is returned by the ``ctrl_transfer`` method of the device.

    Raises:
      IOError: If a USB error is raised during the transfer. The original
        USB error is attached as the cause of the exception.
    """

    with self._lock:
        self.log(logging.DEBUG, f"Sending USB command with "
                                f"request type {request_type}, request "
                                f"{request}, value {value}, index {index}, "
                                f"length or data {data_or_length}")

        # Only the transfer itself is expected to raise a USB error
        try:
            return self._dev.ctrl_transfer(bmRequestType=request_type,
                                           bRequest=request,
                                           wValue=value,
                                           wIndex=index,
                                           data_or_wLength=data_or_length)
        except core.USBError as exc:
            # Explicitly chaining to keep the underlying USB error visible
            raise IOError("An error occurred during USB "
                          "communication") from exc
def _usb_32_bit(self, request: int, data: int) -> None:
    """Sends a USB request carrying a 32-bit value.

    The 16 least significant bits of the value are sent in the ``value``
    field of the command, and the next 16 bits in its ``index`` field.

    Args:
      request: The request code to send.
      data: The integer value to send, of which only the 32 least
        significant bits are kept.
    """

    # Splitting the 32 lower bits into a high and a low 16-bit word
    high_word, low_word = divmod(data & 0xFFFFFFFF, 1 << 16)
    self._usb_command(request=request, value=low_word, index=high_word)
def _ticcmd(self, *args: str) -> bytes:
    """Runs ticcmd in a subprocess and returns its output.

    The call is performed while holding ``self._lock``, and always targets
    the device whose serial number is ``self._serial_number``.

    Args:
      *args: The command-line arguments to append after the ``-d`` option
        and the serial number.

    Returns:
      The output of the command, as returned by ``check_output``.
    """

    with self._lock:
        self.log(logging.DEBUG, f"Calling ticcmd -d {self._serial_number} "
                                f"{' '.join(args)}")
        command = ['ticcmd', '-d', self._serial_number, *args]
        return check_output(command)
def _thread_shutoff(self) -> None:
    """Thread deenergizing the motor after a given period of inactivity.

    While ``self._timer_shutoff`` is set, the speed is read at each loop and
    a counter is incremented by `0.1` if it is `0`, or reset otherwise. The
    loop then sleeps for `0.1s`. Once the counter exceeds ``self._t_shutoff``,
    the safe start is entered, the motor is deenergized, and both the
    ``_timer_shutoff`` and ``_RCT`` flags are cleared. The counter is also
    reset when the ``_reset_timer_shutoff`` flag is raised. The thread
    returns once ``self._close`` is set.
    """

    idle_time = 0

    while not self._close:
        sleep(0.01)

        # Only monitoring while the timer is enabled and no stop is requested
        while self._timer_shutoff and not self._close:

            # Acknowledging a pending reset request
            if self._reset_timer_shutoff:
                idle_time = 0
                self._reset_timer_shutoff = False

            # Only the time spent at zero speed is accumulated
            idle_time = idle_time + 0.1 if self.get_speed() == 0 else 0

            sleep(0.1)

            # No shutoff if a reset or a stop was requested in the meantime
            expired = idle_time > self._t_shutoff
            if expired and not (self._reset_timer_shutoff or self._close):
                self._enter_safe_start()
                self._deenergize()
                self._timer_shutoff = False
                # Stopping the RCT thread as well
                self._RCT = False
                idle_time = 0
def _thread_rct(self) -> None:
    """Thread periodically sending the reset command timeout command.

    With the ``'USB'`` backend, the command timeout setting is first written
    as two bytes, `0xE8` (low) and `0x03` (high), i.e. `1000`. Then, while
    the ``self._RCT`` flag is set, the reset command timeout command is sent
    every `0.5s`. The thread returns once ``self._close`` is set.
    """

    # Setting command timeout to 1000ms
    if self._backend == 'USB':
        timeout_bytes = ((0xE8, 'Command_timeout_low'),
                         (0x03, 'Command_timeout_high'))
        for byte, setting in timeout_bytes:
            self._usb_command(request=Tic_cmd['Set_setting'],
                              value=byte,
                              index=Tic_settings[setting])

    while not self._close:
        sleep(0.01)

        # Only sending while the flag is raised and no stop is requested
        while self._RCT and not self._close:
            self._reset_command_timeout()
            sleep(0.5)