Source code for crappy.inout.kollmorgen_akd_pdmm

# coding: utf-8

from time import time
from struct import pack, unpack
from typing import List, Iterable
import logging
from  warnings import warn

from .meta_inout import InOut
from .._global import OptionalModule
try:
  from pymodbus.client.tcp import ModbusTcpClient
except (ModuleNotFoundError, ImportError):
  ModbusTcpClient = OptionalModule("pymodbus", "Cannot use KollMorgenVariator")


coil_addr = {'power': 0,
             'move_abs': 1,
             'move_rel': 2,
             'move_vel': 3,
             'stop': 4,
             'ack_error': 5}

reg_addr = {'position': 0,
            'distance': 2,
            'velocity': 4,
            'acc': 5,
            'dec': 6,
            'fstdec': 7,
            'direction': 8}

input_reg_addr = {'act_speed': 0,
                  'act_position': 2,
                  'axis_state': 4}


[docs] class KollmorgenAKDPDMM(InOut): """This class can communicate with a KollMorgen AKD PDMM programmable multi-axis controller. It can either drive it in speed or in position. Multiple axes can be driven. The values of the current speeds or positions can also be retrieved. .. versionadded:: 1.4.0 .. versionchanged:: 2.0.0 renamed from *Koll* to *KollmorgenAKDPDMM* """
[docs] def __init__(self, axes: Iterable[int], mode: str = 'position', host: str = '192.168.0.109', port: int = 502) -> None: """Sets the arguments and initializes the parent class. Args: axes: An iterable (like a :obj:`list` or a :obj:`tuple`) containing the motors/axes to drive, given as :obj:`int`. .. versionchanged:: 1.5.10 renamed from *axis* to *axes* mode: The driving mode, should be either `'speed'` or `'position'`. .. versionchanged:: 1.5.10 renamed from *data* to *mode* host: The IP address of the variator, given as a :obj:`str`. port: The network port over which to communicate with the variator, as an :obj:`int`. .. versionremoved:: 1.5.10 *speed*, *acc*, *decc* and *labels* arguments """ warn(f"Starting from version 2.1.0, {type(self).__name__} will be moved " f"to crappy.collection. Your code that uses it will still work as " f"is, except you will now need to import crappy.collection at the " f"top of your script.", FutureWarning) self._variator = None super().__init__() # Making sure the given mode is correct if mode not in ('speed', 'position'): raise ValueError("The mode argument should be either 'speed' or " "'position' !") self._axes = axes self._mode = mode self._host = host self._port = port
[docs] def open(self) -> None: """Connects to the variator over modbus.""" self.log(logging.INFO, f"Initializing the TCP connection to address " f"{self._host} on port {self._port}") self._variator = ModbusTcpClient(host=self._host, port=self._port) self.log(logging.INFO, f"Opening the TCP connecting to the address " f"{self._host} on port {self._port}") self._variator.connect()
[docs] def get_data(self) -> List[float]: """For each motor, reads its current speed or position depending on the selected mode. The positions or speeds are returned in the same order as the motors were given in the ``axes`` argument. """ ret = [time()] for axis in self._axes: # Selecting the register to read depending on the selected mode if self._mode == 'speed': reg = 10 * axis + input_reg_addr["act_speed"] else: reg = 10 * axis + input_reg_addr["act_position"] # Actually reading the data and converting it read = self._variator.read_input_registers(address=reg, count=2) converted, *_ = unpack("=f", pack("=HH", *read.registers[::-1])) ret.append(converted) return ret
[docs] def set_cmd(self, *cmd: float) -> None: """Sets either the speed or the position of all motors depending on the selected mode. If more commands than motors are given, the extra commands are ignored. If there are more motors than commands, only part of the motors will be set. .. versionadded:: 1.5.10 """ for axis, val in zip(self._axes, cmd): # Setting the speed of each motor if self._mode == 'speed': reg = 10 * axis + reg_addr['velocity'] self._variator.write_register(reg, abs(val)) direction = 10 * axis + reg_addr['direction'] self._variator.write_register(direction, 0 if val > 0 else 1) # Setting the target position of each motor else: coil = 10 * axis + coil_addr['move_abs'] reg = 10 * axis + reg_addr['position'] data = unpack("=HH", pack("=f", val))[::-1] self._variator.write_registers(reg, list(data)) self._variator.write_coil(coil, True)
[docs] def close(self) -> None: """Closes the modbus connection to the variator.""" if self._variator is not None: self.log(logging.INFO, f"Closing the TCP connecting to the address " f"{self._host} on port {self._port}") self._variator.close()