Source code for crappy.tool.ft232h.ft232h

# coding: utf-8

from enum import IntEnum
from collections import namedtuple
from struct import calcsize, unpack, pack
from typing import Union, Callable, Optional, Tuple, List
from multiprocessing import current_process
import logging

from .i2c_message import I2CMessage
from ..._global import OptionalModule
try:
  from usb import util
  from usb.core import find, USBError

  Ftdi_req_out = util.build_request_type(util.CTRL_OUT,
                                         util.CTRL_TYPE_VENDOR,
                                         util.CTRL_RECIPIENT_DEVICE)

  Ftdi_req_in = util.build_request_type(util.CTRL_IN,
                                        util.CTRL_TYPE_VENDOR,
                                        util.CTRL_RECIPIENT_DEVICE)

except (ModuleNotFoundError, ImportError):
  util = OptionalModule("pyusb")
  find = OptionalModule("pyusb")
  USBError = OptionalModule("pyusb")
  Ftdi_req_out = 0x40
  Ftdi_req_in = 0xC0

ft232h_cmds = {'write_bytes_PVE_MSB': 0x10,
               'write_bytes_NVE_MSB': 0x11,
               'write_bits_NVE_MSB': 0x13,
               'write_bytes_PVE_LSB': 0x18,
               'write_bytes_NVE_LSB': 0x19,
               'read_bytes_PVE_MSB': 0x20,
               'read_bits_PVE_MSB': 0x22,
               'read_bytes_NVE_MSB': 0x24,
               'read_bytes_PVE_LSB': 0x28,
               'read_bytes_NVE_LSB': 0x2C,
               'rw_bytes_PVE_NVE_MSB': 0x31,
               'rw_bytes_NVE_PVE_MSB': 0x34,
               'rw_bytes_PVE_NVE_LSB': 0x39,
               'rw_bytes_NVE_PVE_LSB': 0x3C,
               'set_bits_low': 0x80,
               'get_bits_low': 0x81,
               'set_bits_high': 0x82,
               'get_bits_high': 0x83,
               'loopback_start': 0x84,
               'loopback_end': 0x85,
               'set_tck_divisor': 0x86,
               'send_immediate': 0x87,
               'disable_clk_div5': 0x8A,
               'enable_clk_div5': 0x8B,
               'enable_clk_3phase': 0x8C,
               'disable_clk_3phase': 0x8D,
               'enable_clk_adaptative': 0x96,
               'disable_clk_adaptative': 0x97,
               'drive_zero': 0x9E}

ft232h_sio_req = {'reset': 0x00,
                  'set_event_char': 0x06,
                  'set_error_char': 0x07,
                  'set_latency_timer': 0x09,
                  'set_bitmode': 0x0B,
                  'read_eeprom': 0x90,
                  'write_eeprom': 0x91}

ft232h_sio_args = {'reset': 0,
                   'purge_RX': 1,
                   'purge_TX': 2}

Ftdi_vendor_id = 0x0403
ft232h_product_id = 0x6014

ft232h_latency = {'min': 1,
                  'max': 255}

ft232h_clock = {'base': 6.0E6,
                'high': 30.0E6}

ft232h_tx_empty_bits = 0x60
ft232h_max_payload = 0xFF
ft232h_mpsse_bit_delay = 0.5E-6
ft232h_port_width = 16
ft232h_eeprom_size = 256

ft232h_eeprom = {'has_serial_pos': 0x0A,
                 'str_table': 0x0E,
                 'str_position': 0xA0}

ft232h_pins = {'SCL': 0x01,
               'SDAO': 0x02,
               'SDAI': 0x04,
               'SCL_FB': 0x80,
               'SCK': 0x01,
               'DO': 0x02,
               'DI': 0x04,
               'CS': 0x08}

ft232h_i2c_timings = namedtuple('I2CTimings',
                                't_hd_sta t_su_sta t_su_sto t_buf')

ft232h_modes = ['SPI', 'I2C', 'GPIO_only', 'Write_serial_nr']

ft232h_pin_nr = {pin: index for pin, index in zip(
  ['D{}'.format(i) for i in range(8)] +
  ['C{}'.format(i) for i in range(8)], [2 ** j for j in range(16)])}

ft232h_i2c_speed = {100E3: ft232h_i2c_timings(4.0E-6, 4.7E-6, 4.0E-6, 4.7E-6),
                    400E3: ft232h_i2c_timings(0.6E-6, 0.6E-6, 0.6E-6, 1.3E-6),
                    1E6: ft232h_i2c_timings(0.26E-6, 0.26E-6, 0.26E-6, 0.5E-6)}


class FindSerialNumber:
  """A class used for finding USB devices matching a given serial number, using
     the usb.core.find method.
  
  .. versionadded:: 1.5.10
  .. versionchanged:: 2.0.0
     renamed from *Find_serial_number* to *FindSerialNumber*
  """

  def __init__(self, serial_number: str) -> None:
    self.serial_number = serial_number

  def __call__(self, device) -> bool:
    return device.serial_number == self.serial_number


