# coding: utf-8
from collections import namedtuple
from struct import unpack
from typing import Union, List, Tuple, Optional, Callable
from _io import FileIO
from multiprocessing.synchronize import RLock
from multiprocessing.sharedctypes import Synchronized
from time import time, sleep
import logging
from contextlib import contextmanager
import signal
from .ft232h import FT232H
from ..._global import OptionalModule
try:
from usb import util
from usb.core import find, USBError
Ftdi_req_out = util.build_request_type(util.CTRL_OUT,
util.CTRL_TYPE_VENDOR,
util.CTRL_RECIPIENT_DEVICE)
Ftdi_req_in = util.build_request_type(util.CTRL_IN,
util.CTRL_TYPE_VENDOR,
util.CTRL_RECIPIENT_DEVICE)
except (ModuleNotFoundError, ImportError):
util = OptionalModule("pyusb")
find = OptionalModule("pyusb")
USBError = OptionalModule("pyusb")
Ftdi_req_out = 0x40
Ftdi_req_in = 0xC0
ft232h_cmds = {'write_bytes_PVE_MSB': 0x10,
'write_bytes_NVE_MSB': 0x11,
'write_bits_NVE_MSB': 0x13,
'write_bytes_PVE_LSB': 0x18,
'write_bytes_NVE_LSB': 0x19,
'read_bytes_PVE_MSB': 0x20,
'read_bits_PVE_MSB': 0x22,
'read_bytes_NVE_MSB': 0x24,
'read_bytes_PVE_LSB': 0x28,
'read_bytes_NVE_LSB': 0x2C,
'rw_bytes_PVE_NVE_MSB': 0x31,
'rw_bytes_NVE_PVE_MSB': 0x34,
'rw_bytes_PVE_NVE_LSB': 0x39,
'rw_bytes_NVE_PVE_LSB': 0x3C,
'set_bits_low': 0x80,
'get_bits_low': 0x81,
'set_bits_high': 0x82,
'get_bits_high': 0x83,
'loopback_start': 0x84,
'loopback_end': 0x85,
'set_tck_divisor': 0x86,
'send_immediate': 0x87,
'disable_clk_div5': 0x8A,
'enable_clk_div5': 0x8B,
'enable_clk_3phase': 0x8C,
'disable_clk_3phase': 0x8D,
'enable_clk_adaptative': 0x96,
'disable_clk_adaptative': 0x97,
'drive_zero': 0x9E}
ft232h_sio_req = {'reset': 0x00,
'set_event_char': 0x06,
'set_error_char': 0x07,
'set_latency_timer': 0x09,
'set_bitmode': 0x0B,
'read_eeprom': 0x90,
'write_eeprom': 0x91}
ft232h_sio_args = {'reset': 0,
'purge_RX': 1,
'purge_TX': 2}
ft232h_eeprom_size = 256
ft232h_tx_empty_bits = 0x60
ft232h_eeprom = {'has_serial_pos': 0x0A,
'str_table': 0x0E,
'str_position': 0xA0}
ft232h_pins = {'SCL': 0x01,
'SDAO': 0x02,
'SDAI': 0x04,
'SCL_FB': 0x80,
'SCK': 0x01,
'DO': 0x02,
'DI': 0x04,
'CS': 0x08}
ft232h_i2c_timings = namedtuple('I2CTimings',
't_hd_sta t_su_sta t_su_sto t_buf')
ft232h_i2c_speed = {100E3: ft232h_i2c_timings(4.0E-6, 4.7E-6, 4.0E-6, 4.7E-6),
400E3: ft232h_i2c_timings(0.6E-6, 0.6E-6, 0.6E-6, 1.3E-6),
1E6: ft232h_i2c_timings(0.26E-6, 0.26E-6, 0.26E-6, 0.5E-6)}
class DelayedKeyboardInterrupt:
"""This class implements a context manager for temporarily disabling the
:exc:`KeyboardInterrupt` and storing any exception received in the meantime.
It is meant to avoid having a I2C or SPI communication interrupted, which
could cause devices to bug and not be able to properly finish.
.. versionadded:: 2.0.0
"""
def __enter__(self) -> None:
"""Enters the context and sets :meth:`_handler` as the new handler for
SIGINT signals."""
self._signal_received = None
self._prev_handler = signal.signal(signal.SIGINT, self._handler)
def __exit__(self, _, __, ___) -> None:
"""Exits the context, sets the previous handler back, and handles any
SIGINT signal received while in the context."""
signal.signal(signal.SIGINT, self._prev_handler)
if self._signal_received is not None:
self._prev_handler(*self._signal_received)
def _handler(self, sig, frame) -> None:
"""Handler that just stores the received SIGINT while in the context."""
self._signal_received = (sig, frame)
[docs]
class FT232HServer(FT232H):
"""A class for controlling FTDI's USB to Serial FT232H.
This class is very similar to the :class:`~crappy.tool.ft232h.FT232H` except
it doesn't directly instantiate the USB device nor send commands to it
directly. Instead, the commands are sent to a
:class:`~crappy.tool.ft232h.USBServer` managing communication with the FT232H
device(s).
Communication in SPI and I2C are implemented, along with GPIO control. The
name of the methods for SPI and I2C communication are those of :mod:`smbus`
and :mod:`spidev` libraries, in order to facilitate the use and the
integration in a multi-backend environment. This class also allows to write a
USB serial number in the EEPROM, as there's no default serial number on the
chip.
Note:
The FT232H does not support clock stretching and this may cause bugs with
some I2C devices. Lowering the ``i2c_speed`` may solve the problem.
Important:
If using Adafruit's board, its `I2C Mode` switch should of course be set to
the correct value according to the chosen mode.
Important:
**Only for Linux users:** In order to drive the FT232H, the appropriate
udev rule should be set. This can be done using the `udev_rule_setter`
utility in ``crappy``'s `util` folder. It is also possible to add it
manually by running:
::
$ echo "SUBSYSTEM==\\"usb\\", ATTR{idVendor}==\\"0403\\", \
MODE=\\"0666\\\"" | sudo tee ftdi.rules > /dev/null 2>&1
in a shell opened in ``/etc/udev/rules.d``.
Important:
For controlling several FT232H from the same computer, it is first
necessary to set their USB serial numbers. Otherwise, an error will be
raised. This can be done using the crappy utility
``set_ft232h_serial_nr.py``.
.. versionadded:: 1.5.10
.. versionchanged:: 2.0.0 renamed from *ft232h_server* to *FT232HServer*
"""
[docs]
def __init__(self,
mode: str,
block_index: int,
current_block: Synchronized,
command_file: FileIO,
answer_file: FileIO,
block_lock: RLock,
shared_lock: RLock,
serial_nr: Optional[str] = None,
i2c_speed: float = 100E3,
spi_turbo: bool = False) -> None:
"""Checks the argument validity and initializes the device.
Args:
mode: The communication mode as a :obj:`str`, can be :
::
'SPI', 'I2C', 'GPIO_only', 'Write_serial_nr'
GPIOs can be driven in any mode, but faster speeds are achievable in
`GPIO_only` mode.
block_index: The index the :class:`~crappy.blocks.Block` driving this
FT232HServer instance has been assigned by the
:class:`~crappy.tool.ft232h.USBServer`, as an :obj:`int`.
.. versionchanged:: 2.0.0 renamed from *block_number* to *block_index*
current_block: The handle to a shared :obj:`multiprocessing.Value`
indicating which :class:`~crappy.blocks.Block` can currently
communicate with the :class:`~crappy.tool.ft232h.USBServer`.
.. versionchanged:: 2.0.0
renamed from *current_file* to *current_block*
command_file: A file in which the current command to be executed by the
USB server is written.
answer_file: A file in which the answer to the current command is
written.
block_lock: A :obj:`multiprocessing.Lock` assigned to this
:class:`~crappy.blocks.Block` only, for signaling the
:class:`~crappy.tool.ft232h.USBServer` when the command has been
written in the command_file.
shared_lock: A :obj:`multiprocessing.Lock` common to all the
:class:`~crappy.blocks.Block` that allows the one Block holding it to
communicate with the :class:`~crappy.tool.ft232h.USBServer`.
.. versionchanged:: 2.0.0 renamed from *current_lock* to *shared_lock*
serial_nr: The serial number of the FT232H to drive, as a :obj:`str`. In
`Write_serial_nr` mode, the serial number to be written.
i2c_speed: In I2C mode, the I2C bus clock frequency in Hz. Available
values are :
::
100E3, 400E3, 1E6
or any value between `10kHz` and `100kHz`. Lowering below the default
value may solve I2C clock stretching issues on some devices.
spi_turbo: Increases the achievable bus speed in SPI mode, but may not
work with some devices.
Note:
- **CS pin**:
The CS pin for selecting SPI devices is always `D3`. This pin is
reserved and cannot be used as a GPIO. If you want to drive the CS line
manually, it is possible not to drive the CS pin by setting the SPI
parameter :attr:`no_cs` to :obj:`True` and to drive the CS line from a
GPIO instead.
- ``mode``:
It is not possible to simultaneously control slaves over SPI and I2C,
due to different hardware requirements for the two protocols. Trying to
do so will most likely raise an error or lead to inconsistent behavior.
"""
self._block_index = block_index
self._current_block = current_block
self._command_file = command_file
self._answer_file = answer_file
self._block_lock = block_lock
self._shared_lock = shared_lock
super().__init__(mode=mode, serial_nr=serial_nr, i2c_speed=i2c_speed,
spi_turbo=spi_turbo)
self._nb_attempt_1 = 80
self._nb_attempt_2 = 2
def _handle_command(self, command: list) -> bytes:
"""Parses the command and sends it to the device.
Args:
command: The :obj:`list` containing the command type and the arguments if
any.
Returns:
The index of the command.
"""
# Control transfer out
if command[0] == 'ctrl_transfer_out':
value = b','.join((b'00', str(command[1]).encode(),
str(command[2]).encode(), str(command[3]).encode(),
str(command[4]).encode(), bytes(command[5]),
str(command[6]).encode()))
cmd = b'00'
# Control transfer in
elif command[0] == 'ctrl_transfer_in':
value = b','.join((b'01', str(command[1]).encode(),
str(command[2]).encode(), str(command[3]).encode(),
str(command[4]).encode(), str(command[5]).encode(),
str(command[6]).encode()))
cmd = b'01'
# Write operation
elif command[0] == 'write':
value = b','.join((b'02', str(command[1]).encode(), bytes(command[2]),
str(command[3]).encode()))
cmd = b'02'
# Read operation
elif command[0] == 'read':
value = b','.join((b'03', str(command[1]).encode(),
str(command[2]).encode(), str(command[3]).encode()))
cmd = b'03'
# Checks whether the kernel driver is active
# It doesn't actually interact with the device
elif command[0] == 'is_kernel_driver_active':
value = b','.join((b'04', str(command[1]).encode()))
cmd = b'04'
# Detaches the kernel driver
# It doesn't actually interact with the device
elif command[0] == 'detach_kernel_driver':
value = b','.join((b'05', str(command[1]).encode()))
cmd = b'05'
# Sets the device configuration
elif command[0] == 'set_configuration':
value = b'06'
cmd = b'06'
# Custom command getting information from the current configuration
elif command[0] == 'get_active_configuration':
value = b'07'
cmd = b'07'
# Should the block close the device when leaving ?
# It doesn't actually interact with the device
elif command[0] == 'close?':
value = b'08'
cmd = b'08'
# Checks whether the internal resources have been released or not
# It doesn't actually interact with the device
elif command[0] == '_ctx.handle':
value = b'09'
cmd = b'09'
# Releases the USB interface
# It doesn't actually interact with the device
elif command[0] == 'release_interface':
value = b','.join((b'10', str(command[1]).encode()))
cmd = b'10'
# Detaches the kernel driver
# It doesn't actually interact with the device
elif command[0] == 'attach_kernel_driver':
value = b','.join((b'11', str(command[1]).encode()))
cmd = b'11'
# Releases all the resources used by :mod:`pyusb` for a given device
# It doesn't actually interact with the device
elif command[0] == 'dispose_resources':
value = b'12'
cmd = b'12'
# Registers a block as gone
# It doesn't actually interact with the device
elif command[0] == 'farewell':
value = b'13'
cmd = b'13'
else:
raise ValueError("Wrong command type !")
self.log(logging.DEBUG, f"Writing command {value} to command buffer")
self._command_file.write(value)
return cmd
def _send_server(self, command: list) -> Union[int, bytes, None,
Tuple[int, ...]]:
"""Sends a command to the server and gets the corresponding answer.
Args:
command: A :obj:`list` containing the command type as a first element,
and then the arguments if any.
"""
# Disabling KeyboardInterrupt to avoid unexpected behavior upon CTRL+C
with DelayedKeyboardInterrupt():
self.log(logging.DEBUG, "KeyBoardInterrupt disabled")
# Acquiring the shared lock to get control over the server
with self.acquire_timeout(self._shared_lock, 1) as acquired:
if acquired:
self.log(logging.DEBUG, "Acquired shared lock")
# Acquiring the block lock to indicate the command is being written
with self.acquire_timeout(self._block_lock, 1) as acq:
if acq:
self.log(logging.DEBUG, "Acquired block lock")
self.log(logging.DEBUG,
f"Writing {str(self._block_index).encode()} as the "
f"current block index")
self._current_block.value = self._block_index
self._command_file.seek(0)
self._command_file.truncate(0)
# Writing the command and its arguments to the command file
cmd = self._handle_command(command)
else:
raise TimeoutError("Could not acquire the block lock in 1s")
# Waiting for the server to write the answer
# If using only the lock, it could be re-acquired by this class
# without the server having a chance to get it
t = time()
while not self._answer_file.tell():
sleep(0.00001)
if time() - t > 1:
raise TimeoutError("No answer from the USB server after 1s")
# The lock can be re-acquired once the answer has been written
with self.acquire_timeout(self._block_lock, 1) as acq:
if acq:
self.log(logging.DEBUG, "Acquired the block lock")
# Reading the answer
self._answer_file.seek(0)
answer: List[bytes] = self._answer_file.read().split(b',')
self.log(logging.DEBUG, f"Read {answer} from the answer file")
self._answer_file.seek(0)
self._answer_file.truncate(0)
if cmd != answer[0]:
raise IOError("Got an answer for the command of another block")
# The different answers have to be handled in various ways
if command[0] in ['ctrl_transfer_out',
'write',
'is_kernel_driver_active',
'close?',
'_ctx.handle']:
return int(answer[1])
if command[0] in ['ctrl_transfer_in', 'read']:
return answer[1]
elif command[0] == 'get_active_configuration':
return tuple(int(rep) for rep in answer[1:])
return
else:
raise TimeoutError("Could not acquire the block lock in 1s")
else:
raise TimeoutError("Could not acquire the shared lock in 1s")
self.log(logging.DEBUG, "KeyBoardInterrupt re-enabled")
def _initialize(self) -> None:
"""Initializing the FT232H according to the chosen mode.
The main differences are for the choice of the clock frequency and
parameters.
"""
# FT232H properties
fifo_sizes = (1024, 1024)
latency = 2
# I2C properties
if self._ft232h_mode == 'I2C':
timings = ft232h_i2c_speed[self._i2c_speed if self._i2c_speed in
ft232h_i2c_speed else 100E3]
frequency = self._i2c_speed
self._ck_hd_sta = self._compute_delay_cycles(timings.t_hd_sta)
self._ck_su_sto = self._compute_delay_cycles(timings.t_su_sto)
ck_su_sta = self._compute_delay_cycles(timings.t_su_sta)
ck_buf = self._compute_delay_cycles(timings.t_buf)
self._ck_idle = max(ck_su_sta, ck_buf)
self._ck_delay = ck_buf
self._i2c_mask = ft232h_pins['SCL'] | ft232h_pins['SDAO'] | \
ft232h_pins['SDAI']
self._i2c_dir = ft232h_pins['SCL'] | ft232h_pins['SDAO']
# SPI properties
elif self._ft232h_mode == 'SPI':
frequency = 400E3
self._bits_per_word = 8
self._cshigh = False
self._no_cs = False
self._loop = False
self._lsbfirst = False
self._max_speed_hz = 400E3
self._mode = 0
self._threewire = False
self._spi_param_changed = True
self._cs_bit = ft232h_pins['CS']
self._spi_dir = self._cs_bit | ft232h_pins['SCK'] | ft232h_pins['DO']
self._spi_mask = self._cs_bit | ft232h_pins['SCK'] | \
ft232h_pins['DO'] | ft232h_pins['DI']
else:
frequency = 400E3
# Configuring the USB device, interface and endpoints
try:
if self._send_server(['is_kernel_driver_active', 0]):
self._send_server(['detach_kernel_driver', 0])
self.log(logging.INFO, "Setting USB configuration for the FT232H")
self._send_server(['set_configuration'])
except USBError:
self.log(logging.ERROR,
"Could not set USB device configuration !\nYou may have to "
"install the udev-rules for this USB device, this can be done "
"using the udev_rule_setter utility in the util folder")
raise
self._index, self._in_ep, self._out_ep, self._max_packet_size = \
self._send_server(['get_active_configuration'])
# Invalidate data in the readbuffer
self._readoffset = 0
self._readbuffer = bytearray()
# Drain input buffer
self._purge_buffers()
# Shallow reset
if self._ctrl_transfer_out(ft232h_sio_req['reset'],
ft232h_sio_args['reset']):
raise IOError('Unable to reset FTDI device')
# Reset feature mode
self._set_bitmode(0, FT232H.BitMode.RESET)
# Set latency timer
self._set_latency_timer(latency)
# Set chunk size and invalidate all remaining data
self._writebuffer_chunksize = fifo_sizes[0]
self._readoffset = 0
self._readbuffer = bytearray()
self._readbuffer_chunksize = min(fifo_sizes[0], fifo_sizes[1],
self._max_packet_size)
# Reset feature mode
self._set_bitmode(0, FT232H.BitMode.RESET)
# Drain buffers
self._purge_buffers()
# Disable event and error characters
if self._ctrl_transfer_out(ft232h_sio_req['set_event_char'], 0):
raise IOError('Unable to set event char')
if self._ctrl_transfer_out(ft232h_sio_req['set_error_char'], 0):
raise IOError('Unable to set error char')
# Enable MPSSE mode
if self._ft232h_mode == 'GPIO_only':
self.log(logging.DEBUG, "Setting the mode to GPIO_only")
self._set_bitmode(0xFF, FT232H.BitMode.MPSSE)
else:
self.log(logging.DEBUG, f"Setting the mode to {self._ft232h_mode}")
self._set_bitmode(self._direction, FT232H.BitMode.MPSSE)
# Configure clock
if self._ft232h_mode == 'I2C':
# Note that bus frequency may differ from clock frequency, when
# 3-phase clock is enabled
self._set_frequency(3 * frequency / 2)
else:
self._set_frequency(frequency)
# Configure pins
self.log(logging.DEBUG, "Configuring the FT232H pins")
if self._ft232h_mode == 'I2C':
cmd = bytearray(self._idle)
cmd.extend((ft232h_cmds['set_bits_high'], 0, 0))
self._write_data(cmd)
elif self._ft232h_mode == 'SPI':
cmd = bytearray((ft232h_cmds['set_bits_low'],
self._cs_bit & 0xFF,
self._direction & 0xFF))
cmd.extend((ft232h_cmds['set_bits_high'],
(self._cs_bit >> 8) & 0xFF,
(self._direction >> 8) & 0xFF))
self._write_data(cmd)
else:
cmd = bytearray((ft232h_cmds['set_bits_low'], 0, 0))
cmd.extend((ft232h_cmds['set_bits_high'], 0, 0))
self._write_data(cmd)
# Disable loopback
self.log(logging.DEBUG, "Disabling loopback")
self._write_data(bytearray((ft232h_cmds['loopback_end'],)))
# Validate MPSSE
bytes_ = bytes(self._read_data_bytes(2))
if (len(bytes_) >= 2) and (bytes_[0] == '\xfa'):
raise IOError("Invalid command @ %d" % bytes_[1])
# I2C-specific settings
if self._ft232h_mode == 'I2C':
self.log(logging.DEBUG, "Configuring I2C-specific features")
self._tx_size, self._rx_size = fifo_sizes
# Enable 3-phase clock
self._write_data(bytearray([True and
ft232h_cmds['enable_clk_3phase'] or
ft232h_cmds['disable_clk_3phase']]))
# Enable drivezero mode
self._write_data(bytearray([ft232h_cmds['drive_zero'],
self._i2c_mask & 0xFF,
(self._i2c_mask >> 8) & 0xFF]))
# Disable adaptative clock
self._write_data(bytearray([False and
ft232h_cmds['enable_clk_adaptative'] or
ft232h_cmds['disable_clk_adaptative']]))
def _ctrl_transfer_out(self,
reqtype: int,
value: int,
data: bytes = b'') -> int:
"""Sends a control message to the device.
Args:
reqtype: bmRequest
value: wValue
data: payload
Returns:
Number of bytes actually written
"""
try:
self.log(logging.DEBUG,
f"Sending USB control transfer with request type {Ftdi_req_out}"
f", request {reqtype}, value {value}, index {self._index}, "
f"data {data}")
return self._send_server(['ctrl_transfer_out', Ftdi_req_out, reqtype,
value, self._index, bytearray(data),
self._usb_write_timeout])
except USBError as ex:
raise IOError('UsbError: %s' % str(ex))
def _read_data_bytes(self,
size: int,
attempt: int = 2,
request_gen: Optional[
Callable[[int], Union[bytearray,
bytes]]] = None) -> bytes:
"""Reads data from the FT232H.
Reads data from the FTDI interface. The data buffer is rebuilt from
chunk-sized blocks received over the USB bus. The FTDI device always sends
internal status bytes, which are stripped out as not part of the data
payload.
Args:
size: The number of bytes to receive from the device
attempt: Attempt cycle count
request_gen: A callable that takes the number of bytes read and expects a
bytes buffer to send back to the remote device. This is only useful to
perform optimized/continuous transfer from a slave device.
Returns:
Payload bytes
"""
# Packet size sanity check
if not self._max_packet_size:
raise ValueError("max_packet_size is bogus")
packet_size = self._max_packet_size
length = 1 # initial condition to enter the usb_read loop
data = bytearray()
# everything we want is still in the cache?
if size <= len(self._readbuffer) - self._readoffset:
data = self._readbuffer[self._readoffset:self._readoffset + size]
self._readoffset += size
return data
# something still in the cache, but not enough to satisfy 'size'?
if len(self._readbuffer) - self._readoffset != 0:
data = self._readbuffer[self._readoffset:]
# end of readbuffer reached
self._readoffset = len(self._readbuffer)
# read from USB, filling in the local cache as it is empty
retry = attempt
req_size = size
try:
while (len(data) < size) and (length > 0):
while True:
try:
self.log(logging.DEBUG,
f"Sending USB read command to endpoint {self._out_ep}"
f"to read {self._readbuffer_chunksize} bytes")
tempbuf = self._send_server(['read', self._out_ep,
self._readbuffer_chunksize,
self._usb_read_timeout])
self.log(logging.DEBUG, f"Read {tempbuf} from the USB device")
except USBError:
raise
retry -= 1
length = len(tempbuf)
# the received buffer contains at least one useful databyte
# (first 2 bytes in each packet represent the current modem
# status)
if length >= 2:
if tempbuf[1] & ft232h_tx_empty_bits:
if request_gen:
req_size -= length - 2
if req_size > 0:
cmd = request_gen(req_size)
if cmd:
self._write_data(cmd)
if length > 2:
retry = attempt
# skip the status bytes
chunks = (length + packet_size - 1) // packet_size
count = packet_size - 2
self._readbuffer = bytearray()
self._readoffset = 0
srcoff = 2
for _ in range(chunks):
self._readbuffer += tempbuf[srcoff:srcoff + count]
srcoff += packet_size
length = len(self._readbuffer)
break
# received buffer only contains the modem status bytes
# no data received, may be late, try again
if retry > 0:
continue
# no actual data
self._readbuffer = bytearray()
self._readoffset = 0
# no more data to read?
return data
if length > 0:
# data still fits in buf?
if (len(data) + length) <= size:
data += self._readbuffer[self._readoffset:
self._readoffset + length]
self._readoffset += length
# did we read exactly the right amount of bytes?
if len(data) == size:
return data
else:
# partial copy, not enough bytes in the local cache to
# fulfill the request
part_size = min(size - len(data),
len(self._readbuffer) - self._readoffset)
if part_size < 0:
raise ValueError("Internal Error")
data += self._readbuffer[self._readoffset:
self._readoffset + part_size]
self._readoffset += part_size
return data
except USBError:
self.log(logging.ERROR, "An error occurred while writing to USB device")
raise
# never reached
raise ValueError("Internal error")
def _set_serial_number(self, serial_number: str) -> None:
"""(Over)Writes the serial number.
Writes the desired serial number to the EEPROM. It is then accessible to
USB commands as a string descriptor. Also sets the manufacturer and
product string descriptors.
Args:
serial_number: Serial number to be written in the EEPROM, as a
:obj:`str`.
"""
if not isinstance(serial_number, str):
serial_number = str(serial_number)
if any(char in serial_number for char in ':/'):
raise ValueError("Invalid character : or / in serial number")
# Reading current eeprom
word_count = round(ft232h_eeprom_size / 2)
word_addr = 0
data = bytearray()
while word_count:
try:
self.log(logging.DEBUG,
f"Sending USB control transfer with request type "
f"{Ftdi_req_in}, request {ft232h_sio_req['read_eeprom']}, "
f"value 0, index {word_addr}, data 2")
buf = self._send_server(['ctrl_transfer_in', Ftdi_req_in,
ft232h_sio_req['read_eeprom'], 0, word_addr,
2, self._usb_read_timeout])
self.log(logging.DEBUG, f"Read {buf} from the USB device")
except USBError as exc:
raise IOError('UsbError: %s' % exc) from exc
if not buf:
raise IOError('EEPROM read error @ %d' % (word_addr << 1))
data.extend(buf)
word_count -= 1
word_addr += 1
new_eeprom = data[0:ft232h_eeprom_size]
# Setting the has_serial flag to True
new_eeprom[ft232h_eeprom['has_serial_pos']] |= 1 << 3
# Changing the string descriptors and the descriptors index
str_descriptors = {'manufacturer': 'FTDI',
'product': 'FT232H',
'serial': serial_number}
stream = bytearray()
str_pos = ft232h_eeprom['str_position']
tbl_pos = ft232h_eeprom['str_table']
data_pos = str_pos
for name in str_descriptors:
new_str = str_descriptors[name].encode('utf-16le')
length = len(new_str) + 2
stream.append(length)
stream.append(util.DESC_TYPE_STRING) # string descriptor
stream.extend(new_str)
new_eeprom[tbl_pos] = data_pos
tbl_pos += 1
new_eeprom[tbl_pos] = length
tbl_pos += 1
data_pos += length
new_eeprom[str_pos:str_pos + len(stream)] = stream
# Filling the remaining space with zeros
crc_pos = len(new_eeprom)
rem = crc_pos - (str_pos + len(stream))
new_eeprom[str_pos + len(stream):crc_pos] = bytes(rem)
# Checking the eeprom length
if len(new_eeprom) != ft232h_eeprom_size:
raise ValueError("Eeprom_size not matching, serial number may be "
"too long, eeprom not written")
# Calculating the new checksum and modifying the corresponding bytes
checksum = 0xAAAA
for idx in range(0, len(new_eeprom[:-2]), 2):
v = ((new_eeprom[:-2][idx + 1] << 8) + new_eeprom[:-2][idx]) & 0xFFFF
checksum = v ^ checksum
checksum = ((checksum << 1) & 0xFFFF) | ((checksum >> 15) & 0xFFFF)
new_eeprom[-2] = checksum & 0xFF
new_eeprom[-1] = checksum >> 8
# Updating the eeprom
addr = 0
for word in unpack('<%dH' % (len(new_eeprom) // 2), new_eeprom):
self.log(logging.DEBUG,
f"Sending USB control transfer with request type {Ftdi_req_out}"
f", request {ft232h_sio_req['write_eeprom']}, value {word}, "
f"index {addr >> 1}, data b''")
out = self._send_server(['ctrl_transfer_out', Ftdi_req_out,
ft232h_sio_req['write_eeprom'], word, addr >> 1,
b'', self._usb_write_timeout])
if out:
raise IOError('EEPROM Write Error @ %d' % addr)
addr += 2
def _write_data(self, data: Union[bytearray, bytes]) -> int:
"""Writes data to the FT232H.
Writes the sequence of MPSSE commands and data to the FTDI port. Data
buffer is split into chunk-sized blocks before being sent over the USB bus.
Args:
data: The byte stream to send to the FTDI interface
Returns:
Count of written bytes
"""
offset = 0
size = len(data)
try:
while offset < size:
write_size = self._writebuffer_chunksize
if offset + write_size > size:
write_size = size - offset
try:
self.log(logging.DEBUG,
f"Sending USB write command to endpoint {self._in_ep}"
f"and with data {data[offset:offset + write_size]}")
length = self._send_server(['write', self._in_ep,
data[offset:offset + write_size],
self._usb_write_timeout])
except USBError:
raise
if length <= 0:
raise USBError("Usb bulk write error")
offset += length
return offset
except USBError:
self.log(logging.ERROR, "An error occurred while writing to USB device")
raise
[docs]
def close(self) -> None:
"""Closes the FTDI interface/port."""
if self._send_server(['close?']):
self.log(logging.INFO, "Closing the USB connection to the FT232H")
if self._send_server(['_ctx.handle']):
try:
self._set_bitmode(0, FT232H.BitMode.RESET)
self._send_server(['release_interface', self._index - 1])
except (IOError, ValueError, USBError):
pass
try:
self._send_server(['attach_kernel_driver', self._index - 1])
except (NotImplementedError, USBError):
pass
self.log(logging.INFO, "Releasing the USB resources")
self._send_server(['dispose_resources'])
self._send_server(['farewell'])
@staticmethod
@contextmanager
def acquire_timeout(lock: RLock, timeout: float) -> bool:
"""Short context manager for acquiring a :obj:`multiprocessing.Lock` with a
specified timeout.
Args:
lock: The lock to acquire.
timeout: The timeout for acquiring the Lock, as a :obj:`float`.
Returns:
:obj:`True` if the Lock was successfully acquired, :obj:`False`
otherwise.
"""
ret = False
try:
ret = lock.acquire(timeout=timeout)
yield ret
finally:
if ret:
lock.release()