[docs] class FT232H: """A class for controlling FTDI's USB to Serial FT232H. Communication in SPI and I2C are implemented, along with GPIO control. The name of the methods for SPI and I2C communication are those of :mod:`smbus` and :mod:`spidev` libraries, in order to facilitate the use and the integration in a multi-backend environment. This class also allows to write a USB serial number in the EEPROM, as there's no default serial number on the chip. Note: The FT232H does not support clock stretching and this may cause bugs with some I2C devices. Lowering the ``i2c_speed`` may solve the problem. Important: If using Adafruit's board, its `I2C Mode` switch should of course be set to the correct value according to the chosen mode. Important: **Only for Linux users:** In order to drive the FT232H, the appropriate udev rule should be set. This can be done using the `udev_rule_setter` utility in ``crappy``'s `util` folder. It is also possible to add it manually by running: :: $ echo "SUBSYSTEM==\\"usb\\", ATTR{idVendor}==\\"0403\\", \ MODE=\\"0666\\\"" | sudo tee ftdi.rules > /dev/null 2>&1 in a shell opened in ``/etc/udev/rules.d``. Important: For controlling several FT232H from the same computer, it is first necessary to set their USB serial numbers. Otherwise, an error will be raised. This can be done using the crappy utility ``set_ft232h_serial_nr.py``. .. versionadded:: 1.5.10 .. versionchanged:: 2.0.0 renamed from *ft232h* to *FT232H* """ class BitMode(IntEnum): """Commands for changing the bit mode.""" RESET = 0x00 # switch off alternative mode (default to UART) BITBANG = 0x01 # classical asynchronous bitbang mode MPSSE = 0x02 # MPSSE mode, available on 2232x chips SYNCBB = 0x04 # synchronous bitbang mode MCU = 0x08 # MCU Host Bus Emulation mode, OPTO = 0x10 # Fast Opto-Isolated Serial Interface Mode CBUS = 0x20 # Bitbang on CBUS pins of R-type chips SYNCFF = 0x40 # Single Channel Synchronous FIFO mode
[docs] def __init__(self, mode: str, serial_nr: Optional[str] = None, i2c_speed: float = 100E3, spi_turbo: bool = False) -> None: """Checks the arguments validity, initializes the device and sets the locks. Args: mode: The communication mode as a :obj:`str`, can be : :: 'SPI', 'I2C', 'GPIO_only', 'Write_serial_nr' GPIOs can be driven in any mode, but faster speeds are achievable in `GPIO_only` mode. serial_nr: The serial number of the FT232H to drive, as a :obj:`str`. In `Write_serial_nr` mode, the serial number to be written. i2c_speed: In I2C mode, the I2C bus clock frequency in Hz, as an :obj:`int`. Available values are : :: 100E3, 400E3, 1E6 or any value between `10kHz` and `100kHz`. Lowering below the default value may solve I2C clock stretching issues on some devices. spi_turbo: If :obj:`True`, increases the achievable bus speed in SPI mode, but may not work with some devices. Note: - **CS pin**: The CS pin for selecting SPI devices is always `D3`. This pin is reserved and cannot be used as a GPIO. If you want to drive the CS line manually, it is possible not to drive the CS pin by setting the SPI parameter :attr:`no_cs` to :obj:`True` and to drive the CS line from a GPIO instead. - ``mode``: It is not possible to simultaneously control slaves over SPI and I2C, due to different hardware requirements for the two protocols. Trying to do so will most likely raise an error or lead to inconsistent behavior. """ if mode not in ft232h_modes: raise ValueError("mode should be in {}".format(ft232h_modes)) self._ft232h_mode = mode if mode == 'Write_serial_nr' and serial_nr is None: raise ValueError("Cannot set serial number if it is not specified !") if i2c_speed not in ft232h_i2c_speed: try: if not 10E3 <= i2c_speed < 100E3: raise ValueError("i2c_speed should be in {} or between 10E3 and " "100E3".format(list(ft232h_i2c_speed.values()))) except TypeError: raise TypeError("i2c_speed should be a float or an int !") self._gpio_low = 0 self._gpio_high = 0 self._gpio_dir = 0 self._retry_count = 16 self._usb_write_timeout = 5000 self._usb_read_timeout = 5000 self._serial_nr = serial_nr self._turbo = spi_turbo self._i2c_speed = i2c_speed self._nb_attempt_1 = 8 self._nb_attempt_2 = 8 self._bits_per_word = 8 self._cshigh = False self._no_cs = False self._loop = False self._lsbfirst = False self._max_speed_hz = 400E3 self._mode = 0 self._threewire = False self._spi_param_changed = True self._logger: Optional[logging.Logger] = None self._initialize() if mode == 'Write_serial_nr': self.log(logging.WARNING, f"Setting the FT232H seria lnumber to " f"{serial_nr}") self._set_serial_number(serial_nr) self.close()
def _initialize(self) -> None: """Initializing the FT232H according to the chosen mode. The main differences are for the choice of the clock frequency and parameters. """ # FT232H properties fifo_sizes = (1024, 1024) latency = 16 # I2C properties if self._ft232h_mode == 'I2C': timings = ft232h_i2c_speed[self._i2c_speed if self._i2c_speed in ft232h_i2c_speed else 100E3] frequency = self._i2c_speed self._ck_hd_sta = self._compute_delay_cycles(timings.t_hd_sta) self._ck_su_sto = self._compute_delay_cycles(timings.t_su_sto) ck_su_sta = self._compute_delay_cycles(timings.t_su_sta) ck_buf = self._compute_delay_cycles(timings.t_buf) self._ck_idle = max(ck_su_sta, ck_buf) self._ck_delay = ck_buf self._i2c_mask = ft232h_pins['SCL'] | ft232h_pins['SDAO'] | \ ft232h_pins['SDAI'] self._i2c_dir = ft232h_pins['SCL'] | ft232h_pins['SDAO'] # SPI properties elif self._ft232h_mode == 'SPI': frequency = 400E3 self._cs_bit = ft232h_pins['CS'] self._spi_dir = self._cs_bit | ft232h_pins['SCK'] | ft232h_pins['DO'] self._spi_mask = self._cs_bit | ft232h_pins['SCK'] | \ ft232h_pins['DO'] | ft232h_pins['DI'] else: frequency = 400E3 # Finding the matching USB devices if self._serial_nr is not None and self._ft232h_mode != 'Write_serial_nr': devices = find(find_all=True, idVendor=Ftdi_vendor_id, idProduct=ft232h_product_id, custom_match=FindSerialNumber(self._serial_nr)) else: devices = find(find_all=True, idVendor=Ftdi_vendor_id, idProduct=ft232h_product_id) # Checking if there's only 1 device matching devices = list(devices) if len(devices) == 0: raise IOError("No matching ft232h connected") elif len(devices) > 1: raise IOError("Several ft232h devices found, please specify a serial_nr") else: self._usb_dev = devices[0] self.log(logging.DEBUG, f"USB device found: {self._usb_dev}") try: self._serial_nr = self._usb_dev.serial_number except ValueError: self._serial_nr = "" # Configuring the USB device, interface and endpoints try: if self._usb_dev.is_kernel_driver_active(0): self._usb_dev.detach_kernel_driver(0) self.log(logging.INFO, "Setting USB configuration for the FT232H") self._usb_dev.set_configuration() except USBError: self.log(logging.ERROR, "Could not set USB device configuration !\nYou may have to " "install the udev-rules for this USB device, this can be done " "using the udev_rule_setter utility in the util folder") raise config = self._usb_dev.get_active_configuration() interface = config[(0, 0)] self._index = interface.bInterfaceNumber + 1 endpoints = sorted([ep.bEndpointAddress for ep in interface]) self._in_ep, self._out_ep = endpoints[:2] endpoint = interface[0] self._max_packet_size = endpoint.wMaxPacketSize # Invalidate data in the readbuffer self._readoffset = 0 self._readbuffer = bytearray() # Drain input buffer self._purge_buffers() # Shallow reset if self._ctrl_transfer_out(ft232h_sio_req['reset'], ft232h_sio_args['reset']): raise IOError('Unable to reset FTDI device') # Reset feature mode self._set_bitmode(0, FT232H.BitMode.RESET) # Set latency timer self._set_latency_timer(latency) # Set chunk size and invalidate all remaining data self._writebuffer_chunksize = fifo_sizes[0] self._readoffset = 0 self._readbuffer = bytearray() self._readbuffer_chunksize = min(fifo_sizes[0], fifo_sizes[1], self._max_packet_size) # Reset feature mode self._set_bitmode(0, FT232H.BitMode.RESET) # Drain buffers self._purge_buffers() # Disable event and error characters if self._ctrl_transfer_out(ft232h_sio_req['set_event_char'], 0): raise IOError('Unable to set event char') if self._ctrl_transfer_out(ft232h_sio_req['set_error_char'], 0): raise IOError('Unable to set error char') # Enable MPSSE mode if self._ft232h_mode == 'GPIO_only': self.log(logging.DEBUG, "Setting the mode to GPIO_only") self._set_bitmode(0xFF, FT232H.BitMode.MPSSE) else: self.log(logging.DEBUG, f"Setting the mode to {self._ft232h_mode}") self._set_bitmode(self._direction, FT232H.BitMode.MPSSE) # Configure clock if self._ft232h_mode == 'I2C': # Note that bus frequency may differ from clock frequency, when # 3-phase clock is enabled self._set_frequency(3 * frequency / 2) else: self._set_frequency(frequency) # Configure pins self.log(logging.DEBUG, "Configuring the FT232H pins") if self._ft232h_mode == 'I2C': cmd = bytearray(self._idle) cmd.extend((ft232h_cmds['set_bits_high'], 0, 0)) self._write_data(cmd) elif self._ft232h_mode == 'SPI': cmd = bytearray((ft232h_cmds['set_bits_low'], self._cs_bit & 0xFF, self._direction & 0xFF)) cmd.extend((ft232h_cmds['set_bits_high'], (self._cs_bit >> 8) & 0xFF, (self._direction >> 8) & 0xFF)) self._write_data(cmd) else: cmd = bytearray((ft232h_cmds['set_bits_low'], 0, 0)) cmd.extend((ft232h_cmds['set_bits_high'], 0, 0)) self._write_data(cmd) # Disable loopback self.log(logging.DEBUG, "Disabling loopback") self._write_data(bytearray((ft232h_cmds['loopback_end'],))) # Validate MPSSE bytes_ = bytes(self._read_data_bytes(2)) if (len(bytes_) >= 2) and (bytes_[0] == '\xfa'): raise IOError("Invalid command @ %d" % bytes_[1]) # I2C-specific settings if self._ft232h_mode == 'I2C': self.log(logging.DEBUG, "Configuring I2C-specific features") self._tx_size, self._rx_size = fifo_sizes # Enable 3-phase clock self._write_data(bytearray([True and ft232h_cmds['enable_clk_3phase'] or ft232h_cmds['disable_clk_3phase']])) # Enable drivezero mode self._write_data(bytearray([ft232h_cmds['drive_zero'], self._i2c_mask & 0xFF, (self._i2c_mask >> 8) & 0xFF])) # Disable adaptative clock self._write_data(bytearray([False and ft232h_cmds['enable_clk_adaptative'] or ft232h_cmds['disable_clk_adaptative']]))
[docs] def log(self, level: int, msg: str) -> None: """Wrapper for logging messages. Also initializes the Logger on the first message. Args: level: The logging level of the message, as an :obj:`int`. msg: The message to log, as a :obj:`str`. """ if self._logger is None: self._logger = logging.getLogger(f"{current_process().name}." f"{type(self).__name__}") self._logger.log(level, msg)
@staticmethod def _compute_delay_cycles(value: float) -> int: """Approximates the number of clock cycles over a given delay. Args: value: The delay in seconds, as a :obj:`float`. Returns: The number of clock cycles, as an :obj:`int`. """ bit_delay = ft232h_mpsse_bit_delay return max(1, int((value + bit_delay) / bit_delay)) def _set_latency_timer(self, latency: int) -> None: """Sets the latency timer. Sets the latency timer, i.e. the delay the chip waits before sending the data stored in the buffer to the host. Not applicable when a send immediate command is issued, in which case the buffered data is returned immediately. Args: latency: The latency in milliseconds, as an :obj:`int`. """ self.log(logging.DEBUG, f"Setting the latency timer to {latency}") if not ft232h_latency['min'] <= latency <= ft232h_latency['max']: raise ValueError("Latency out of range") if self._ctrl_transfer_out(ft232h_sio_req['set_latency_timer'], latency): raise IOError('Unable to set latency timer') def _set_frequency(self, frequency: float) -> float: """Sets the bus frequency. Sets the FT232H clock divisor value, according to the desired bus frequency. The actual bus frequency is then as close as possible to the desired value, but may still be slightly different. Args: frequency: The desired bus frequency in Hz, as a :obj:`float`. Returns: The actual bus frequency, as a :obj:`float`. """ self.log(logging.DEBUG, f"Setting the clock frequency to {frequency}") # Calculate base speed clock divider divcode = ft232h_cmds['enable_clk_div5'] divisor = int((ft232h_clock['base'] + frequency / 2) / frequency) - 1 divisor = max(0, min(0xFFFF, divisor)) actual_freq = ft232h_clock['base'] / (divisor + 1) error = (actual_freq / frequency) - 1 # Calculate high speed clock divider divisor_hs = int((ft232h_clock['high'] + frequency / 2) / frequency) - 1 divisor_hs = max(0, min(0xFFFF, divisor_hs)) actual_freq_hs = ft232h_clock['high'] / (divisor_hs + 1) error_hs = (actual_freq_hs / frequency) - 1 # Enable if closer to desired frequency # ===================================================================== if abs(error_hs) <= abs(error): divcode = ft232h_cmds['disable_clk_div5'] divisor = divisor_hs actual_freq = actual_freq_hs # FTDI expects little endian cmd = bytearray((divcode,)) cmd.extend((ft232h_cmds['set_tck_divisor'], divisor & 0xff, (divisor >> 8) & 0xff)) cmd.extend((ft232h_cmds['send_immediate'],)) self._write_data(cmd) # validate MPSSE bytes_ = bytes(self._read_data_bytes(2)) if (len(bytes_) >= 2) and (bytes_[0] == '\xfa'): raise IOError("Invalid command @ %d" % bytes_[1]) # Drain input buffer self._purge_rx_buffer() return actual_freq def _set_bitmode(self, bitmask: int, mode: BitMode) -> None: """Sets the bitbang mode. Args: bitmask: Mask for choosing the driven GPIOs. mode: The bitbang mode to be set. """ mask = sum(FT232H.BitMode) value = (bitmask & 0xff) | ((mode.value & mask) << 8) if self._ctrl_transfer_out(ft232h_sio_req['set_bitmode'], value): raise IOError('Unable to set bitmode') def _purge_buffers(self) -> None: """Clears the buffers on the chip and the internal read buffer.""" self._purge_rx_buffer() self._purge_tx_buffer() def _purge_rx_buffer(self) -> None: """Clears the USB receive buffer on the chip (host-to-ftdi) and the internal read buffer.""" if self._ctrl_transfer_out(ft232h_sio_req['reset'], ft232h_sio_args['purge_RX']): raise IOError('Unable to flush RX buffer') # Invalidate data in the readbuffer self._readoffset = 0 self._readbuffer = bytearray() def _purge_tx_buffer(self) -> None: """Clears the USB transmit buffer on the chip (ftdi-to-host).""" if self._ctrl_transfer_out(ft232h_sio_req['reset'], ft232h_sio_args['purge_TX']): raise IOError('Unable to flush TX buffer') def _ctrl_transfer_out(self, reqtype: int, value: int, data: bytes = b'') -> int: """Sends a control message to the device. Args: reqtype: bmRequest value: wValue data: payload Returns: Number of bytes actually written """ try: self.log(logging.DEBUG, f"Sending USB control transfer with request type {Ftdi_req_out}" f", request {reqtype}, value {value}, index {self._index}, " f"data {data}") return self._usb_dev.ctrl_transfer( Ftdi_req_out, reqtype, value, self._index, bytearray(data), self._usb_write_timeout) except USBError as ex: raise IOError('UsbError: %s' % str(ex)) def _set_serial_number(self, serial_number: str) -> None: """(Over)Writes the serial number. Writes the desired serial number to the EEPROM. It is then accessible to USB commands as a string descriptor. Also sets the manufacturer and product string descriptors. Args: serial_number: Serial number to be written in the EEPROM, as a :obj:`str`. """ if not isinstance(serial_number, str): serial_number = str(serial_number) if any(char in serial_number for char in ':/'): raise ValueError("Invalid character : or / in serial number") # Reading current eeprom word_count = round(ft232h_eeprom_size / 2) word_addr = 0 data = bytearray() while word_count: try: self.log(logging.DEBUG, f"Sending USB control transfer with request type " f"{Ftdi_req_in}, request {ft232h_sio_req['read_eeprom']}, " f"value 0, index {word_addr}, data 2") buf = self._usb_dev.ctrl_transfer( Ftdi_req_in, ft232h_sio_req['read_eeprom'], 0, word_addr, 2, self._usb_read_timeout) self.log(logging.DEBUG, f"Read {buf} from the USB device") except USBError as exc: raise IOError('UsbError: %s' % exc) from exc if not buf: raise IOError('EEPROM read error @ %d' % (word_addr << 1)) data.extend(buf) word_count -= 1 word_addr += 1 new_eeprom = data[0:ft232h_eeprom_size] # Setting the has_serial flag to True new_eeprom[ft232h_eeprom['has_serial_pos']] |= 1 << 3 # Changing the string descriptors and the descriptors index str_descriptors = {'manufacturer': 'FTDI', 'product': 'FT232H', 'serial': serial_number} stream = bytearray() str_pos = ft232h_eeprom['str_position'] tbl_pos = ft232h_eeprom['str_table'] data_pos = str_pos for name in str_descriptors: new_str = str_descriptors[name].encode('utf-16le') length = len(new_str) + 2 stream.append(length) stream.append(util.DESC_TYPE_STRING) # string descriptor stream.extend(new_str) new_eeprom[tbl_pos] = data_pos tbl_pos += 1 new_eeprom[tbl_pos] = length tbl_pos += 1 data_pos += length new_eeprom[str_pos:str_pos + len(stream)] = stream # Filling the remaining space with zeros crc_pos = len(new_eeprom) rem = crc_pos - (str_pos + len(stream)) new_eeprom[str_pos + len(stream):crc_pos] = bytes(rem) # Checking the eeprom length if len(new_eeprom) != ft232h_eeprom_size: raise ValueError("Eeprom_size not matching, serial number may be " "too long, eeprom not written") # Calculating the new checksum and modifying the corresponding bytes checksum = 0xAAAA for idx in range(0, len(new_eeprom[:-2]), 2): v = ((new_eeprom[:-2][idx + 1] << 8) + new_eeprom[:-2][idx]) & 0xFFFF checksum = v ^ checksum checksum = ((checksum << 1) & 0xFFFF) | ((checksum >> 15) & 0xFFFF) new_eeprom[-2] = checksum & 0xFF new_eeprom[-1] = checksum >> 8 # Updating the eeprom addr = 0 for word in unpack('<%dH' % (len(new_eeprom) // 2), new_eeprom): self.log(logging.DEBUG, f"Sending USB control transfer with request type {Ftdi_req_out}" f", request {ft232h_sio_req['write_eeprom']}, value {word}, " f"index {addr >> 1}, data b''") out = self._usb_dev.ctrl_transfer( Ftdi_req_out, ft232h_sio_req['write_eeprom'], word, addr >> 1, b'', self._usb_write_timeout) if out: raise IOError('EEPROM Write Error @ %d' % addr) addr += 2 def _write_data(self, data: Union[bytearray, bytes]) -> int: """Writes data to the FT232H. Writes the sequence of MPSSE commands and data to the FTDI port. Data buffer is split into chunk-sized blocks before being sent over the USB bus. Args: data: The byte stream to send to the FTDI interface Returns: Count of written bytes """ offset = 0 size = len(data) try: while offset < size: write_size = self._writebuffer_chunksize if offset + write_size > size: write_size = size - offset try: self.log(logging.DEBUG, f"Sending USB write command to endpoint {self._in_ep}" f"and with data {data[offset:offset + write_size]}") length = self._usb_dev.write(self._in_ep, data[offset:offset + write_size], self._usb_write_timeout) except USBError: raise if length <= 0: raise USBError("Usb bulk write error") offset += length return offset except USBError: self.log(logging.ERROR, "An error occurred while writing to USB device") raise def _read_data_bytes(self, size: int, attempt: int = 2, request_gen: Optional[ Callable[[int], Union[bytearray, bytes]]] = None) -> bytes: """Reads data from the FT232H. Reads data from the FTDI interface. The data buffer is rebuilt from chunk-sized blocks received over the USB bus. The FTDI device always sends internal status bytes, which are stripped out as not part of the data payload. Args: size: The number of bytes to receive from the device, as an :obj:`int`. attempt: Attempt cycle count request_gen: A callable that takes the number of bytes read and expects a bytes buffer to send back to the remote device. This is only useful to perform optimized/continuous transfer from a slave device. Returns: Payload bytes """ # Packet size sanity check if not self._max_packet_size: raise ValueError("max_packet_size is bogus") packet_size = self._max_packet_size length = 1 # initial condition to enter the usb_read loop data = bytearray() # everything we want is still in the cache? if size <= len(self._readbuffer) - self._readoffset: data = self._readbuffer[self._readoffset:self._readoffset + size] self._readoffset += size return data # something still in the cache, but not enough to satisfy 'size'? if len(self._readbuffer) - self._readoffset != 0: data = self._readbuffer[self._readoffset:] # end of readbuffer reached self._readoffset = len(self._readbuffer) # read from USB, filling in the local cache as it is empty retry = attempt req_size = size try: while (len(data) < size) and (length > 0): while True: try: self.log(logging.DEBUG, f"Sending USB read command to endpoint {self._out_ep}" f"to read {self._readbuffer_chunksize} bytes") tempbuf = self._usb_dev.read(self._out_ep, self._readbuffer_chunksize, self._usb_read_timeout) self.log(logging.DEBUG, f"Read {tempbuf} from the USB device") except USBError: raise retry -= 1 length = len(tempbuf) # the received buffer contains at least one useful databyte # (first 2 bytes in each packet represent the current modem # status) if length >= 2: if tempbuf[1] & ft232h_tx_empty_bits: if request_gen: req_size -= length - 2 if req_size > 0: cmd = request_gen(req_size) if cmd: self._write_data(cmd) if length > 2: retry = attempt # skip the status bytes chunks = (length + packet_size - 1) // packet_size count = packet_size - 2 self._readbuffer = bytearray() self._readoffset = 0 srcoff = 2 for _ in range(chunks): self._readbuffer += tempbuf[srcoff:srcoff + count] srcoff += packet_size length = len(self._readbuffer) break # received buffer only contains the modem status bytes # no data received, may be late, try again if retry > 0: continue # no actual data self._readbuffer = bytearray() self._readoffset = 0 # no more data to read? return data if length > 0: # data still fits in buf? if (len(data) + length) <= size: data += self._readbuffer[self._readoffset: self._readoffset + length] self._readoffset += length # did we read exactly the right amount of bytes? if len(data) == size: return data else: # partial copy, not enough bytes in the local cache to # fulfill the request part_size = min(size - len(data), len(self._readbuffer) - self._readoffset) if part_size < 0: raise ValueError("Internal Error") data += self._readbuffer[self._readoffset: self._readoffset + part_size] self._readoffset += part_size return data except USBError: self.log(logging.ERROR, "An error occurred while writing to USB device") raise # never reached raise ValueError("Internal error") @property def _clk_hi_data_lo(self) -> Tuple[int, int, int]: """Returns the MPSSE command for driving CLK line high and SDA line low, while preserving the GPIO outputs.""" return (ft232h_cmds['set_bits_low'], ft232h_pins['SCL'] | self._gpio_low, self._i2c_dir | (self._gpio_dir & 0xFF)) @property def _clk_lo_data_input(self) -> Tuple[int, int, int]: """Returns the MPSSE command for driving CLK line low and listening to SDA line, while preserving the GPIO outputs.""" return (ft232h_cmds['set_bits_low'], self._gpio_low, ft232h_pins['SCL'] | (self._gpio_dir & 0xFF)) @property def _clk_lo_data_hi(self) -> Tuple[int, int, int]: """Returns the MPSSE command for driving CLK line low and SDA line high, while preserving the GPIO outputs.""" return (ft232h_cmds['set_bits_low'], ft232h_pins['SDAO'] | self._gpio_low, self._i2c_dir | (self._gpio_dir & 0xFF)) @property def _clk_lo_data_lo(self) -> Tuple[int, int, int]: """Returns the MPSSE command for driving CLK line low and SDA line low, while preserving the GPIO outputs.""" return (ft232h_cmds['set_bits_low'], self._gpio_low, self._i2c_dir | (self._gpio_dir & 0xFF)) @property def _idle(self) -> Tuple[int, int, int]: """Returns the MPSSE command for driving CLK line high and SDA line high, while preserving the GPIO outputs.""" return (ft232h_cmds['set_bits_low'], self._i2c_dir | self._gpio_low, self._i2c_dir | (self._gpio_dir & 0xFF)) @property def _start(self) -> Tuple[int, ...]: """Returns the MPSSE command for issuing and I2C start condition.""" return self._clk_hi_data_lo * self._ck_hd_sta + \ self._clk_lo_data_lo * self._ck_hd_sta @property def _stop(self) -> Tuple[int, ...]: """Returns the MPSSE command for issuing and I2C stop condition.""" return self._clk_lo_data_hi * self._ck_hd_sta + \ self._clk_lo_data_lo * self._ck_hd_sta + \ self._clk_hi_data_lo * self._ck_su_sto + \ self._idle * self._ck_idle def _do_prolog(self, i2caddress: int) -> None: """Sends the MPSSE commands for starting an I2C transaction. Args: i2caddress: I2C address of the slave """ if i2caddress is None: return cmd = bytearray(self._idle * self._ck_delay) cmd.extend(self._start) cmd.extend((ft232h_cmds['write_bytes_NVE_MSB'], 0, 0)) cmd.append(i2caddress) try: self._send_check_ack(cmd) except IOError: raise def _do_write(self, out: list) -> None: """Sends the MPSSE commands for writing bytes to an I2C slave. Args: out: List of bytes to write """ if not isinstance(out, bytearray): out = bytearray(out) if not out: return for byte in out: cmd = bytearray((ft232h_cmds['write_bytes_NVE_MSB'], 0, 0)) cmd.append(byte) self._send_check_ack(cmd) def _do_read(self, readlen: int) -> bytearray: """Sends the MPSSE commands for reading bytes from an I2C slave, and then returns these bytes. Args: readlen: Number of bytes to read Returns: Read bytes as a bytearray """ if not readlen: # force a real read request on device, but discard any result cmd = bytearray() cmd.extend((ft232h_cmds['send_immediate'],)) self._write_data(cmd) self._read_data_bytes(0, 8) return bytearray() ack = (ft232h_cmds['write_bits_NVE_MSB'], 0, 0) nack = (ft232h_cmds['write_bits_NVE_MSB'], 0, 0xFF) read_not_last = ((ft232h_cmds['read_bytes_PVE_MSB'], 0, 0) + ack + self._clk_lo_data_hi * self._ck_delay) read_last = ((ft232h_cmds['read_bytes_PVE_MSB'], 0, 0) + nack + self._clk_lo_data_hi * self._ck_delay) # maximum RX size to fit in FTDI FIFO, minus 2 status bytes chunk_size = self._rx_size - 2 cmd_size = len(read_last) # limit RX chunk size to the count of I2C packable commands in the FTDI # TX FIFO (minus one byte for the last 'send immediate' command) tx_count = (self._tx_size - 1) // cmd_size chunk_size = min(tx_count, chunk_size) chunks = [] rem = readlen if rem > chunk_size: chunk_size //= 2 cmd_chunk = bytearray() cmd_chunk.extend(read_not_last * chunk_size) cmd_chunk.extend((ft232h_cmds['send_immediate'],)) def _write_command_gen(length: int) -> Union[bytearray, bytes]: if length <= 0: # no more data return b'' if length <= chunk_size: cmd_ = bytearray() cmd_.extend(read_not_last * (length - 1)) cmd_.extend(read_last) cmd_.extend((ft232h_cmds['send_immediate'],)) return cmd_ return cmd_chunk while rem: buf = self._read_data_bytes(rem, self._nb_attempt_1, _write_command_gen) chunks.append(buf) rem -= len(buf) else: cmd = bytearray() cmd.extend(read_not_last * (rem - 1)) cmd.extend(read_last) cmd.extend((ft232h_cmds['send_immediate'],)) size = rem self._write_data(cmd) buf = self._read_data_bytes(size, self._nb_attempt_2) chunks.append(buf) return bytearray(b''.join(chunks)) def _send_check_ack(self, cmd: bytearray) -> None: """Actually sends the MPSSE commands generated by :meth:`_do_prolog` and :meth:`_do_write` methods, and checks whether the slave ACKs it. Args: cmd: The MPSSE commands to send """ # SCL low, SDA high-Z cmd.extend(self._clk_lo_data_hi) # read SDA (ack from slave) cmd.extend((ft232h_cmds['read_bits_PVE_MSB'], 0)) cmd.extend((ft232h_cmds['send_immediate'],)) self._write_data(cmd) ack = self._read_data_bytes(1, 8) if not ack: raise IOError('No answer from FTDI') if ack[0] & 0x01: raise IOError('NACK from slave') def _write_i2c(self, address: int, out: list, stop: bool = True) -> None: """Writes bytes to an I2C slave. Args: address: I2C address of the slave out: List of bytes to send stop: Should the stop condition be sent at the end of the message ? """ i2caddress = (address << 1) & 0xFF retries = self._retry_count while True: try: self._do_prolog(i2caddress) self._do_write(out) return except IOError: retries -= 1 if not retries: raise finally: if stop: self._write_data(bytearray(self._stop)) def _read_i2c(self, address: int, length: int, stop: bool = True) -> bytearray: """Reads bytes from an I2C slave. Args: address: I2C address of the slave length: Number of bytes to read stop: Should the stop condition be sent at the end of the message ? """ i2caddress = (address << 1) & 0xFF retries = self._retry_count while True: try: self._do_prolog(i2caddress | 0x01) data = self._do_read(length) if len(data) < length: raise IOError return data except (IOError, OSError): retries -= 1 if not retries: raise finally: if stop: self._write_data(bytearray(self._stop)) def _exchange_i2c(self, address: int, out: list, readlen: int = 0) -> bytearray: """Writes bytes to an I2C slave, and then reads a given number of bytes from this same slave. Args: address: I2C address of the slave out: List of bytes to send readlen: Number of bytes to read Returns: Read bytes as a bytearray """ if readlen < 1: raise IOError('Nothing to read') if readlen > (ft232h_max_payload / 3 - 1): raise IOError("Input payload is too large") if address is None: i2caddress = None else: i2caddress = (address << 1) & 0xFF retries = self._retry_count while True: try: self._do_prolog(i2caddress) self._do_write(out) self._do_prolog(i2caddress | 0x01) data = self._do_read(readlen) if len(data) < readlen: raise IOError return data except (IOError, OSError): retries -= 1 if not retries: raise finally: self._write_data(bytearray(self._stop))
[docs] def write_byte(self, i2c_addr: int, value: int) -> None: """Writes a single byte to an I2C slave, in register 0. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. value: The value to write, as an :obj:`int`. """ self.log(logging.DEBUG, f"Requested I2C byte write with value {value} to " f"address {i2c_addr}") self.write_i2c_block_data(i2c_addr=i2c_addr, register=0x00, data=[value & 0xFF])
[docs] def write_byte_data(self, i2c_addr: int, register: int, value: int) -> None: """Writes a single byte to an I2C slave, in the specified register. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the register to be written, as an :obj:`int`. value: The value to write, as an :obj:`int`. """ self.log(logging.DEBUG, f"Requested I2C byte write with value {value} to " f"register {register} at address {i2c_addr}") self.write_i2c_block_data(i2c_addr=i2c_addr, register=register, data=[value & 0xFF])
[docs] def write_word_data(self, i2c_addr: int, register: int, value: int) -> None: """Writes 2 bytes to an I2C slave from a single int value, starting at the specified register. Depending on the size of the registers, the next register may be written as well. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the first register to be written, as an :obj:`int`. value: The value to write, as an :obj:`int`. """ self.log(logging.DEBUG, f"Requested I2C word write with value {value} to " f"register {register} at address {i2c_addr}") self.write_i2c_block_data(i2c_addr=i2c_addr, register=register, data=[(value >> 8) & 0xFF, value & 0xFF])
[docs] def write_block_data(self, i2c_addr: int, register: int, data: list) -> None: """Actually calls :meth:`write_i2c_block_data`. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the first register to be written, as an :obj:`int`. data: A :obj:`list` of bytes to write. """ self.log(logging.DEBUG, f"Requested I2C block write with data {data} to " f"register {register} at address {i2c_addr}") self.write_i2c_block_data(i2c_addr=i2c_addr, register=register, data=data)
[docs] def write_i2c_block_data(self, i2c_addr: int, register: int, data: list) -> None: """Writes bytes from a :obj:`list` to an I2C slave, starting at the specified register. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the first register to be written, as an :obj:`int`. data: A :obj:`list` of bytes to write. """ self.log(logging.DEBUG, f"Requested I2C block write with data {data} to " f"register {register} at address {i2c_addr}") if self._ft232h_mode != 'I2C': raise ValueError("Method only available in I2C mode") if not 0 <= i2c_addr <= 127: raise ValueError("Incorrect i2c address, should be between 0 and 127") self._write_i2c(address=i2c_addr, out=[register] + data)
[docs] def read_byte(self, i2c_addr: int) -> int: """Reads a single byte from an I2C slave, from the register `0`. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. Returns: Value of the read register """ self.log(logging.DEBUG, f"Requested I2C byte read at address {i2c_addr}") try: return self.read_i2c_block_data(i2c_addr=i2c_addr, register=0x00, length=1)[0] except IndexError: self.log(logging.ERROR, "No data to read from USB device") raise
[docs] def read_byte_data(self, i2c_addr: int, register: int) -> int: """Reads a single byte from an I2C slave, from the specified register. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the register to be read, as an :obj:`int`. Returns: Value of the read register """ self.log(logging.DEBUG, f"Requested I2C byte read from register {register}" f" at address {i2c_addr}") try: return self.read_i2c_block_data(i2c_addr=i2c_addr, register=register, length=1)[0] except IndexError: self.log(logging.ERROR, "No data to read from USB device") raise
[docs] def read_word_data(self, i2c_addr: int, register: int) -> int: """Reads 2 bytes from an I2C slave, starting at the specified register, and returns them as one single value. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the first register to be read, as an :obj:`int`. Returns: Value of the read registers """ self.log(logging.DEBUG, f"Requested I2C word read from register {register}" f" at address {i2c_addr}") try: ret = self.read_i2c_block_data(i2c_addr=i2c_addr, register=register, length=2) return (ret[0] << 8) & ret[1] except IndexError: self.log(logging.ERROR, "Not enough data to read from USB device") raise
[docs] def read_i2c_block_data(self, i2c_addr: int, register: int, length: int) -> List[int]: """Reads a given number of bytes from an I2C slave, starting at the specified register. Args: i2c_addr: The I2C address of the slave, as an :obj:`int`. register: The index of the first register to be read, as an :obj:`int`. length: The number of bytes to read, as an :obj:`int`. Returns: Values of read registers as a :obj:`list` """ self.log(logging.DEBUG, f"Requested I2C block read of length {length} from" f" register {register} at address {i2c_addr}") if self._ft232h_mode != 'I2C': raise ValueError("Method only available in I2C mode") if not 0 <= i2c_addr <= 127: raise ValueError("Incorrect i2c address, should be between 0 and 127") if not length >= 0: raise ValueError("length should be a positive integer") if length == 0: return [] retries = 2 while True: try: ret = [byte for byte in self._exchange_i2c(address=i2c_addr, out=[register], readlen=length)] return ret except (IOError, OSError): retries -= 1 if not retries: raise
[docs] def i2c_rdwr(self, *i2c_msgs: I2CMessage) -> None: """Exchanges messages with a slave that doesn't feature registers. A start condition is sent at the beginning of each transaction, but only one stop condition is sent after the last transaction. Args: *i2c_msgs: One or several :class:`~crappy.tool.ft232h.I2CMessage` to exchange with the slave. They are either read or write messages. """ self.log(logging.DEBUG, "Requested I2C readwrite") nr = len(i2c_msgs) for i, msg in enumerate(i2c_msgs): if msg.type == 'w': self._write_i2c(address=msg.addr, out=msg.buf, stop=(i == nr)) elif msg.type == 'r': msg.buf = [byte for byte in self._read_i2c(address=msg.addr, length=msg.len, stop=(i == nr))]
@property def bits_per_word(self) -> int: """Number of bits per SPI words. Can only be set to 8. """ if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._bits_per_word @bits_per_word.setter def bits_per_word(self, value: int) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, int): raise TypeError("bits_per_word should be an integer") if value != 8: raise ValueError("bits_per_word values other than 8 are not implemented") self.log(logging.DEBUG, f"Set SPI bits_per_word to {value}") self._bits_per_word = value @property def cshigh(self) -> bool: """If :obj:`True`, the polarity of the CS line is inverted.""" if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._cshigh @cshigh.setter def cshigh(self, value: bool) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, bool): raise TypeError("cshigh should be either True or False") self._spi_param_changed = True self.log(logging.DEBUG, f"Set SPI cshigh to {value}") self._cshigh = value @property def loop(self) -> bool: """If :obj:`True`, the loopback mode is enabled.""" if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._loop @loop.setter def loop(self, value: bool) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, bool): raise TypeError("loop should be either True or False") if value: self._write_data(bytearray((ft232h_cmds['loopback_start'],))) else: self._write_data(bytearray((ft232h_cmds['loopback_end'],))) self.log(logging.DEBUG, f"Set SPI loop to {value}") self._loop = value @property def no_cs(self) -> bool: """If :obj:`True`, the CS line is not driven.""" if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._no_cs @no_cs.setter def no_cs(self, value: bool) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, bool): raise TypeError("no_cs should be either True or False") self.log(logging.DEBUG, f"Set SPI no_cs to {value}") self._no_cs = value @property def lsbfirst(self) -> bool: """If :obj:`True`, data is sent and received LSB first.""" if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._lsbfirst @lsbfirst.setter def lsbfirst(self, value: bool) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, bool): raise TypeError("lsbfirst should be either True or False") self.log(logging.DEBUG, f"Set SPI lsbfirst to {value}") self._lsbfirst = value @property def max_speed_hz(self) -> float: """The SPI bus clock frequency in Hz. In SPI modes `1` and `3`, the actual bus clock frequency is 50% higher than the input value because the FT232H is switched to 3-phase clock mode. """ if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._max_speed_hz @max_speed_hz.setter def max_speed_hz(self, value: float) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if self.mode in [1, 3]: if not 3200 <= value <= 2 * ft232h_clock['high'] / 3: raise ValueError("max_speed_hz should be between 3.2kHz and 20MHz in " "spi modes 1 and 3") else: if not 4800 <= value <= ft232h_clock['high']: raise ValueError("max_speed_hz should be between 4.8kHz and 30MHz in " "spi modes 0 and 2") self._spi_param_changed = True if self.mode in [1, 3]: self._set_frequency(3 * value / 2) self._write_data(bytearray([True and ft232h_cmds['enable_clk_3phase'] or ft232h_cmds['disable_clk_3phase']])) else: self._set_frequency(value) self._write_data(bytearray([False and ft232h_cmds['enable_clk_3phase'] or ft232h_cmds['disable_clk_3phase']])) self.log(logging.DEBUG, f"Set SPI max_speed_hz to {value}") self._max_speed_hz = value @property def mode(self) -> int: """The SPI mode used for communicating with the slave. When changing the SPI mode, the bus clock frequency may be reloaded. """ if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._mode @mode.setter def mode(self, value: int) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if value not in range(4): raise ValueError("mode should be an integer between 0 and 3") former_mode = self.mode self.log(logging.DEBUG, f"Set SPI mode to {value}") self._mode = value self._spi_param_changed = True if value % 2 != former_mode % 2: self.max_speed_hz = self.max_speed_hz @property def threewire(self) -> bool: """If :obj:`True`, indicates that the MISO and MOSI lines are connected together. Not currently implemented.""" if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") return self._threewire @threewire.setter def threewire(self, value: bool) -> None: if self._ft232h_mode != 'SPI': raise ValueError("Attribute only available in SPI mode") if not isinstance(value, bool): raise TypeError("threewire should be either True or False") if value: raise ValueError("threewire mode not implemented") self.log(logging.DEBUG, f"Set SPI threewire to {value}") self._threewire = value def _exchange_spi(self, readlen: int, out: list, start: bool, stop: bool, duplex: bool) -> bytes: """Exchanges bytes with an SPI slave. Can read and/or write data, in a sequential or simultaneous way. Also manages the CS line. Args: readlen: The umber of bytes to read, as an :obj:`int`. If 0, no reading is performed. out: A :obj:`list` of bytes to write. If empty, no writing is performed. start: If :obj:`False`, the CS line is not driven before exchanging data, and remains in its previous state. stop: If :obj:`False`, the CS line is not driven after exchanging data, and remains in its previous state. duplex: If :obj:`True`, the data is read and written simultaneously. If :obj:`False`, writes then reads in a sequential way. Returns: Read data as bytes """ if len(out) > ft232h_max_payload: raise IOError("Output payload is too large") if readlen > ft232h_max_payload: raise IOError("Input payload is too large") if not isinstance(out, bytearray): out = bytearray(out) # Re-building the _cs_prolog and _cs_epilog if an SPI parameter has been # modified if self._spi_param_changed: cs_hold = 1 + int(1E6 / self.max_speed_hz) self._cpol = self.mode & 0x2 cs_clock = 0xFFFF & ~((~self._cpol & ft232h_pins['SCK']) | ft232h_pins['DO'] | (self.cshigh and self._cs_bit)) cs_select = 0xFFFF & ~((~self._cpol & ft232h_pins['SCK']) | ft232h_pins['DO'] | ((not self.cshigh) and self._cs_bit)) self._cs_prolog = [cs_clock, cs_select] self._cs_epilog = [cs_select] + [cs_clock] * cs_hold self._spi_param_changed = False # Building the prolog MPSSE command cmd = bytearray() if start: for ctrl in self._cs_prolog: ctrl &= self._spi_mask ctrl |= self._gpio_low ctrl |= self._gpio_high << 8 cmd.extend((ft232h_cmds['set_bits_low'], ctrl & 0xFF, self._direction & 0xFF)) # Building the epilog MPSSE command epilog = bytearray() for ctrl in self._cs_epilog: ctrl &= self._spi_mask ctrl |= self._gpio_low ctrl |= self._gpio_high << 8 epilog.extend((ft232h_cmds['set_bits_low'], ctrl & 0xFF, self._direction & 0xFF)) # Restore idle state if not self.cshigh: cs_high = [ft232h_cmds['set_bits_low'], self._cs_bit | self._gpio_low & 0xFF, self._direction & 0xFF] else: cs_high = [ft232h_cmds['set_bits_low'], self._gpio_low & 0xFF, self._direction & 0xFF] if not self._turbo: cs_high.append(ft232h_cmds['send_immediate']) epilog.extend(cs_high) # Sequential communication if not duplex: # Write MPSSE commands writelen = len(out) if writelen: if not self.lsbfirst: wcmd = (ft232h_cmds['write_bytes_NVE_MSB'] if not self._cpol else ft232h_cmds['write_bytes_PVE_MSB']) else: wcmd = (ft232h_cmds['write_bytes_NVE_LSB'] if not self._cpol else ft232h_cmds['write_bytes_PVE_LSB']) write_cmd = pack('<BH', wcmd, writelen - 1) cmd.extend(write_cmd) cmd.extend(out) # Read MPSSE commands if readlen: if not self.lsbfirst: rcmd = (ft232h_cmds['read_bytes_NVE_MSB'] if not self._cpol else ft232h_cmds['read_bytes_PVE_MSB']) else: rcmd = (ft232h_cmds['read_bytes_NVE_LSB'] if not self._cpol else ft232h_cmds['read_bytes_PVE_LSB']) read_cmd = pack('<BH', rcmd, readlen - 1) cmd.extend(read_cmd) # ==================================================================== if self._turbo: cmd.extend((ft232h_cmds['send_immediate'],)) if self._turbo: if stop: cmd.extend(epilog) self._write_data(cmd) else: self._write_data(cmd) if stop: self._write_data(epilog) # USB read cycle may occur before the FTDI device has actually # sent the data, so try to read more than once if no data is # actually received data = self._read_data_bytes(readlen, 8) # If nothing to read else: if writelen: if self._turbo: if stop: cmd.extend(epilog) self._write_data(cmd) else: self._write_data(cmd) if stop: self._write_data(epilog) data = bytearray() # Simultaneous communication else: if readlen > len(out): tmp = bytearray(out) tmp.extend([0] * (readlen - len(out))) out = tmp exlen = len(out) if not self.lsbfirst: wcmd = (ft232h_cmds['rw_bytes_PVE_NVE_MSB'] if not self._cpol else ft232h_cmds['rw_bytes_NVE_PVE_MSB']) else: wcmd = (ft232h_cmds['rw_bytes_PVE_NVE_LSB'] if not self._cpol else ft232h_cmds['rw_bytes_NVE_PVE_LSB']) write_cmd = pack('<BH', wcmd, exlen - 1) cmd.extend(write_cmd) cmd.extend(out) # ====================================================================== if self._turbo: cmd.extend((ft232h_cmds['send_immediate'],)) if self._turbo: if stop: cmd.extend(epilog) self._write_data(cmd) else: self._write_data(cmd) if stop: self._write_data(epilog) # USB read cycle may occur before the FTDI device has actually # sent the data, so try to read more than once if no data is # actually received data = self._read_data_bytes(exlen, 8) return data
[docs] def readbytes(self, len: int, start: bool = True, stop: bool = True) -> List[int]: """Reads the specified number of bytes from an SPI slave. Args: len: The number of bytes to read, as an :obj:`int`. start: If :obj:`False`, the CS line is not driven before reading data, and remains in its previous state. stop: If :obj:`False`, the CS line is not driven after reading data, and remains in its previous state. Returns: List of read bytes """ self.log(logging.DEBUG, f"Requested SPI bytes read of length {len}") if self._ft232h_mode != 'SPI': raise ValueError("Method only available in SPI mode") return [byte for byte in self._exchange_spi(readlen=len, out=[], start=start, stop=stop, duplex=False)]
[docs] def writebytes(self, values: list, start: bool = True, stop: bool = True) -> None: """Write bytes from a list to an SPI slave. Args: values: A :obj:list` of bytes to write start: If :obj:`False`, the CS line is not driven before reading data, and remains in its previous state. stop: If :obj:`False`, the CS line is not driven after reading data, and remains in its previous state. """ self.log(logging.DEBUG, f"Requested SPI bytes write with values {values}") if self._ft232h_mode != 'SPI': raise ValueError("Method only available in SPI mode") self._exchange_spi(readlen=0, out=values, start=start, stop=stop, duplex=False)
[docs] def writebytes2(self, values: list, start: bool = True, stop: bool = True) -> None: """Actually calls the :meth:`writebytes` method with the same arguments.""" self.log(logging.DEBUG, f"Requested SPI bytes write with values {values}") self.writebytes(values=values, start=start, stop=stop)
[docs] def xfer(self, values: list, speed: Optional[float] = None, delay: float = 0.0, bits: int = 8, start: bool = True, stop: bool = True) -> List[int]: """Simultaneously reads and write bytes to an SPI slave. The number of bytes to read is equal to the number of bytes in the write buffer. Args: values: A :obj:list` of bytes to write. speed: Sets the bus clock frequency in Hz before issuing the command, as a :obj:`float`. delay: Not implemented, should be 0.0 bits: Not implemented, should be 8 start: If :obj:`False`, the CS line is not driven before reading data, and remains in its previous state. stop: If :obj:`False`, the CS line is not driven after reading data, and remains in its previous state. Returns: List of read bytes """ self.log(logging.DEBUG, f"Requested SPI xfer with values {values}") if self._ft232h_mode != 'SPI': raise ValueError("Method only available in SPI mode") if bits != 8: raise ValueError("Only bits=8 is implemented") if delay != 0.0: raise ValueError("The delay parameter is not currently implemented") if speed != self.max_speed_hz and speed is not None: self.max_speed_hz = speed return [byte for byte in self._exchange_spi(readlen=len(values), out=values, start=start, stop=stop, duplex=True)]
[docs] def xfer2(self, values: list, speed: float = 6.0E6, delay: float = 0.0, bits: int = 8, start: bool = True, stop: bool = True) -> List[int]: """Actually calls the :meth:`xfer` method with the same arguments.""" self.log(logging.DEBUG, f"Requested SPI xfer with values {values}") return self.xfer(values=values, speed=speed, delay=delay, bits=bits, start=start, stop=stop)
[docs] def xfer3(self, values: list, speed: float = 6.0E6, delay: float = 0.0, bits: int = 8, start: bool = True, stop: bool = True) -> List[int]: """Actually calls the :meth:`xfer` method with the same arguments.""" self.log(logging.DEBUG, f"Requested SPI xfer with values {values}") return self.xfer(values=values, speed=speed, delay=delay, bits=bits, start=start, stop=stop)
@property def _gpio_all_pins(self) -> int: """Reports the addressable GPIOs as a bitfield. A :obj:`True` bit represents a pin which may be used as a GPIO, a :obj:`False` bit a reserved pin. Returns: Bitfield of configurable GPIO pins """ mask = (1 << ft232h_port_width) - 1 if self._ft232h_mode == 'I2C': return mask & ~self._i2c_mask elif self._ft232h_mode == 'SPI': return mask & ~self._spi_mask else: return mask @property def _direction(self) -> int: """Provides the FTDI pin direction. A :obj:`True` bit represents an output pin, a :obj:`False` bit an input pin. Returns: Bitfield of pins direction. """ if self._ft232h_mode == 'I2C': return self._i2c_dir | self._gpio_dir elif self._ft232h_mode == 'SPI': no_cs_mask = 0xFFFF - (self._cs_bit if self.no_cs else 0) return self._spi_dir | self._gpio_dir & no_cs_mask else: return self._gpio_dir def _read_gpio_raw(self) -> int: """Sends the MPSSE commands for reading all the FT232H pins, and returns the bitmap of read values. Values are determined using 3.3V logic. Returns: Bitmap of pins values """ cmd = bytes([ft232h_cmds['get_bits_low'], ft232h_cmds['get_bits_high'], ft232h_cmds['send_immediate']]) fmt = '<H' self._write_data(cmd) size = calcsize(fmt) data = self._read_data_bytes(size, 8) if len(data) != size: raise IOError('Cannot read GPIO') value, = unpack(fmt, data) return value
[docs] def get_gpio(self, gpio_str: str) -> bool: """Reads the 3.3V-logic voltage value of the specified pin. Args: gpio_str: The name of the GPIO to be read, as a :obj:`str`. Returns: 3.3V-logic value corresponding to the input voltage """ self.log(logging.DEBUG, f"Requested GPIO value reading for {gpio_str}") if gpio_str not in ft232h_pin_nr: raise ValueError("gpio_id should be in {}".format( list(ft232h_pin_nr.values()))) gpio_bit = ft232h_pin_nr[gpio_str] if not self._gpio_all_pins & gpio_bit: raise ValueError("Cannot use pin {} as a GPIO".format(gpio_str)) # Changing the _direction and _gpio_dir bitfields if self._direction & gpio_bit: self._gpio_dir &= 0xFFFF - gpio_bit return bool(self._read_gpio_raw() & gpio_bit)
[docs] def set_gpio(self, gpio_str: str, value: int) -> None: """Sets the specified GPIO as an output and sets its output value. Args: gpio_str: The name of the GPIO to be set, as a :obj:`str`. value: 1 for setting the GPIO high, 0 for setting it low. """ self.log(logging.DEBUG, f"Requested GPIO value writing to {value} " f"for {gpio_str}") if value not in [0, 1]: raise ValueError("value should be either 0 or 1") if gpio_str not in ft232h_pin_nr: raise ValueError("gpio_id should be in {}".format( list(ft232h_pin_nr.values()))) gpio_bit = ft232h_pin_nr[gpio_str] if not self._gpio_all_pins & gpio_bit: raise ValueError("Cannot use pin {} as a GPIO".format(gpio_str)) # Changing the _direction and _gpio_dir bitfields if not (self._direction & gpio_bit): self._gpio_dir |= gpio_bit data = self._read_gpio_raw() if value == 1: data |= gpio_bit else: data &= 0xFFFF - gpio_bit low_data = data & 0xFF low_dir = self._direction & 0xFF high_data = (data >> 8) & 0xFF high_dir = (self._direction >> 8) & 0xFF cmd = bytes([ft232h_cmds['set_bits_low'], low_data, low_dir, ft232h_cmds['set_bits_high'], high_data, high_dir]) self._write_data(cmd) self._gpio_low = low_data & self._gpio_all_pins self._gpio_high = high_data & self._gpio_all_pins
[docs] def close(self) -> None: """Closes the FTDI interface/port.""" if self._usb_dev: self.log(logging.INFO, "Closing the USB connection to the FT232H") if bool(self._usb_dev._ctx.handle): try: self._set_bitmode(0, FT232H.BitMode.RESET) util.release_interface(self._usb_dev, self._index - 1) except (IOError, ValueError, USBError): pass try: self._usb_dev.attach_kernel_driver(self._index - 1) except (NotImplementedError, USBError): pass self.log(logging.INFO, "Releasing the USB resources") util.dispose_resources(self._usb_dev) self._usb_dev = None